You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:01 UTC

[1/6] ignite git commit: Moved exchange worker to separate class.

Repository: ignite
Updated Branches:
  refs/heads/ignite-4565-ddl 70a11912b -> 4538526d8


Moved exchange worker to separate class.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23f67a5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23f67a5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23f67a5a

Branch: refs/heads/ignite-4565-ddl
Commit: 23f67a5a1b8e6c7429d110df858c8bdd17a913d7
Parents: 9020d12
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 15:44:57 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 15:44:57 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorker.java     | 355 +++++++++++++++++++
 .../GridCachePartitionExchangeManager.java      | 329 +----------------
 2 files changed, 368 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
new file mode 100644
index 0000000..98a9cc0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+
+/**
+ * Exchange worker thread. Exchange futures are taken from the queue and processed one at a time,
+ * so the next exchange does not start until the previous one completes.
+ */
+public class CachePartitionExchangeWorker<K, V> extends GridWorker {
+    /** Shared cache context obtained from the exchange manager. */
+    private final GridCacheSharedContext<K, V> cctx;
+
+    /** Exchange manager this worker calls back into (last topology future, debug dump, partition refresh). */
+    private final GridCachePartitionExchangeManager<K, V> exchMgr;
+
+    /** Queue of exchange futures waiting to be processed by {@link #body()}. */
+    private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+        new LinkedBlockingDeque<>();
+
+    /** Raised while a polled future is being processed; dummy futures are not enqueued while it is set. */
+    private volatile boolean busy;
+
+    /**
+     * Creates the worker; the shared cache context is taken from {@code exchMgr.context()}.
+     *
+     * @param exchMgr Exchange manager.
+     * @param log Logger.
+     */
+    public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) {
+        super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log);
+
+        this.cctx = exchMgr.context();
+
+        this.exchMgr = exchMgr;
+    }
+
+    /**
+     * Puts the given exchange future at the head of the queue.
+     *
+     * @param fut Future.
+     */
+    public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
+        futQ.addFirst(fut);
+    }
+
+    /**
+     * @param exchFut Exchange future; dummy futures are queued only if the queue is empty and the worker is not busy.
+     */
+    void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        assert exchFut != null;
+
+        if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
+            futQ.offer(exchFut);
+
+        if (log.isDebugEnabled())
+            log.debug("Added exchange future to exchange worker: " + exchFut);
+    }
+
+    /**
+     * Logs every exchange future currently in the queue via {@code U.warn}.
+     */
+    public void dumpFuturesDebugInfo() {
+        U.warn(log, "Pending exchange futures:");
+
+        for (GridDhtPartitionsExchangeFuture fut : futQ)
+            U.warn(log, ">>> " + fut);
+    }
+
+    /**
+     * @return {@code True} if exchange queue is empty.
+     */
+    public boolean exchangeQueueIsEmpty() {
+        return futQ.isEmpty();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+        long timeout = cctx.gridConfig().getNetworkTimeout();
+
+        int cnt = 0; // Loop iteration counter; passed to addAssignments() below.
+
+        while (!isCancelled()) {
+            GridDhtPartitionsExchangeFuture exchFut = null;
+
+            cnt++;
+
+            try {
+                boolean preloadFinished = true;
+
+                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                    preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
+
+                    if (!preloadFinished)
+                        break;
+                }
+
+                // Not a client node, queue is empty and all preloaders are synced: restore default poll timeout.
+                if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished)
+                    timeout = cctx.gridConfig().getNetworkTimeout();
+
+                // Debug only: log exchange futures that are not done yet before waiting on the queue.
+                if (log.isDebugEnabled()) {
+                    Collection<IgniteInternalFuture> unfinished = new HashSet<>();
+
+                    for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) {
+                        if (!fut.isDone())
+                            unfinished.add(fut);
+                    }
+
+                    log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
+                }
+
+                // If cancelled, set the interrupt flag so the timed poll below does not wait; then take next future.
+                if (isCancelled())
+                    Thread.currentThread().interrupt();
+
+                exchFut = futQ.poll(timeout, MILLISECONDS);
+
+                if (exchFut == null)
+                    continue; // Main while loop.
+
+                busy = true;
+
+                Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
+
+                boolean dummyReassign = exchFut.dummyReassign();
+                boolean forcePreload = exchFut.forcePreload();
+
+                try {
+                    if (isCancelled())
+                        break;
+
+                    if (!exchFut.dummy() && !exchFut.forcePreload()) {
+                        exchMgr.lastTopologyFuture(exchFut);
+
+                        exchFut.init();
+
+                        int dumpedObjects = 0;
+
+                        while (true) {
+                            try {
+                                exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                                break;
+                            }
+                            catch (IgniteFutureTimeoutCheckedException ignored) {
+                                U.warn(log, "Failed to wait for partition map exchange [" +
+                                    "topVer=" + exchFut.topologyVersion() +
+                                    ", node=" + cctx.localNodeId() + "]. " +
+                                    "Dumping pending objects that might be the cause: ");
+
+                                if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+                                    try {
+                                        exchMgr.dumpDebugInfo(exchFut.topologyVersion());
+                                    }
+                                    catch (Exception e) {
+                                        U.error(log, "Failed to dump debug information: " + e, e);
+                                    }
+
+                                    if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+                                        U.dumpThreads(log);
+
+                                    dumpedObjects++;
+                                }
+                            }
+                        }
+
+
+                        if (log.isDebugEnabled())
+                            log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+                                this + ']');
+
+                        boolean changed = false;
+
+                        // Just pick first worker to do this, so we don't
+                        // invoke topology callback more than once for the
+                        // same event.
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                            if (cacheCtx.isLocal())
+                                continue;
+
+                            changed |= cacheCtx.topology().afterExchange(exchFut);
+                        }
+
+                        if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty())
+                            exchMgr.refreshPartitions();
+                    }
+                    else {
+                        if (log.isDebugEnabled())
+                            log.debug("Got dummy exchange (will reassign)");
+
+                        if (!dummyReassign) {
+                            timeout = 0; // Force refresh.
+
+                            continue;
+                        }
+                    }
+
+                    if (!exchFut.skipPreload()) {
+                        assignsMap = new HashMap<>();
+
+                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                            long delay = cacheCtx.config().getRebalanceDelay();
+
+                            GridDhtPreloaderAssignments assigns = null;
+
+                            // Don't delay for dummy reassigns to avoid infinite recursion.
+                            if (delay == 0 || forcePreload)
+                                assigns = cacheCtx.preloader().assign(exchFut);
+
+                            assignsMap.put(cacheCtx.cacheId(), assigns);
+                        }
+                    }
+                }
+                finally {
+                    // Must flip busy flag before assignments are given to demand workers.
+                    busy = false;
+                }
+
+                if (assignsMap != null) {
+                    int size = assignsMap.size();
+
+                    NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
+                    for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
+                        int cacheId = e.getKey();
+
+                        GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                        int order = cacheCtx.config().getRebalanceOrder();
+
+                        if (orderMap.get(order) == null)
+                            orderMap.put(order, new ArrayList<Integer>(size));
+
+                        orderMap.get(order).add(cacheId);
+                    }
+
+                    Runnable r = null; // Last non-null runnable returned by addAssignments(); passed to the next call.
+
+                    List<String> rebList = new LinkedList<>();
+
+                    boolean assignsCancelled = false;
+
+                    for (Integer order : orderMap.descendingKeySet()) {
+                        for (Integer cacheId : orderMap.get(order)) {
+                            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                            GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+
+                            if (assigns != null)
+                                assignsCancelled |= assigns.cancelled();
+
+                            // Cancels previous rebalance future (in case it's not done yet).
+                            // Sends previous rebalance stopped event (if necessary).
+                            // Creates new rebalance future.
+                            // Sends current rebalance started event (if necessary).
+                            // Finishes cache sync future (on empty assignments).
+                            Runnable cur = cacheCtx.preloader().addAssignments(assigns,
+                                forcePreload,
+                                cnt,
+                                r,
+                                exchFut.forcedRebalanceFuture());
+
+                            if (cur != null) {
+                                rebList.add(U.maskName(cacheCtx.name()));
+
+                                r = cur;
+                            }
+                        }
+                    }
+
+                    if (assignsCancelled) { // Pending exchange.
+                        U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                            "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                            ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                    else if (r != null) {
+                        Collections.reverse(rebList);
+
+                        U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+
+                        if (exchangeQueueIsEmpty()) {
+                            U.log(log, "Rebalancing started " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+
+                            r.run(); // Starts rebalancing routine.
+                        }
+                        else
+                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                    else
+                        U.log(log, "Skipping rebalancing (nothing scheduled) " +
+                            "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                            ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                }
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                throw e;
+            }
+            catch (IgniteClientDisconnectedCheckedException ignored) {
+                return;
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, "Failed to wait for completion of partition map exchange " +
+                    "(preloading will not start): " + exchFut, e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f9222bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,24 +21,15 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,7 +64,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -90,7 +80,6 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
@@ -98,7 +87,6 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -126,12 +114,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
-    /** Last partition refresh. */
-    private final AtomicLong lastRefresh = new AtomicLong(-1);
-
     /** */
     @GridToStringInclude
-    private ExchangeWorker exchWorker;
+    private CachePartitionExchangeWorker exchWorker;
 
     /** */
     @GridToStringExclude
@@ -297,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        exchWorker = new ExchangeWorker();
+        exchWorker = new CachePartitionExchangeWorker<>(this, log);
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
@@ -369,7 +354,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.futQ.addFirst(fut);
+        exchWorker.addFirstFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -597,6 +582,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * @param lastInitializedFut Exchange future to record as the last initialized one.
+     */
+    public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) {
+        this.lastInitializedFut = lastInitializedFut;
+    }
+
+    /**
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
@@ -684,7 +676,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.futQ.isEmpty();
+        return !exchWorker.exchangeQueueIsEmpty();
     }
 
     /**
@@ -739,7 +731,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * Partition refresh callback.
      */
-    private void refreshPartitions() {
+    public void refreshPartitions() {
         ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -1345,10 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
-            U.warn(log, ">>> " + fut);
+        exchWorker.dumpFuturesDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1536,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param deque Deque to poll from.
-     * @param time Time to wait.
-     * @param w Worker.
-     * @return Polled item.
-     * @throws InterruptedException If interrupted.
-     */
-    @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
-        assert w != null;
-
-        // There is currently a case where {@code interrupted}
-        // flag on a thread gets flipped during stop which causes the pool to hang.  This check
-        // will always make sure that interrupted flag gets reset before going into wait conditions.
-        // The true fix should actually make sure that interrupted flag does not get reset or that
-        // interrupted exception gets propagated. Until we find a real fix, this method should
-        // always work to make sure that there is no hanging during stop.
-        if (w.isCancelled())
-            Thread.currentThread().interrupt();
-
-        return deque.poll(time, MILLISECONDS);
-    }
-
-    /**
      * @param node Target node.
      * @return {@code True} if can use compression for partition map messages.
      */
@@ -1587,276 +1554,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * Exchange future thread. All exchanges happen only by one thread and next
-     * exchange will not start until previous one completes.
-     */
-    private class ExchangeWorker extends GridWorker {
-        /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
-            new LinkedBlockingDeque<>();
-
-        /** Busy flag used as performance optimization to stop current preloading. */
-        private volatile boolean busy;
-
-        /**
-         *
-         */
-        private ExchangeWorker() {
-            super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
-        }
-
-        /**
-         * @param exchFut Exchange future.
-         */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
-            assert exchFut != null;
-
-            if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
-                futQ.offer(exchFut);
-
-            if (log.isDebugEnabled())
-                log.debug("Added exchange future to exchange worker: " + exchFut);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-            long timeout = cctx.gridConfig().getNetworkTimeout();
-
-            int cnt = 0;
-
-            while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
-
-                cnt++;
-
-                try {
-                    boolean preloadFinished = true;
-
-                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                        preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
-
-                        if (!preloadFinished)
-                            break;
-                    }
-
-                    // If not first preloading and no more topology events present.
-                    if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
-                        timeout = cctx.gridConfig().getNetworkTimeout();
-
-                    // After workers line up and before preloading starts we initialize all futures.
-                    if (log.isDebugEnabled()) {
-                        Collection<IgniteInternalFuture> unfinished = new HashSet<>();
-
-                        for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
-                            if (!fut.isDone())
-                                unfinished.add(fut);
-                        }
-
-                        log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
-                    }
-
-                    // Take next exchange future.
-                    exchFut = poll(futQ, timeout, this);
-
-                    if (exchFut == null)
-                        continue; // Main while loop.
-
-                    busy = true;
-
-                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
-                    boolean dummyReassign = exchFut.dummyReassign();
-                    boolean forcePreload = exchFut.forcePreload();
-
-                    try {
-                        if (isCancelled())
-                            break;
-
-                        if (!exchFut.dummy() && !exchFut.forcePreload()) {
-                            lastInitializedFut = exchFut;
-
-                            exchFut.init();
-
-                            int dumpedObjects = 0;
-
-                            while (true) {
-                                try {
-                                    exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
-                                    break;
-                                }
-                                catch (IgniteFutureTimeoutCheckedException ignored) {
-                                    U.warn(log, "Failed to wait for partition map exchange [" +
-                                        "topVer=" + exchFut.topologyVersion() +
-                                        ", node=" + cctx.localNodeId() + "]. " +
-                                        "Dumping pending objects that might be the cause: ");
-
-                                    if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
-                                        try {
-                                            dumpDebugInfo(exchFut.topologyVersion());
-                                        }
-                                        catch (Exception e) {
-                                            U.error(log, "Failed to dump debug information: " + e, e);
-                                        }
-
-                                        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
-                                            U.dumpThreads(log);
-
-                                        dumpedObjects++;
-                                    }
-                                }
-                            }
-
-
-                            if (log.isDebugEnabled())
-                                log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
-                                    this + ']');
-
-                            if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
-                                lastRefresh.compareAndSet(-1, U.currentTimeMillis());
-
-                            boolean changed = false;
-
-                            // Just pick first worker to do this, so we don't
-                            // invoke topology callback more than once for the
-                            // same event.
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                if (cacheCtx.isLocal())
-                                    continue;
-
-                                changed |= cacheCtx.topology().afterExchange(exchFut);
-                            }
-
-                            if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
-                                refreshPartitions();
-                        }
-                        else {
-                            if (log.isDebugEnabled())
-                                log.debug("Got dummy exchange (will reassign)");
-
-                            if (!dummyReassign) {
-                                timeout = 0; // Force refresh.
-
-                                continue;
-                            }
-                        }
-
-                        if (!exchFut.skipPreload()) {
-                            assignsMap = new HashMap<>();
-
-                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                                long delay = cacheCtx.config().getRebalanceDelay();
-
-                                GridDhtPreloaderAssignments assigns = null;
-
-                                // Don't delay for dummy reassigns to avoid infinite recursion.
-                                if (delay == 0 || forcePreload)
-                                    assigns = cacheCtx.preloader().assign(exchFut);
-
-                                assignsMap.put(cacheCtx.cacheId(), assigns);
-                            }
-                        }
-                    }
-                    finally {
-                        // Must flip busy flag before assignments are given to demand workers.
-                        busy = false;
-                    }
-
-                    if (assignsMap != null) {
-                        int size = assignsMap.size();
-
-                        NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
-
-                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
-                            int cacheId = e.getKey();
-
-                            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                            int order = cacheCtx.config().getRebalanceOrder();
-
-                            if (orderMap.get(order) == null)
-                                orderMap.put(order, new ArrayList<Integer>(size));
-
-                            orderMap.get(order).add(cacheId);
-                        }
-
-                        Runnable r = null;
-
-                        List<String> rebList = new LinkedList<>();
-
-                        boolean assignsCancelled = false;
-
-                        for (Integer order : orderMap.descendingKeySet()) {
-                            for (Integer cacheId : orderMap.get(order)) {
-                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                                GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
-
-                                if (assigns != null)
-                                    assignsCancelled |= assigns.cancelled();
-
-                                // Cancels previous rebalance future (in case it's not done yet).
-                                // Sends previous rebalance stopped event (if necessary).
-                                // Creates new rebalance future.
-                                // Sends current rebalance started event (if necessary).
-                                // Finishes cache sync future (on empty assignments).
-                                Runnable cur = cacheCtx.preloader().addAssignments(assigns,
-                                    forcePreload,
-                                    cnt,
-                                    r,
-                                    exchFut.forcedRebalanceFuture());
-
-                                if (cur != null) {
-                                    rebList.add(U.maskName(cacheCtx.name()));
-
-                                    r = cur;
-                                }
-                            }
-                        }
-
-                        if (assignsCancelled) { // Pending exchange.
-                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                        }
-                        else if (r != null) {
-                            Collections.reverse(rebList);
-
-                            U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
-
-                            if (futQ.isEmpty()) {
-                                U.log(log, "Rebalancing started " +
-                                    "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-
-                                r.run(); // Starts rebalancing routine.
-                            }
-                            else
-                                U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                    "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                    ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                        }
-                        else
-                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                }
-                catch (IgniteInterruptedCheckedException e) {
-                    throw e;
-                }
-                catch (IgniteClientDisconnectedCheckedException ignored) {
-                    return;
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to wait for completion of partition map exchange " +
-                        "(preloading will not start): " + exchFut, e);
-                }
-            }
-        }
-    }
-
-    /**
      * Partition resend timeout object.
      */
     private class ResendTimeoutObject implements GridTimeoutObject {


[6/6] ignite git commit: Merge branch 'ignite-4834' into ignite-4565-ddl

Posted by vo...@apache.org.
Merge branch 'ignite-4834' into ignite-4565-ddl


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4538526d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4538526d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4538526d

Branch: refs/heads/ignite-4565-ddl
Commit: 4538526d87d7b3e8c991470635b5911dba4cfa74
Parents: 70a1191 deeee8c
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:50:43 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:50:43 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorkerTask.java |  29 ++++
 .../GridCachePartitionExchangeManager.java      | 173 +++++++++++++------
 .../processors/cache/GridCacheProcessor.java    |  19 ++
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 4 files changed, 177 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4538526d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------


[5/6] ignite git commit: Done.

Posted by vo...@apache.org.
Done.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb

Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:02:15 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 25 ++++++++++++++------
 .../processors/cache/GridCacheProcessor.java    | 10 ++++++++
 2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     exchFut = exchangeFuture(exchId, evt, cache,null, null);
                 }
                 else {
-                    DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+                    DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
 
-                    if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
-                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+                    if (customMsg instanceof DynamicCacheChangeBatch) {
+                        DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
 
                         Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
 
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                             exchFut = exchangeFuture(exchId, evt, cache, valid, null);
                         }
                     }
-                    else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
-                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+                    else if (customMsg instanceof CacheAffinityChangeMessage) {
+                        CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
 
                         if (msg.exchangeId() == null) {
                             if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                                 exchFut = exchangeFuture(exchId, evt, cache, null, msg);
                             }
                         }
-                        else
-                            exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+                        else {
+                            exchangeFuture(msg.exchangeId(), null, null, null, null)
+                                .onAffinityChangeMessage(evt.eventNode(), msg);
+                        }
+                    }
+                    else {
+                        // Process event as custom discovery task if needed.
+                        CachePartitionExchangeWorkerTask task =
+                            cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+                        if (task != null)
+                            exchWorker.addCustomTask(task);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Create exchange worker task for custom discovery message.
+     *
+     * @param msg Custom discovery message.
+     * @return Task or {@code null} if message doesn't require any special processing.
+     */
+    public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+        return null;
+    }
+
+    /**
      * Process custom exchange task.
      *
      * @param task Task.


[4/6] ignite git commit: Done.

Posted by vo...@apache.org.
Done.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19381da3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19381da3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19381da3

Branch: refs/heads/ignite-4565-ddl
Commit: 19381da3961cbd2fee4f36c2b08638d507b0741c
Parents: e2f382b
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:41:35 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:41:35 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCachePartitionExchangeManager.java     | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19381da3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 26bc27d..1ce8cfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -687,6 +687,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Add custom task.
+     *
+     * @param task Task.
+     */
+    public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+        assert !task.isExchange();
+
+        exchWorker.addCustomTask(task);
+    }
+
+    /**
      * @param evt Discovery event.
      * @return Affinity topology version.
      */
@@ -818,8 +829,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         @Nullable GridCacheVersion lastVer,
         boolean compress) {
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
-            lastVer,
-            exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+                lastVer,
+                exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
 
         boolean useOldApi = false;
 


[2/6] ignite git commit: WIP.

Posted by vo...@apache.org.
WIP.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92524a44
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92524a44
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92524a44

Branch: refs/heads/ignite-4565-ddl
Commit: 92524a44eb8ccfb43901557f23368fc325e46c59
Parents: 23f67a5
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:04:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:04:37 2017 +0300

----------------------------------------------------------------------
 .../cache/CachePartitionExchangeWorker.java     | 355 -------------------
 .../cache/CachePartitionExchangeWorkerTask.java |  29 ++
 .../GridCachePartitionExchangeManager.java      | 335 ++++++++++++++++-
 .../GridDhtPartitionsExchangeFuture.java        |   8 +-
 4 files changed, 357 insertions(+), 370 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
deleted file mode 100644
index 98a9cc0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
-
-/**
- * Exchange future thread. All exchanges happen only by one thread and next
- * exchange will not start until previous one completes.
- */
-public class CachePartitionExchangeWorker<K, V> extends GridWorker {
-    /** Cache context. */
-    private final GridCacheSharedContext<K, V> cctx;
-
-    /** Exchange manager. */
-    private final GridCachePartitionExchangeManager<K, V> exchMgr;
-
-    /** Future queue. */
-    private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
-        new LinkedBlockingDeque<>();
-
-    /** Busy flag used as performance optimization to stop current preloading. */
-    private volatile boolean busy;
-
-    /**
-     * Constructor.
-     *
-     * @param exchMgr Exchange manager.
-     * @param log Logger.
-     */
-    public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) {
-        super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log);
-
-        this.cctx = exchMgr.context();
-
-        this.exchMgr = exchMgr;
-    }
-
-    /**
-     * Add first exchange future.
-     *
-     * @param fut Future.
-     */
-    public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
-        futQ.addFirst(fut);
-    }
-
-    /**
-     * @param exchFut Exchange future.
-     */
-    void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
-        assert exchFut != null;
-
-        if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
-            futQ.offer(exchFut);
-
-        if (log.isDebugEnabled())
-            log.debug("Added exchange future to exchange worker: " + exchFut);
-    }
-
-    /**
-     * Dump debug info.
-     */
-    public void dumpFuturesDebugInfo() {
-        U.warn(log, "Pending exchange futures:");
-
-        for (GridDhtPartitionsExchangeFuture fut : futQ)
-            U.warn(log, ">>> " + fut);
-    }
-
-    /**
-     * @return {@code True} iif exchange queue is empty.
-     */
-    public boolean exchangeQueueIsEmpty() {
-        return futQ.isEmpty();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
-        long timeout = cctx.gridConfig().getNetworkTimeout();
-
-        int cnt = 0;
-
-        while (!isCancelled()) {
-            GridDhtPartitionsExchangeFuture exchFut = null;
-
-            cnt++;
-
-            try {
-                boolean preloadFinished = true;
-
-                for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                    preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
-
-                    if (!preloadFinished)
-                        break;
-                }
-
-                // If not first preloading and no more topology events present.
-                if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished)
-                    timeout = cctx.gridConfig().getNetworkTimeout();
-
-                // After workers line up and before preloading starts we initialize all futures.
-                if (log.isDebugEnabled()) {
-                    Collection<IgniteInternalFuture> unfinished = new HashSet<>();
-
-                    for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) {
-                        if (!fut.isDone())
-                            unfinished.add(fut);
-                    }
-
-                    log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
-                }
-
-                // Take next exchange future.
-                if (isCancelled())
-                    Thread.currentThread().interrupt();
-
-                exchFut = futQ.poll(timeout, MILLISECONDS);
-
-                if (exchFut == null)
-                    continue; // Main while loop.
-
-                busy = true;
-
-                Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
-                boolean dummyReassign = exchFut.dummyReassign();
-                boolean forcePreload = exchFut.forcePreload();
-
-                try {
-                    if (isCancelled())
-                        break;
-
-                    if (!exchFut.dummy() && !exchFut.forcePreload()) {
-                        exchMgr.lastTopologyFuture(exchFut);
-
-                        exchFut.init();
-
-                        int dumpedObjects = 0;
-
-                        while (true) {
-                            try {
-                                exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
-                                break;
-                            }
-                            catch (IgniteFutureTimeoutCheckedException ignored) {
-                                U.warn(log, "Failed to wait for partition map exchange [" +
-                                    "topVer=" + exchFut.topologyVersion() +
-                                    ", node=" + cctx.localNodeId() + "]. " +
-                                    "Dumping pending objects that might be the cause: ");
-
-                                if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
-                                    try {
-                                        exchMgr.dumpDebugInfo(exchFut.topologyVersion());
-                                    }
-                                    catch (Exception e) {
-                                        U.error(log, "Failed to dump debug information: " + e, e);
-                                    }
-
-                                    if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
-                                        U.dumpThreads(log);
-
-                                    dumpedObjects++;
-                                }
-                            }
-                        }
-
-
-                        if (log.isDebugEnabled())
-                            log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
-                                this + ']');
-
-                        boolean changed = false;
-
-                        // Just pick first worker to do this, so we don't
-                        // invoke topology callback more than once for the
-                        // same event.
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                            if (cacheCtx.isLocal())
-                                continue;
-
-                            changed |= cacheCtx.topology().afterExchange(exchFut);
-                        }
-
-                        if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty())
-                            exchMgr.refreshPartitions();
-                    }
-                    else {
-                        if (log.isDebugEnabled())
-                            log.debug("Got dummy exchange (will reassign)");
-
-                        if (!dummyReassign) {
-                            timeout = 0; // Force refresh.
-
-                            continue;
-                        }
-                    }
-
-                    if (!exchFut.skipPreload()) {
-                        assignsMap = new HashMap<>();
-
-                        for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
-                            long delay = cacheCtx.config().getRebalanceDelay();
-
-                            GridDhtPreloaderAssignments assigns = null;
-
-                            // Don't delay for dummy reassigns to avoid infinite recursion.
-                            if (delay == 0 || forcePreload)
-                                assigns = cacheCtx.preloader().assign(exchFut);
-
-                            assignsMap.put(cacheCtx.cacheId(), assigns);
-                        }
-                    }
-                }
-                finally {
-                    // Must flip busy flag before assignments are given to demand workers.
-                    busy = false;
-                }
-
-                if (assignsMap != null) {
-                    int size = assignsMap.size();
-
-                    NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
-
-                    for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
-                        int cacheId = e.getKey();
-
-                        GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                        int order = cacheCtx.config().getRebalanceOrder();
-
-                        if (orderMap.get(order) == null)
-                            orderMap.put(order, new ArrayList<Integer>(size));
-
-                        orderMap.get(order).add(cacheId);
-                    }
-
-                    Runnable r = null;
-
-                    List<String> rebList = new LinkedList<>();
-
-                    boolean assignsCancelled = false;
-
-                    for (Integer order : orderMap.descendingKeySet()) {
-                        for (Integer cacheId : orderMap.get(order)) {
-                            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-                            GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
-
-                            if (assigns != null)
-                                assignsCancelled |= assigns.cancelled();
-
-                            // Cancels previous rebalance future (in case it's not done yet).
-                            // Sends previous rebalance stopped event (if necessary).
-                            // Creates new rebalance future.
-                            // Sends current rebalance started event (if necessary).
-                            // Finishes cache sync future (on empty assignments).
-                            Runnable cur = cacheCtx.preloader().addAssignments(assigns,
-                                forcePreload,
-                                cnt,
-                                r,
-                                exchFut.forcedRebalanceFuture());
-
-                            if (cur != null) {
-                                rebList.add(U.maskName(cacheCtx.name()));
-
-                                r = cur;
-                            }
-                        }
-                    }
-
-                    if (assignsCancelled) { // Pending exchange.
-                        U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                            "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                            ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                    else if (r != null) {
-                        Collections.reverse(rebList);
-
-                        U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
-
-                        if (exchangeQueueIsEmpty()) {
-                            U.log(log, "Rebalancing started " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-
-                            r.run(); // Starts rebalancing routine.
-                        }
-                        else
-                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
-                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                    }
-                    else
-                        U.log(log, "Skipping rebalancing (nothing scheduled) " +
-                            "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
-                            ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-                }
-            }
-            catch (IgniteInterruptedCheckedException e) {
-                throw e;
-            }
-            catch (IgniteClientDisconnectedCheckedException ignored) {
-                return;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to wait for completion of partition map exchange " +
-                    "(preloading will not start): " + exchFut, e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Cache partition exchange worker task marker interface.
+ */
+public interface CachePartitionExchangeWorkerTask {
+    /**
+     * @return {@code True} if task denotes standard exchange task, {@code false} if this is a custom task which
+     * must be executed from within exchange thread.
+     */
+    boolean isExchange();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f9222bc..444b530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,15 +21,24 @@ import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -80,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lang.IgniteUuid;
@@ -87,6 +98,7 @@ import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
 import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -114,9 +126,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /** */
     private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
 
+    /** Last partition refresh. */
+    private final AtomicLong lastRefresh = new AtomicLong(-1);
+
     /** */
     @GridToStringInclude
-    private CachePartitionExchangeWorker exchWorker;
+    private ExchangeWorker exchWorker;
 
     /** */
     @GridToStringExclude
@@ -282,7 +297,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     @Override protected void start0() throws IgniteCheckedException {
         super.start0();
 
-        exchWorker = new CachePartitionExchangeWorker<>(this, log);
+        exchWorker = new ExchangeWorker();
 
         cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
             EVT_DISCOVERY_CUSTOM_EVT);
@@ -582,13 +597,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
-     * @param lastInitializedFut Last completed topology future.
-     */
-    public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) {
-        this.lastInitializedFut = lastInitializedFut;
-    }
-
-    /**
      * @param ver Topology version.
      * @return Future or {@code null} is future is already completed.
      */
@@ -676,7 +684,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      * @return {@code True} if pending future queue is empty.
      */
     public boolean hasPendingExchange() {
-        return !exchWorker.exchangeQueueIsEmpty();
+        return exchWorker.hasPendingExchange();
     }
 
     /**
@@ -731,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     /**
      * Partition refresh callback.
      */
-    public void refreshPartitions() {
+    private void refreshPartitions() {
         ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
 
         if (oldest == null) {
@@ -811,8 +819,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         @Nullable GridCacheVersion lastVer,
         boolean compress) {
         GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
-                lastVer,
-                exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+            lastVer,
+            exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
 
         boolean useOldApi = false;
 
@@ -1337,7 +1345,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         U.warn(log, "Last exchange future: " + lastInitializedFut);
 
-        exchWorker.dumpFuturesDebugInfo();
+        exchWorker.dumpExchangeDebugInfo();
 
         if (!readyFuts.isEmpty()) {
             U.warn(log, "Pending affinity ready futures:");
@@ -1554,6 +1562,305 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     }
 
     /**
+     * Exchange future thread. All exchanges are processed by a single thread, and the next
+     * exchange will not start until the previous one completes.
+     */
+    private class ExchangeWorker extends GridWorker {
+        /** Future queue. */
+        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+            new LinkedBlockingDeque<>();
+
+        /** Busy flag used as performance optimization to stop current preloading. */
+        private volatile boolean busy;
+
+        /**
+         * Constructor.
+         */
+        private ExchangeWorker() {
+            super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
+        }
+
+        /**
+         * Add first exchange future.
+         *
+         * @param exchFut Exchange future.
+         */
+        void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            futQ.addFirst(exchFut);
+        }
+
+        /**
+         * @param exchFut Exchange future.
+         */
+        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+            assert exchFut != null;
+
+            if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
+                futQ.offer(exchFut);
+
+            if (log.isDebugEnabled())
+                log.debug("Added exchange future to exchange worker: " + exchFut);
+        }
+
+        /**
+         * @return Whether pending exchange future exists.
+         */
+        boolean hasPendingExchange() {
+            return !futQ.isEmpty();
+        }
+
+        /**
+         * Dump debug info.
+         */
+        void dumpExchangeDebugInfo() {
+            U.warn(log, "Pending exchange futures:");
+
+            for (GridDhtPartitionsExchangeFuture fut : futQ)
+                U.warn(log, ">>> " + fut);
+        }
+
+        /** {@inheritDoc} */
+        @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+            long timeout = cctx.gridConfig().getNetworkTimeout();
+
+            int cnt = 0;
+
+            while (!isCancelled()) {
+                GridDhtPartitionsExchangeFuture exchFut = null;
+
+                cnt++;
+
+                try {
+                    boolean preloadFinished = true;
+
+                    for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                        preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
+
+                        if (!preloadFinished)
+                            break;
+                    }
+
+                    // If not first preloading and no more topology events present.
+                    if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished)
+                        timeout = cctx.gridConfig().getNetworkTimeout();
+
+                    // After workers line up and before preloading starts we initialize all futures.
+                    if (log.isDebugEnabled()) {
+                        Collection<IgniteInternalFuture> unfinished = new HashSet<>();
+
+                        for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
+                            if (!fut.isDone())
+                                unfinished.add(fut);
+                        }
+
+                        log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
+                    }
+
+                    // Take next exchange future.
+                    if (isCancelled())
+                        Thread.currentThread().interrupt();
+
+                    exchFut = futQ.poll(timeout, MILLISECONDS);
+
+                    if (exchFut == null)
+                        continue; // Main while loop.
+
+                    busy = true;
+
+                    Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
+
+                    boolean dummyReassign = exchFut.dummyReassign();
+                    boolean forcePreload = exchFut.forcePreload();
+
+                    try {
+                        if (isCancelled())
+                            break;
+
+                        if (!exchFut.dummy() && !exchFut.forcePreload()) {
+                            lastInitializedFut = exchFut;
+
+                            exchFut.init();
+
+                            int dumpedObjects = 0;
+
+                            while (true) {
+                                try {
+                                    exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+                                    break;
+                                }
+                                catch (IgniteFutureTimeoutCheckedException ignored) {
+                                    U.warn(log, "Failed to wait for partition map exchange [" +
+                                        "topVer=" + exchFut.topologyVersion() +
+                                        ", node=" + cctx.localNodeId() + "]. " +
+                                        "Dumping pending objects that might be the cause: ");
+
+                                    if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+                                        try {
+                                            dumpDebugInfo(exchFut.topologyVersion());
+                                        }
+                                        catch (Exception e) {
+                                            U.error(log, "Failed to dump debug information: " + e, e);
+                                        }
+
+                                        if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+                                            U.dumpThreads(log);
+
+                                        dumpedObjects++;
+                                    }
+                                }
+                            }
+
+
+                            if (log.isDebugEnabled())
+                                log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+                                    this + ']');
+
+                            if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
+                                lastRefresh.compareAndSet(-1, U.currentTimeMillis());
+
+                            boolean changed = false;
+
+                            // Just pick first worker to do this, so we don't
+                            // invoke topology callback more than once for the
+                            // same event.
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                if (cacheCtx.isLocal())
+                                    continue;
+
+                                changed |= cacheCtx.topology().afterExchange(exchFut);
+                            }
+
+                            if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
+                                refreshPartitions();
+                        }
+                        else {
+                            if (log.isDebugEnabled())
+                                log.debug("Got dummy exchange (will reassign)");
+
+                            if (!dummyReassign) {
+                                timeout = 0; // Force refresh.
+
+                                continue;
+                            }
+                        }
+
+                        if (!exchFut.skipPreload()) {
+                            assignsMap = new HashMap<>();
+
+                            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                                long delay = cacheCtx.config().getRebalanceDelay();
+
+                                GridDhtPreloaderAssignments assigns = null;
+
+                                // Don't delay for dummy reassigns to avoid infinite recursion.
+                                if (delay == 0 || forcePreload)
+                                    assigns = cacheCtx.preloader().assign(exchFut);
+
+                                assignsMap.put(cacheCtx.cacheId(), assigns);
+                            }
+                        }
+                    }
+                    finally {
+                        // Must flip busy flag before assignments are given to demand workers.
+                        busy = false;
+                    }
+
+                    if (assignsMap != null) {
+                        int size = assignsMap.size();
+
+                        NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
+                        for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
+                            int cacheId = e.getKey();
+
+                            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                            int order = cacheCtx.config().getRebalanceOrder();
+
+                            if (orderMap.get(order) == null)
+                                orderMap.put(order, new ArrayList<Integer>(size));
+
+                            orderMap.get(order).add(cacheId);
+                        }
+
+                        Runnable r = null;
+
+                        List<String> rebList = new LinkedList<>();
+
+                        boolean assignsCancelled = false;
+
+                        for (Integer order : orderMap.descendingKeySet()) {
+                            for (Integer cacheId : orderMap.get(order)) {
+                                GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+                                GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+
+                                if (assigns != null)
+                                    assignsCancelled |= assigns.cancelled();
+
+                                // Cancels previous rebalance future (in case it's not done yet).
+                                // Sends previous rebalance stopped event (if necessary).
+                                // Creates new rebalance future.
+                                // Sends current rebalance started event (if necessary).
+                                // Finishes cache sync future (on empty assignments).
+                                Runnable cur = cacheCtx.preloader().addAssignments(assigns,
+                                    forcePreload,
+                                    cnt,
+                                    r,
+                                    exchFut.forcedRebalanceFuture());
+
+                                if (cur != null) {
+                                    rebList.add(U.maskName(cacheCtx.name()));
+
+                                    r = cur;
+                                }
+                            }
+                        }
+
+                        if (assignsCancelled) { // Pending exchange.
+                            U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else if (r != null) {
+                            Collections.reverse(rebList);
+
+                            U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+
+                            if (!hasPendingExchange()) {
+                                U.log(log, "Rebalancing started " +
+                                    "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+
+                                r.run(); // Starts rebalancing routine.
+                            }
+                            else
+                                U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+                                    "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                    ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                        }
+                        else
+                            U.log(log, "Skipping rebalancing (nothing scheduled) " +
+                                "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+                                ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+                    }
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    throw e;
+                }
+                catch (IgniteClientDisconnectedCheckedException ignored) {
+                    return;
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to wait for completion of partition map exchange " +
+                        "(preloading will not start): " + exchFut, e);
+                }
+            }
+        }
+    }
+
+    /**
      * Partition resend timeout object.
      */
     private class ResendTimeoutObject implements GridTimeoutObject {

http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
 import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
  * Future for exchanging partition maps.
  */
 public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
-    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+    implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
     /** */
     public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
         IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
     }
 
     /** {@inheritDoc} */
+    @Override public boolean isExchange() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
     @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
         return exchId.compareTo(fut.exchId);
     }


[3/6] ignite git commit: Done.

Posted by vo...@apache.org.
Done.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2f382be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2f382be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2f382be

Branch: refs/heads/ignite-4565-ddl
Commit: e2f382bed0dc0a1c906fdb06f35c9e93988e9fc8
Parents: 92524a4
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:38:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:38:37 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 77 ++++++++++++++++----
 .../processors/cache/GridCacheProcessor.java    |  9 +++
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 444b530..26bc27d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -369,7 +368,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.addFirstFuture(fut);
+        exchWorker.addFirstExchangeFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -704,7 +703,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     public void forceDummyExchange(boolean reassign,
         GridDhtPartitionsExchangeFuture exchFut) {
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
     }
 
@@ -716,7 +715,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
         GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
 
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
 
         return fut;
@@ -1192,7 +1191,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
         if (fut.onAdded()) {
-            exchWorker.addFuture(fut);
+            exchWorker.addExchangeFuture(fut);
 
             return true;
         }
@@ -1567,7 +1566,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private class ExchangeWorker extends GridWorker {
         /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+        private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
             new LinkedBlockingDeque<>();
 
         /** Busy flag used as performance optimization to stop current preloading. */
@@ -1585,14 +1584,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
          *
          * @param exchFut Exchange future.
          */
-        void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             futQ.addFirst(exchFut);
         }
 
         /**
          * @param exchFut Exchange future.
          */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
             if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
@@ -1603,10 +1602,44 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         /**
+         * Add custom exchange task.
+         *
+         * @param task Task.
+         */
+        void addCustomTask(CachePartitionExchangeWorkerTask task) {
+            assert task != null;
+
+            assert !task.isExchange();
+
+            futQ.offer(task);
+        }
+
+        /**
+         * Process custom exchange task.
+         *
+         * @param task Task.
+         */
+        void processCustomTask(CachePartitionExchangeWorkerTask task) {
+            try {
+                cctx.cache().processCustomExchangeTask(task);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to process custom exchange task: " + task, e);
+            }
+        }
+
+        /**
          * @return Whether pending exchange future exists.
          */
         boolean hasPendingExchange() {
-            return !futQ.isEmpty();
+            if (!futQ.isEmpty()) {
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task.isExchange())
+                        return true;
+                }
+            }
+
+            return false;
         }
 
         /**
@@ -1615,8 +1648,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         void dumpExchangeDebugInfo() {
             U.warn(log, "Pending exchange futures:");
 
-            for (GridDhtPartitionsExchangeFuture fut : futQ)
-                U.warn(log, ">>> " + fut);
+            for (CachePartitionExchangeWorkerTask task: futQ) {
+                if (task.isExchange())
+                    U.warn(log, ">>> " + task);
+            }
         }
 
         /** {@inheritDoc} */
@@ -1626,7 +1661,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int cnt = 0;
 
             while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
+                CachePartitionExchangeWorkerTask task = null;
 
                 cnt++;
 
@@ -1660,10 +1695,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (isCancelled())
                         Thread.currentThread().interrupt();
 
-                    exchFut = futQ.poll(timeout, MILLISECONDS);
+                    task = futQ.poll(timeout, MILLISECONDS);
+
+                    if (task == null)
+                        continue;
+
+                    if (!task.isExchange()) {
+                        processCustomTask(task);
+
+                        continue;
+                    }
+
+                    assert task instanceof GridDhtPartitionsExchangeFuture;
 
-                    if (exchFut == null)
-                        continue; // Main while loop.
+                    GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
 
                     busy = true;
 
@@ -1854,7 +1899,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to wait for completion of partition map exchange " +
-                        "(preloading will not start): " + exchFut, e);
+                        "(preloading will not start): " + task, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..459cf3a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Process custom exchange task.
+     *
+     * @param task Task.
+     */
+    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+        // No-op.
+    }
+
+    /**
      * @param c Ignite configuration.
      * @param cc Configuration to validate.
      * @param cacheType Cache type.