You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:03 UTC
[3/6] ignite git commit: Done.
Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2f382be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2f382be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2f382be
Branch: refs/heads/ignite-4565-ddl
Commit: e2f382bed0dc0a1c906fdb06f35c9e93988e9fc8
Parents: 92524a4
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:38:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:38:37 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 77 ++++++++++++++++----
.../processors/cache/GridCacheProcessor.java | 9 +++
2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 444b530..26bc27d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -369,7 +368,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- exchWorker.addFirstFuture(fut);
+ exchWorker.addFirstExchangeFuture(fut);
if (!cctx.kernalContext().clientNode()) {
for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -704,7 +703,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
public void forceDummyExchange(boolean reassign,
GridDhtPartitionsExchangeFuture exchFut) {
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
}
@@ -716,7 +715,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
return fut;
@@ -1192,7 +1191,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
if (fut.onAdded()) {
- exchWorker.addFuture(fut);
+ exchWorker.addExchangeFuture(fut);
return true;
}
@@ -1567,7 +1566,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private class ExchangeWorker extends GridWorker {
/** Future queue. */
- private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+ private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
new LinkedBlockingDeque<>();
/** Busy flag used as performance optimization to stop current preloading. */
@@ -1585,14 +1584,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*
* @param exchFut Exchange future.
*/
- void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
futQ.addFirst(exchFut);
}
/**
* @param exchFut Exchange future.
*/
- void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut != null;
if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
@@ -1603,10 +1602,44 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Add custom exchange task.
+ *
+ * @param task Task.
+ */
+ void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ assert task != null;
+
+ assert !task.isExchange();
+
+ futQ.offer(task);
+ }
+
+ /**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ void processCustomTask(CachePartitionExchangeWorkerTask task) {
+ try {
+ cctx.cache().processCustomExchangeTask(task);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to process custom exchange task: " + task, e);
+ }
+ }
+
+ /**
* @return Whether pending exchange future exists.
*/
boolean hasPendingExchange() {
- return !futQ.isEmpty();
+ if (!futQ.isEmpty()) {
+ for (CachePartitionExchangeWorkerTask task : futQ) {
+ if (task.isExchange())
+ return true;
+ }
+ }
+
+ return false;
}
/**
@@ -1615,8 +1648,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
void dumpExchangeDebugInfo() {
U.warn(log, "Pending exchange futures:");
- for (GridDhtPartitionsExchangeFuture fut : futQ)
- U.warn(log, ">>> " + fut);
+ for (CachePartitionExchangeWorkerTask task: futQ) {
+ if (task.isExchange())
+ U.warn(log, ">>> " + task);
+ }
}
/** {@inheritDoc} */
@@ -1626,7 +1661,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int cnt = 0;
while (!isCancelled()) {
- GridDhtPartitionsExchangeFuture exchFut = null;
+ CachePartitionExchangeWorkerTask task = null;
cnt++;
@@ -1660,10 +1695,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (isCancelled())
Thread.currentThread().interrupt();
- exchFut = futQ.poll(timeout, MILLISECONDS);
+ task = futQ.poll(timeout, MILLISECONDS);
+
+ if (task == null)
+ continue;
+
+ if (!task.isExchange()) {
+ processCustomTask(task);
+
+ continue;
+ }
+
+ assert task instanceof GridDhtPartitionsExchangeFuture;
- if (exchFut == null)
- continue; // Main while loop.
+ GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
busy = true;
@@ -1854,7 +1899,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
- "(preloading will not start): " + exchFut, e);
+ "(preloading will not start): " + task, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..459cf3a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+ // No-op.
+ }
+
+ /**
* @param c Ignite configuration.
* @param cc Configuration to validate.
* @param cacheType Cache type.