You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/03/17 14:51:01 UTC
[1/6] ignite git commit: Moved exchange worker to separate class.
Repository: ignite
Updated Branches:
refs/heads/ignite-4565-ddl 70a11912b -> 4538526d8
Moved exchange worker to separate class.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/23f67a5a
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/23f67a5a
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/23f67a5a
Branch: refs/heads/ignite-4565-ddl
Commit: 23f67a5a1b8e6c7429d110df858c8bdd17a913d7
Parents: 9020d12
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 15:44:57 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 15:44:57 2017 +0300
----------------------------------------------------------------------
.../cache/CachePartitionExchangeWorker.java | 355 +++++++++++++++++++
.../GridCachePartitionExchangeManager.java | 329 +----------------
2 files changed, 368 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
new file mode 100644
index 0000000..98a9cc0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+
+/**
+ * Exchange future thread. All exchanges happen only by one thread and next
+ * exchange will not start until previous one completes.
+ */
+public class CachePartitionExchangeWorker<K, V> extends GridWorker {
+ /** Cache context. */
+ private final GridCacheSharedContext<K, V> cctx;
+
+ /** Exchange manager. */
+ private final GridCachePartitionExchangeManager<K, V> exchMgr;
+
+ /** Future queue. */
+ private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+ new LinkedBlockingDeque<>();
+
+ /** Busy flag used as performance optimization to stop current preloading. */
+ private volatile boolean busy;
+
+ /**
+ * Constructor.
+ *
+ * @param exchMgr Exchange manager.
+ * @param log Logger.
+ */
+ public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) {
+ super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log);
+
+ this.cctx = exchMgr.context();
+
+ this.exchMgr = exchMgr;
+ }
+
+ /**
+ * Add first exchange future.
+ *
+ * @param fut Future.
+ */
+ public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
+ futQ.addFirst(fut);
+ }
+
+ /**
+ * @param exchFut Exchange future.
+ */
+ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ assert exchFut != null;
+
+ if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
+ futQ.offer(exchFut);
+
+ if (log.isDebugEnabled())
+ log.debug("Added exchange future to exchange worker: " + exchFut);
+ }
+
+ /**
+ * Dump debug info.
+ */
+ public void dumpFuturesDebugInfo() {
+ U.warn(log, "Pending exchange futures:");
+
+ for (GridDhtPartitionsExchangeFuture fut : futQ)
+ U.warn(log, ">>> " + fut);
+ }
+
+ /**
+ * @return {@code True} iif exchange queue is empty.
+ */
+ public boolean exchangeQueueIsEmpty() {
+ return futQ.isEmpty();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ long timeout = cctx.gridConfig().getNetworkTimeout();
+
+ int cnt = 0;
+
+ while (!isCancelled()) {
+ GridDhtPartitionsExchangeFuture exchFut = null;
+
+ cnt++;
+
+ try {
+ boolean preloadFinished = true;
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
+
+ if (!preloadFinished)
+ break;
+ }
+
+ // If not first preloading and no more topology events present.
+ if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished)
+ timeout = cctx.gridConfig().getNetworkTimeout();
+
+ // After workers line up and before preloading starts we initialize all futures.
+ if (log.isDebugEnabled()) {
+ Collection<IgniteInternalFuture> unfinished = new HashSet<>();
+
+ for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) {
+ if (!fut.isDone())
+ unfinished.add(fut);
+ }
+
+ log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
+ }
+
+ // Take next exchange future.
+ if (isCancelled())
+ Thread.currentThread().interrupt();
+
+ exchFut = futQ.poll(timeout, MILLISECONDS);
+
+ if (exchFut == null)
+ continue; // Main while loop.
+
+ busy = true;
+
+ Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
+
+ boolean dummyReassign = exchFut.dummyReassign();
+ boolean forcePreload = exchFut.forcePreload();
+
+ try {
+ if (isCancelled())
+ break;
+
+ if (!exchFut.dummy() && !exchFut.forcePreload()) {
+ exchMgr.lastTopologyFuture(exchFut);
+
+ exchFut.init();
+
+ int dumpedObjects = 0;
+
+ while (true) {
+ try {
+ exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Failed to wait for partition map exchange [" +
+ "topVer=" + exchFut.topologyVersion() +
+ ", node=" + cctx.localNodeId() + "]. " +
+ "Dumping pending objects that might be the cause: ");
+
+ if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+ try {
+ exchMgr.dumpDebugInfo(exchFut.topologyVersion());
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to dump debug information: " + e, e);
+ }
+
+ if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+ U.dumpThreads(log);
+
+ dumpedObjects++;
+ }
+ }
+ }
+
+
+ if (log.isDebugEnabled())
+ log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+ this + ']');
+
+ boolean changed = false;
+
+ // Just pick first worker to do this, so we don't
+ // invoke topology callback more than once for the
+ // same event.
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ changed |= cacheCtx.topology().afterExchange(exchFut);
+ }
+
+ if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty())
+ exchMgr.refreshPartitions();
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Got dummy exchange (will reassign)");
+
+ if (!dummyReassign) {
+ timeout = 0; // Force refresh.
+
+ continue;
+ }
+ }
+
+ if (!exchFut.skipPreload()) {
+ assignsMap = new HashMap<>();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ long delay = cacheCtx.config().getRebalanceDelay();
+
+ GridDhtPreloaderAssignments assigns = null;
+
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ if (delay == 0 || forcePreload)
+ assigns = cacheCtx.preloader().assign(exchFut);
+
+ assignsMap.put(cacheCtx.cacheId(), assigns);
+ }
+ }
+ }
+ finally {
+ // Must flip busy flag before assignments are given to demand workers.
+ busy = false;
+ }
+
+ if (assignsMap != null) {
+ int size = assignsMap.size();
+
+ NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
+ for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
+ int cacheId = e.getKey();
+
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ int order = cacheCtx.config().getRebalanceOrder();
+
+ if (orderMap.get(order) == null)
+ orderMap.put(order, new ArrayList<Integer>(size));
+
+ orderMap.get(order).add(cacheId);
+ }
+
+ Runnable r = null;
+
+ List<String> rebList = new LinkedList<>();
+
+ boolean assignsCancelled = false;
+
+ for (Integer order : orderMap.descendingKeySet()) {
+ for (Integer cacheId : orderMap.get(order)) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+
+ if (assigns != null)
+ assignsCancelled |= assigns.cancelled();
+
+ // Cancels previous rebalance future (in case it's not done yet).
+ // Sends previous rebalance stopped event (if necessary).
+ // Creates new rebalance future.
+ // Sends current rebalance started event (if necessary).
+ // Finishes cache sync future (on empty assignments).
+ Runnable cur = cacheCtx.preloader().addAssignments(assigns,
+ forcePreload,
+ cnt,
+ r,
+ exchFut.forcedRebalanceFuture());
+
+ if (cur != null) {
+ rebList.add(U.maskName(cacheCtx.name()));
+
+ r = cur;
+ }
+ }
+ }
+
+ if (assignsCancelled) { // Pending exchange.
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ else if (r != null) {
+ Collections.reverse(rebList);
+
+ U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+
+ if (exchangeQueueIsEmpty()) {
+ U.log(log, "Rebalancing started " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+
+ r.run(); // Starts rebalancing routine.
+ }
+ else
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ else
+ U.log(log, "Skipping rebalancing (nothing scheduled) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
+ }
+ catch (IgniteClientDisconnectedCheckedException ignored) {
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to wait for completion of partition map exchange " +
+ "(preloading will not start): " + exchFut, e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/23f67a5a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f9222bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,24 +21,15 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -73,7 +64,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -90,7 +80,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
@@ -98,7 +87,6 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -126,12 +114,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
- /** Last partition refresh. */
- private final AtomicLong lastRefresh = new AtomicLong(-1);
-
/** */
@GridToStringInclude
- private ExchangeWorker exchWorker;
+ private CachePartitionExchangeWorker exchWorker;
/** */
@GridToStringExclude
@@ -297,7 +282,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Override protected void start0() throws IgniteCheckedException {
super.start0();
- exchWorker = new ExchangeWorker();
+ exchWorker = new CachePartitionExchangeWorker<>(this, log);
cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
@@ -369,7 +354,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- exchWorker.futQ.addFirst(fut);
+ exchWorker.addFirstFuture(fut);
if (!cctx.kernalContext().clientNode()) {
for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -597,6 +582,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * @param lastInitializedFut Last completed topology future.
+ */
+ public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) {
+ this.lastInitializedFut = lastInitializedFut;
+ }
+
+ /**
* @param ver Topology version.
* @return Future or {@code null} is future is already completed.
*/
@@ -684,7 +676,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return {@code True} if pending future queue is empty.
*/
public boolean hasPendingExchange() {
- return !exchWorker.futQ.isEmpty();
+ return !exchWorker.exchangeQueueIsEmpty();
}
/**
@@ -739,7 +731,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* Partition refresh callback.
*/
- private void refreshPartitions() {
+ public void refreshPartitions() {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
@@ -1345,10 +1337,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.warn(log, "Last exchange future: " + lastInitializedFut);
- U.warn(log, "Pending exchange futures:");
-
- for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
- U.warn(log, ">>> " + fut);
+ exchWorker.dumpFuturesDebugInfo();
if (!readyFuts.isEmpty()) {
U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1536,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
* @param node Target node.
* @return {@code True} if can use compression for partition map messages.
*/
@@ -1587,276 +1554,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * Exchange future thread. All exchanges happen only by one thread and next
- * exchange will not start until previous one completes.
- */
- private class ExchangeWorker extends GridWorker {
- /** Future queue. */
- private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
- new LinkedBlockingDeque<>();
-
- /** Busy flag used as performance optimization to stop current preloading. */
- private volatile boolean busy;
-
- /**
- *
- */
- private ExchangeWorker() {
- super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
- }
-
- /**
- * @param exchFut Exchange future.
- */
- void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
- assert exchFut != null;
-
- if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
- futQ.offer(exchFut);
-
- if (log.isDebugEnabled())
- log.debug("Added exchange future to exchange worker: " + exchFut);
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- long timeout = cctx.gridConfig().getNetworkTimeout();
-
- int cnt = 0;
-
- while (!isCancelled()) {
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- cnt++;
-
- try {
- boolean preloadFinished = true;
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
-
- if (!preloadFinished)
- break;
- }
-
- // If not first preloading and no more topology events present.
- if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
- timeout = cctx.gridConfig().getNetworkTimeout();
-
- // After workers line up and before preloading starts we initialize all futures.
- if (log.isDebugEnabled()) {
- Collection<IgniteInternalFuture> unfinished = new HashSet<>();
-
- for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
- if (!fut.isDone())
- unfinished.add(fut);
- }
-
- log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
- }
-
- // Take next exchange future.
- exchFut = poll(futQ, timeout, this);
-
- if (exchFut == null)
- continue; // Main while loop.
-
- busy = true;
-
- Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
- boolean dummyReassign = exchFut.dummyReassign();
- boolean forcePreload = exchFut.forcePreload();
-
- try {
- if (isCancelled())
- break;
-
- if (!exchFut.dummy() && !exchFut.forcePreload()) {
- lastInitializedFut = exchFut;
-
- exchFut.init();
-
- int dumpedObjects = 0;
-
- while (true) {
- try {
- exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- U.warn(log, "Failed to wait for partition map exchange [" +
- "topVer=" + exchFut.topologyVersion() +
- ", node=" + cctx.localNodeId() + "]. " +
- "Dumping pending objects that might be the cause: ");
-
- if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
- try {
- dumpDebugInfo(exchFut.topologyVersion());
- }
- catch (Exception e) {
- U.error(log, "Failed to dump debug information: " + e, e);
- }
-
- if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
- U.dumpThreads(log);
-
- dumpedObjects++;
- }
- }
- }
-
-
- if (log.isDebugEnabled())
- log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
- this + ']');
-
- if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
- lastRefresh.compareAndSet(-1, U.currentTimeMillis());
-
- boolean changed = false;
-
- // Just pick first worker to do this, so we don't
- // invoke topology callback more than once for the
- // same event.
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
-
- changed |= cacheCtx.topology().afterExchange(exchFut);
- }
-
- if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
- refreshPartitions();
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Got dummy exchange (will reassign)");
-
- if (!dummyReassign) {
- timeout = 0; // Force refresh.
-
- continue;
- }
- }
-
- if (!exchFut.skipPreload()) {
- assignsMap = new HashMap<>();
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- long delay = cacheCtx.config().getRebalanceDelay();
-
- GridDhtPreloaderAssignments assigns = null;
-
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if (delay == 0 || forcePreload)
- assigns = cacheCtx.preloader().assign(exchFut);
-
- assignsMap.put(cacheCtx.cacheId(), assigns);
- }
- }
- }
- finally {
- // Must flip busy flag before assignments are given to demand workers.
- busy = false;
- }
-
- if (assignsMap != null) {
- int size = assignsMap.size();
-
- NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
-
- for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
- int cacheId = e.getKey();
-
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- int order = cacheCtx.config().getRebalanceOrder();
-
- if (orderMap.get(order) == null)
- orderMap.put(order, new ArrayList<Integer>(size));
-
- orderMap.get(order).add(cacheId);
- }
-
- Runnable r = null;
-
- List<String> rebList = new LinkedList<>();
-
- boolean assignsCancelled = false;
-
- for (Integer order : orderMap.descendingKeySet()) {
- for (Integer cacheId : orderMap.get(order)) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
-
- if (assigns != null)
- assignsCancelled |= assigns.cancelled();
-
- // Cancels previous rebalance future (in case it's not done yet).
- // Sends previous rebalance stopped event (if necessary).
- // Creates new rebalance future.
- // Sends current rebalance started event (if necessary).
- // Finishes cache sync future (on empty assignments).
- Runnable cur = cacheCtx.preloader().addAssignments(assigns,
- forcePreload,
- cnt,
- r,
- exchFut.forcedRebalanceFuture());
-
- if (cur != null) {
- rebList.add(U.maskName(cacheCtx.name()));
-
- r = cur;
- }
- }
- }
-
- if (assignsCancelled) { // Pending exchange.
- U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- else if (r != null) {
- Collections.reverse(rebList);
-
- U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
-
- if (futQ.isEmpty()) {
- U.log(log, "Rebalancing started " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-
- r.run(); // Starts rebalancing routine.
- }
- else
- U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- else
- U.log(log, "Skipping rebalancing (nothing scheduled) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteClientDisconnectedCheckedException ignored) {
- return;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to wait for completion of partition map exchange " +
- "(preloading will not start): " + exchFut, e);
- }
- }
- }
- }
-
- /**
* Partition resend timeout object.
*/
private class ResendTimeoutObject implements GridTimeoutObject {
[6/6] ignite git commit: Merge branch 'ignite-4834' into
ignite-4565-ddl
Posted by vo...@apache.org.
Merge branch 'ignite-4834' into ignite-4565-ddl
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4538526d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4538526d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4538526d
Branch: refs/heads/ignite-4565-ddl
Commit: 4538526d87d7b3e8c991470635b5911dba4cfa74
Parents: 70a1191 deeee8c
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:50:43 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:50:43 2017 +0300
----------------------------------------------------------------------
.../cache/CachePartitionExchangeWorkerTask.java | 29 ++++
.../GridCachePartitionExchangeManager.java | 173 +++++++++++++------
.../processors/cache/GridCacheProcessor.java | 19 ++
.../GridDhtPartitionsExchangeFuture.java | 8 +-
4 files changed, 177 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/4538526d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
[5/6] ignite git commit: Done.
Posted by vo...@apache.org.
Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/deeee8cb
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/deeee8cb
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/deeee8cb
Branch: refs/heads/ignite-4565-ddl
Commit: deeee8cb5b05137303dbb06ba4d0180426f4dd43
Parents: 19381da
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 17:02:15 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 17:02:15 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 25 ++++++++++++++------
.../processors/cache/GridCacheProcessor.java | 10 ++++++++
2 files changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 1ce8cfe..b4604e8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -54,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -221,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
- if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+ if (customMsg instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
@@ -256,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
- else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
- CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+ else if (customMsg instanceof CacheAffinityChangeMessage) {
+ CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
@@ -266,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
- else
- exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ else {
+ exchangeFuture(msg.exchangeId(), null, null, null, null)
+ .onAffinityChangeMessage(evt.eventNode(), msg);
+ }
+ }
+ else {
+ // Process event as custom discovery task if needed.
+ CachePartitionExchangeWorkerTask task =
+ cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+ if (task != null)
+ exchWorker.addCustomTask(task);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/deeee8cb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 459cf3a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,16 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Create exchange worker task for custom discovery message.
+ *
+ * @param msg Custom discovery message.
+ * @return Task or {@code null} if message doesn't require any special processing.
+ */
+ public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+ return null;
+ }
+
+ /**
* Process custom exchange task.
*
* @param task Task.
[4/6] ignite git commit: Done.
Posted by vo...@apache.org.
Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19381da3
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19381da3
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19381da3
Branch: refs/heads/ignite-4565-ddl
Commit: 19381da3961cbd2fee4f36c2b08638d507b0741c
Parents: e2f382b
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:41:35 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:41:35 2017 +0300
----------------------------------------------------------------------
.../cache/GridCachePartitionExchangeManager.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/19381da3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 26bc27d..1ce8cfe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -687,6 +687,17 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Add custom task.
+ *
+ * @param task Task.
+ */
+ public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ assert !task.isExchange();
+
+ exchWorker.addCustomTask(task);
+ }
+
+ /**
* @param evt Discovery event.
* @return Affinity topology version.
*/
@@ -818,8 +829,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Nullable GridCacheVersion lastVer,
boolean compress) {
GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
- lastVer,
- exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+ lastVer,
+ exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
boolean useOldApi = false;
[2/6] ignite git commit: WIP.
Posted by vo...@apache.org.
WIP.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92524a44
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92524a44
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92524a44
Branch: refs/heads/ignite-4565-ddl
Commit: 92524a44eb8ccfb43901557f23368fc325e46c59
Parents: 23f67a5
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:04:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:04:37 2017 +0300
----------------------------------------------------------------------
.../cache/CachePartitionExchangeWorker.java | 355 -------------------
.../cache/CachePartitionExchangeWorkerTask.java | 29 ++
.../GridCachePartitionExchangeManager.java | 335 ++++++++++++++++-
.../GridDhtPartitionsExchangeFuture.java | 8 +-
4 files changed, 357 insertions(+), 370 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
deleted file mode 100644
index 98a9cc0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorker.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.IgniteSystemProperties;
-import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
-import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
-import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
-
-/**
- * Exchange future thread. All exchanges happen only by one thread and next
- * exchange will not start until previous one completes.
- */
-public class CachePartitionExchangeWorker<K, V> extends GridWorker {
- /** Cache context. */
- private final GridCacheSharedContext<K, V> cctx;
-
- /** Exchange manager. */
- private final GridCachePartitionExchangeManager<K, V> exchMgr;
-
- /** Future queue. */
- private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
- new LinkedBlockingDeque<>();
-
- /** Busy flag used as performance optimization to stop current preloading. */
- private volatile boolean busy;
-
- /**
- * Constructor.
- *
- * @param exchMgr Exchange manager.
- * @param log Logger.
- */
- public CachePartitionExchangeWorker(GridCachePartitionExchangeManager<K, V> exchMgr, IgniteLogger log) {
- super(exchMgr.context().igniteInstanceName(), "partition-exchanger", log);
-
- this.cctx = exchMgr.context();
-
- this.exchMgr = exchMgr;
- }
-
- /**
- * Add first exchange future.
- *
- * @param fut Future.
- */
- public void addFirstFuture(GridDhtPartitionsExchangeFuture fut) {
- futQ.addFirst(fut);
- }
-
- /**
- * @param exchFut Exchange future.
- */
- void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
- assert exchFut != null;
-
- if (!exchFut.dummy() || (exchangeQueueIsEmpty() && !busy))
- futQ.offer(exchFut);
-
- if (log.isDebugEnabled())
- log.debug("Added exchange future to exchange worker: " + exchFut);
- }
-
- /**
- * Dump debug info.
- */
- public void dumpFuturesDebugInfo() {
- U.warn(log, "Pending exchange futures:");
-
- for (GridDhtPartitionsExchangeFuture fut : futQ)
- U.warn(log, ">>> " + fut);
- }
-
- /**
- * @return {@code True} iif exchange queue is empty.
- */
- public boolean exchangeQueueIsEmpty() {
- return futQ.isEmpty();
- }
-
- /** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- long timeout = cctx.gridConfig().getNetworkTimeout();
-
- int cnt = 0;
-
- while (!isCancelled()) {
- GridDhtPartitionsExchangeFuture exchFut = null;
-
- cnt++;
-
- try {
- boolean preloadFinished = true;
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
-
- if (!preloadFinished)
- break;
- }
-
- // If not first preloading and no more topology events present.
- if (!cctx.kernalContext().clientNode() && exchangeQueueIsEmpty() && preloadFinished)
- timeout = cctx.gridConfig().getNetworkTimeout();
-
- // After workers line up and before preloading starts we initialize all futures.
- if (log.isDebugEnabled()) {
- Collection<IgniteInternalFuture> unfinished = new HashSet<>();
-
- for (GridDhtPartitionsExchangeFuture fut : exchMgr.exchangeFutures()) {
- if (!fut.isDone())
- unfinished.add(fut);
- }
-
- log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
- }
-
- // Take next exchange future.
- if (isCancelled())
- Thread.currentThread().interrupt();
-
- exchFut = futQ.poll(timeout, MILLISECONDS);
-
- if (exchFut == null)
- continue; // Main while loop.
-
- busy = true;
-
- Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
-
- boolean dummyReassign = exchFut.dummyReassign();
- boolean forcePreload = exchFut.forcePreload();
-
- try {
- if (isCancelled())
- break;
-
- if (!exchFut.dummy() && !exchFut.forcePreload()) {
- exchMgr.lastTopologyFuture(exchFut);
-
- exchFut.init();
-
- int dumpedObjects = 0;
-
- while (true) {
- try {
- exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
-
- break;
- }
- catch (IgniteFutureTimeoutCheckedException ignored) {
- U.warn(log, "Failed to wait for partition map exchange [" +
- "topVer=" + exchFut.topologyVersion() +
- ", node=" + cctx.localNodeId() + "]. " +
- "Dumping pending objects that might be the cause: ");
-
- if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
- try {
- exchMgr.dumpDebugInfo(exchFut.topologyVersion());
- }
- catch (Exception e) {
- U.error(log, "Failed to dump debug information: " + e, e);
- }
-
- if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
- U.dumpThreads(log);
-
- dumpedObjects++;
- }
- }
- }
-
-
- if (log.isDebugEnabled())
- log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
- this + ']');
-
- boolean changed = false;
-
- // Just pick first worker to do this, so we don't
- // invoke topology callback more than once for the
- // same event.
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
-
- changed |= cacheCtx.topology().afterExchange(exchFut);
- }
-
- if (!cctx.kernalContext().clientNode() && changed && exchangeQueueIsEmpty())
- exchMgr.refreshPartitions();
- }
- else {
- if (log.isDebugEnabled())
- log.debug("Got dummy exchange (will reassign)");
-
- if (!dummyReassign) {
- timeout = 0; // Force refresh.
-
- continue;
- }
- }
-
- if (!exchFut.skipPreload()) {
- assignsMap = new HashMap<>();
-
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- long delay = cacheCtx.config().getRebalanceDelay();
-
- GridDhtPreloaderAssignments assigns = null;
-
- // Don't delay for dummy reassigns to avoid infinite recursion.
- if (delay == 0 || forcePreload)
- assigns = cacheCtx.preloader().assign(exchFut);
-
- assignsMap.put(cacheCtx.cacheId(), assigns);
- }
- }
- }
- finally {
- // Must flip busy flag before assignments are given to demand workers.
- busy = false;
- }
-
- if (assignsMap != null) {
- int size = assignsMap.size();
-
- NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
-
- for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
- int cacheId = e.getKey();
-
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- int order = cacheCtx.config().getRebalanceOrder();
-
- if (orderMap.get(order) == null)
- orderMap.put(order, new ArrayList<Integer>(size));
-
- orderMap.get(order).add(cacheId);
- }
-
- Runnable r = null;
-
- List<String> rebList = new LinkedList<>();
-
- boolean assignsCancelled = false;
-
- for (Integer order : orderMap.descendingKeySet()) {
- for (Integer cacheId : orderMap.get(order)) {
- GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
- GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
-
- if (assigns != null)
- assignsCancelled |= assigns.cancelled();
-
- // Cancels previous rebalance future (in case it's not done yet).
- // Sends previous rebalance stopped event (if necessary).
- // Creates new rebalance future.
- // Sends current rebalance started event (if necessary).
- // Finishes cache sync future (on empty assignments).
- Runnable cur = cacheCtx.preloader().addAssignments(assigns,
- forcePreload,
- cnt,
- r,
- exchFut.forcedRebalanceFuture());
-
- if (cur != null) {
- rebList.add(U.maskName(cacheCtx.name()));
-
- r = cur;
- }
- }
- }
-
- if (assignsCancelled) { // Pending exchange.
- U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- else if (r != null) {
- Collections.reverse(rebList);
-
- U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
-
- if (exchangeQueueIsEmpty()) {
- U.log(log, "Rebalancing started " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
-
- r.run(); // Starts rebalancing routine.
- }
- else
- U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- else
- U.log(log, "Skipping rebalancing (nothing scheduled) " +
- "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
- ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
- }
- }
- catch (IgniteInterruptedCheckedException e) {
- throw e;
- }
- catch (IgniteClientDisconnectedCheckedException ignored) {
- return;
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to wait for completion of partition map exchange " +
- "(preloading will not start): " + exchFut, e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Cache partition exchange worker task marker interface.
+ */
+public interface CachePartitionExchangeWorkerTask {
+ /**
+ * @return {@code True) if task denotes standard exchange task, {@code false} if this is a custom task which
+ * must be executed from within exchange thread.
+ */
+ boolean isExchange();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index f9222bc..444b530 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -21,15 +21,24 @@ import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleRequest;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -80,6 +90,7 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteUuid;
@@ -87,6 +98,7 @@ import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getLong;
@@ -114,9 +126,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/** */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
+ /** Last partition refresh. */
+ private final AtomicLong lastRefresh = new AtomicLong(-1);
+
/** */
@GridToStringInclude
- private CachePartitionExchangeWorker exchWorker;
+ private ExchangeWorker exchWorker;
/** */
@GridToStringExclude
@@ -282,7 +297,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Override protected void start0() throws IgniteCheckedException {
super.start0();
- exchWorker = new CachePartitionExchangeWorker<>(this, log);
+ exchWorker = new ExchangeWorker();
cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
EVT_DISCOVERY_CUSTOM_EVT);
@@ -582,13 +597,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param lastInitializedFut Last completed topology future.
- */
- public void lastTopologyFuture(GridDhtPartitionsExchangeFuture lastInitializedFut) {
- this.lastInitializedFut = lastInitializedFut;
- }
-
- /**
* @param ver Topology version.
* @return Future or {@code null} is future is already completed.
*/
@@ -676,7 +684,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return {@code True} if pending future queue is empty.
*/
public boolean hasPendingExchange() {
- return !exchWorker.exchangeQueueIsEmpty();
+ return exchWorker.hasPendingExchange();
}
/**
@@ -731,7 +739,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
/**
* Partition refresh callback.
*/
- public void refreshPartitions() {
+ private void refreshPartitions() {
ClusterNode oldest = cctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
if (oldest == null) {
@@ -811,8 +819,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
@Nullable GridCacheVersion lastVer,
boolean compress) {
GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId,
- lastVer,
- exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
+ lastVer,
+ exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE);
boolean useOldApi = false;
@@ -1337,7 +1345,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.warn(log, "Last exchange future: " + lastInitializedFut);
- exchWorker.dumpFuturesDebugInfo();
+ exchWorker.dumpExchangeDebugInfo();
if (!readyFuts.isEmpty()) {
U.warn(log, "Pending affinity ready futures:");
@@ -1554,6 +1562,305 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Exchange future thread. All exchanges happen only by one thread and next
+ * exchange will not start until previous one completes.
+ */
+ private class ExchangeWorker extends GridWorker {
+ /** Future queue. */
+ private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+ new LinkedBlockingDeque<>();
+
+ /** Busy flag used as performance optimization to stop current preloading. */
+ private volatile boolean busy;
+
+ /**
+ * Constructor.
+ */
+ private ExchangeWorker() {
+ super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
+ }
+
+ /**
+ * Add first exchange future.
+ *
+ * @param exchFut Exchange future.
+ */
+ void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ futQ.addFirst(exchFut);
+ }
+
+ /**
+ * @param exchFut Exchange future.
+ */
+ void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ assert exchFut != null;
+
+ if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
+ futQ.offer(exchFut);
+
+ if (log.isDebugEnabled())
+ log.debug("Added exchange future to exchange worker: " + exchFut);
+ }
+
+ /**
+ * @return Whether pending exchange future exists.
+ */
+ boolean hasPendingExchange() {
+ return !futQ.isEmpty();
+ }
+
+ /**
+ * Dump debug info.
+ */
+ void dumpExchangeDebugInfo() {
+ U.warn(log, "Pending exchange futures:");
+
+ for (GridDhtPartitionsExchangeFuture fut : futQ)
+ U.warn(log, ">>> " + fut);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ long timeout = cctx.gridConfig().getNetworkTimeout();
+
+ int cnt = 0;
+
+ while (!isCancelled()) {
+ GridDhtPartitionsExchangeFuture exchFut = null;
+
+ cnt++;
+
+ try {
+ boolean preloadFinished = true;
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ preloadFinished &= cacheCtx.preloader() != null && cacheCtx.preloader().syncFuture().isDone();
+
+ if (!preloadFinished)
+ break;
+ }
+
+ // If not first preloading and no more topology events present.
+ if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished)
+ timeout = cctx.gridConfig().getNetworkTimeout();
+
+ // After workers line up and before preloading starts we initialize all futures.
+ if (log.isDebugEnabled()) {
+ Collection<IgniteInternalFuture> unfinished = new HashSet<>();
+
+ for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) {
+ if (!fut.isDone())
+ unfinished.add(fut);
+ }
+
+ log.debug("Before waiting for exchange futures [futs" + unfinished + ", worker=" + this + ']');
+ }
+
+ // Take next exchange future.
+ if (isCancelled())
+ Thread.currentThread().interrupt();
+
+ exchFut = futQ.poll(timeout, MILLISECONDS);
+
+ if (exchFut == null)
+ continue; // Main while loop.
+
+ busy = true;
+
+ Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
+
+ boolean dummyReassign = exchFut.dummyReassign();
+ boolean forcePreload = exchFut.forcePreload();
+
+ try {
+ if (isCancelled())
+ break;
+
+ if (!exchFut.dummy() && !exchFut.forcePreload()) {
+ lastInitializedFut = exchFut;
+
+ exchFut.init();
+
+ int dumpedObjects = 0;
+
+ while (true) {
+ try {
+ exchFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS);
+
+ break;
+ }
+ catch (IgniteFutureTimeoutCheckedException ignored) {
+ U.warn(log, "Failed to wait for partition map exchange [" +
+ "topVer=" + exchFut.topologyVersion() +
+ ", node=" + cctx.localNodeId() + "]. " +
+ "Dumping pending objects that might be the cause: ");
+
+ if (dumpedObjects < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) {
+ try {
+ dumpDebugInfo(exchFut.topologyVersion());
+ }
+ catch (Exception e) {
+ U.error(log, "Failed to dump debug information: " + e, e);
+ }
+
+ if (IgniteSystemProperties.getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
+ U.dumpThreads(log);
+
+ dumpedObjects++;
+ }
+ }
+ }
+
+
+ if (log.isDebugEnabled())
+ log.debug("After waiting for exchange future [exchFut=" + exchFut + ", worker=" +
+ this + ']');
+
+ if (exchFut.exchangeId().nodeId().equals(cctx.localNodeId()))
+ lastRefresh.compareAndSet(-1, U.currentTimeMillis());
+
+ boolean changed = false;
+
+ // Just pick first worker to do this, so we don't
+ // invoke topology callback more than once for the
+ // same event.
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
+
+ changed |= cacheCtx.topology().afterExchange(exchFut);
+ }
+
+ if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
+ refreshPartitions();
+ }
+ else {
+ if (log.isDebugEnabled())
+ log.debug("Got dummy exchange (will reassign)");
+
+ if (!dummyReassign) {
+ timeout = 0; // Force refresh.
+
+ continue;
+ }
+ }
+
+ if (!exchFut.skipPreload()) {
+ assignsMap = new HashMap<>();
+
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ long delay = cacheCtx.config().getRebalanceDelay();
+
+ GridDhtPreloaderAssignments assigns = null;
+
+ // Don't delay for dummy reassigns to avoid infinite recursion.
+ if (delay == 0 || forcePreload)
+ assigns = cacheCtx.preloader().assign(exchFut);
+
+ assignsMap.put(cacheCtx.cacheId(), assigns);
+ }
+ }
+ }
+ finally {
+ // Must flip busy flag before assignments are given to demand workers.
+ busy = false;
+ }
+
+ if (assignsMap != null) {
+ int size = assignsMap.size();
+
+ NavigableMap<Integer, List<Integer>> orderMap = new TreeMap<>();
+
+ for (Map.Entry<Integer, GridDhtPreloaderAssignments> e : assignsMap.entrySet()) {
+ int cacheId = e.getKey();
+
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ int order = cacheCtx.config().getRebalanceOrder();
+
+ if (orderMap.get(order) == null)
+ orderMap.put(order, new ArrayList<Integer>(size));
+
+ orderMap.get(order).add(cacheId);
+ }
+
+ Runnable r = null;
+
+ List<String> rebList = new LinkedList<>();
+
+ boolean assignsCancelled = false;
+
+ for (Integer order : orderMap.descendingKeySet()) {
+ for (Integer cacheId : orderMap.get(order)) {
+ GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
+
+ GridDhtPreloaderAssignments assigns = assignsMap.get(cacheId);
+
+ if (assigns != null)
+ assignsCancelled |= assigns.cancelled();
+
+ // Cancels previous rebalance future (in case it's not done yet).
+ // Sends previous rebalance stopped event (if necessary).
+ // Creates new rebalance future.
+ // Sends current rebalance started event (if necessary).
+ // Finishes cache sync future (on empty assignments).
+ Runnable cur = cacheCtx.preloader().addAssignments(assigns,
+ forcePreload,
+ cnt,
+ r,
+ exchFut.forcedRebalanceFuture());
+
+ if (cur != null) {
+ rebList.add(U.maskName(cacheCtx.name()));
+
+ r = cur;
+ }
+ }
+ }
+
+ if (assignsCancelled) { // Pending exchange.
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ else if (r != null) {
+ Collections.reverse(rebList);
+
+ U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
+
+ if (!hasPendingExchange()) {
+ U.log(log, "Rebalancing started " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+
+ r.run(); // Starts rebalancing routine.
+ }
+ else
+ U.log(log, "Skipping rebalancing (obsolete exchange ID) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ else
+ U.log(log, "Skipping rebalancing (nothing scheduled) " +
+ "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
+ ", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
+ }
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ throw e;
+ }
+ catch (IgniteClientDisconnectedCheckedException ignored) {
+ return;
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to wait for completion of partition map exchange " +
+ "(preloading will not start): " + exchFut, e);
+ }
+ }
+ }
+ }
+
+ /**
* Partition resend timeout object.
*/
private class ResendTimeoutObject implements GridTimeoutObject {
http://git-wip-us.apache.org/repos/asf/ignite/blob/92524a44/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
* Future for exchanging partition maps.
*/
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
- implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+ implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
/** */
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/** {@inheritDoc} */
+ @Override public boolean isExchange() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
}
[3/6] ignite git commit: Done.
Posted by vo...@apache.org.
Done.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e2f382be
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e2f382be
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e2f382be
Branch: refs/heads/ignite-4565-ddl
Commit: e2f382bed0dc0a1c906fdb06f35c9e93988e9fc8
Parents: 92524a4
Author: devozerov <vo...@gridgain.com>
Authored: Fri Mar 17 16:38:37 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Mar 17 16:38:37 2017 +0300
----------------------------------------------------------------------
.../GridCachePartitionExchangeManager.java | 77 ++++++++++++++++----
.../processors/cache/GridCacheProcessor.java | 9 +++
2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 444b530..26bc27d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -369,7 +368,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- exchWorker.addFirstFuture(fut);
+ exchWorker.addFirstExchangeFuture(fut);
if (!cctx.kernalContext().clientNode()) {
for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -704,7 +703,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
public void forceDummyExchange(boolean reassign,
GridDhtPartitionsExchangeFuture exchFut) {
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
}
@@ -716,7 +715,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
return fut;
@@ -1192,7 +1191,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
if (fut.onAdded()) {
- exchWorker.addFuture(fut);
+ exchWorker.addExchangeFuture(fut);
return true;
}
@@ -1567,7 +1566,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private class ExchangeWorker extends GridWorker {
/** Future queue. */
- private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+ private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
new LinkedBlockingDeque<>();
/** Busy flag used as performance optimization to stop current preloading. */
@@ -1585,14 +1584,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*
* @param exchFut Exchange future.
*/
- void addFirstFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
futQ.addFirst(exchFut);
}
/**
* @param exchFut Exchange future.
*/
- void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut != null;
if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
@@ -1603,10 +1602,44 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
+ * Add custom exchange task.
+ *
+ * @param task Task.
+ */
+ void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ assert task != null;
+
+ assert !task.isExchange();
+
+ futQ.offer(task);
+ }
+
+ /**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ void processCustomTask(CachePartitionExchangeWorkerTask task) {
+ try {
+ cctx.cache().processCustomExchangeTask(task);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to process custom exchange task: " + task, e);
+ }
+ }
+
+ /**
* @return Whether pending exchange future exists.
*/
boolean hasPendingExchange() {
- return !futQ.isEmpty();
+ if (!futQ.isEmpty()) {
+ for (CachePartitionExchangeWorkerTask task : futQ) {
+ if (task.isExchange())
+ return true;
+ }
+ }
+
+ return false;
}
/**
@@ -1615,8 +1648,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
void dumpExchangeDebugInfo() {
U.warn(log, "Pending exchange futures:");
- for (GridDhtPartitionsExchangeFuture fut : futQ)
- U.warn(log, ">>> " + fut);
+ for (CachePartitionExchangeWorkerTask task: futQ) {
+ if (task.isExchange())
+ U.warn(log, ">>> " + task);
+ }
}
/** {@inheritDoc} */
@@ -1626,7 +1661,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int cnt = 0;
while (!isCancelled()) {
- GridDhtPartitionsExchangeFuture exchFut = null;
+ CachePartitionExchangeWorkerTask task = null;
cnt++;
@@ -1660,10 +1695,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (isCancelled())
Thread.currentThread().interrupt();
- exchFut = futQ.poll(timeout, MILLISECONDS);
+ task = futQ.poll(timeout, MILLISECONDS);
+
+ if (task == null)
+ continue;
+
+ if (!task.isExchange()) {
+ processCustomTask(task);
+
+ continue;
+ }
+
+ assert task instanceof GridDhtPartitionsExchangeFuture;
- if (exchFut == null)
- continue; // Main while loop.
+ GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
busy = true;
@@ -1854,7 +1899,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
- "(preloading will not start): " + exchFut, e);
+ "(preloading will not start): " + task, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e2f382be/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..459cf3a 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,15 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+ // No-op.
+ }
+
+ /**
* @param c Ignite configuration.
* @param cc Configuration to validate.
* @param cacheType Cache type.