You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:03 UTC

[3/6] ignite git commit: Done.

Done.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2f382be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2f382be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2f382be

Branch: refs/heads/ignite-4565-ddl
Commit: e2f382bed0dc0a1c906fdb06f35c9e93988e9fc8
Parents: 92524a4
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:38:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:38:37 2017 +0300

----------------------------------------------------------------------
 .../GridCachePartitionExchangeManager.java      | 77 ++++++++++++++++----
 .../processors/cache/GridCacheProcessor.java    |  9 +++
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 444b530..26bc27d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.LinkedBlockingDeque;
@@ -369,7 +368,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         if (reconnect)
             reconnectExchangeFut = new GridFutureAdapter<>();
 
-        exchWorker.addFirstFuture(fut);
+        exchWorker.addFirstExchangeFuture(fut);
 
         if (!cctx.kernalContext().clientNode()) {
             for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -704,7 +703,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     public void forceDummyExchange(boolean reassign,
         GridDhtPartitionsExchangeFuture exchFut) {
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
     }
 
@@ -716,7 +715,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
     public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
         GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
 
-        exchWorker.addFuture(
+        exchWorker.addExchangeFuture(
             new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
 
         return fut;
@@ -1192,7 +1191,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
         if (fut.onAdded()) {
-            exchWorker.addFuture(fut);
+            exchWorker.addExchangeFuture(fut);
 
             return true;
         }
@@ -1567,7 +1566,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
      */
     private class ExchangeWorker extends GridWorker {
         /** Future queue. */
-        private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+        private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
             new LinkedBlockingDeque<>();
 
         /** Busy flag used as performance optimization to stop current preloading. */
@@ -1585,14 +1584,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
          *
          * @param exchFut Exchange future.
          */
-        void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             futQ.addFirst(exchFut);
         }
 
         /**
          * @param exchFut Exchange future.
          */
-        void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+        void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
             assert exchFut != null;
 
             if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
@@ -1603,10 +1602,44 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         }
 
         /**
+         * Add custom exchange task.
+         *
+         * @param task Task.
+         */
+        void addCustomTask(CachePartitionExchangeWorkerTask task) {
+            assert task != null;
+
+            assert !task.isExchange();
+
+            futQ.offer(task);
+        }
+
+        /**
+         * Process custom exchange task.
+         *
+         * @param task Task.
+         */
+        void processCustomTask(CachePartitionExchangeWorkerTask task) {
+            try {
+                cctx.cache().processCustomExchangeTask(task);
+            }
+            catch (Exception e) {
+                U.warn(log, "Failed to process custom exchange task: " + task, e);
+            }
+        }
+
+        /**
          * @return Whether pending exchange future exists.
          */
         boolean hasPendingExchange() {
-            return !futQ.isEmpty();
+            if (!futQ.isEmpty()) {
+                for (CachePartitionExchangeWorkerTask task : futQ) {
+                    if (task.isExchange())
+                        return true;
+                }
+            }
+
+            return false;
         }
 
         /**
@@ -1615,8 +1648,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
         void dumpExchangeDebugInfo() {
             U.warn(log, "Pending exchange futures:");
 
-            for (GridDhtPartitionsExchangeFuture fut : futQ)
-                U.warn(log, ">>> " + fut);
+            for (CachePartitionExchangeWorkerTask task: futQ) {
+                if (task.isExchange())
+                    U.warn(log, ">>> " + task);
+            }
         }
 
         /** {@inheritDoc} */
@@ -1626,7 +1661,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
             int cnt = 0;
 
             while (!isCancelled()) {
-                GridDhtPartitionsExchangeFuture exchFut = null;
+                CachePartitionExchangeWorkerTask task = null;
 
                 cnt++;
 
@@ -1660,10 +1695,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                     if (isCancelled())
                         Thread.currentThread().interrupt();
 
-                    exchFut = futQ.poll(timeout, MILLISECONDS);
+                    task = futQ.poll(timeout, MILLISECONDS);
+
+                    if (task == null)
+                        continue;
+
+                    if (!task.isExchange()) {
+                        processCustomTask(task);
+
+                        continue;
+                    }
+
+                    assert task instanceof GridDhtPartitionsExchangeFuture;
 
-                    if (exchFut == null)
-                        continue; // Main while loop.
+                    GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
 
                     busy = true;
 
@@ -1854,7 +1899,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
                 }
                 catch (IgniteCheckedException e) {
                     U.error(log, "Failed to wait for completion of partition map exchange " +
-                        "(preloading will not start): " + exchFut, e);
+                        "(preloading will not start): " + task, e);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..459cf3a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Process custom exchange task.
+     *
+     * @param task Task.
+     */
+    public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+        // No-op.
+    }
+
+    /**
      * @param c Ignite configuration.
      * @param cc Configuration to validate.
      * @param cacheType Cache type.