You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2017/03/23 07:15:46 UTC
[11/51] [abbrv] ignite git commit: IGNITE-4834: Added ability to
execute custom tasks from exchange thread.
IGNITE-4834: Added ability to execute custom tasks from exchange thread.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/61c845d6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/61c845d6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/61c845d6
Branch: refs/heads/ignite-4829
Commit: 61c845d66b82463bfad71c28093f2cbf54d99eb0
Parents: 9020d12
Author: devozerov <vo...@gridgain.com>
Authored: Mon Mar 20 10:16:36 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Mar 20 10:16:36 2017 +0300
----------------------------------------------------------------------
.../cache/CachePartitionExchangeWorkerTask.java | 29 ++++
.../GridCachePartitionExchangeManager.java | 171 +++++++++++++------
.../processors/cache/GridCacheProcessor.java | 19 +++
.../GridDhtPartitionsExchangeFuture.java | 8 +-
4 files changed, 176 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
new file mode 100644
index 0000000..80ef9f5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CachePartitionExchangeWorkerTask.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+/**
+ * Cache partition exchange worker task marker interface.
+ */
+public interface CachePartitionExchangeWorkerTask {
+ /**
+ * @return {@code True) if task denotes standard exchange task, {@code false} if this is a custom task which
+ * must be executed from within exchange thread.
+ */
+ boolean isExchange();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 3e72efb..f7edb08 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -222,10 +222,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache,null, null);
}
else {
- DiscoveryCustomEvent customEvt = (DiscoveryCustomEvent)evt;
+ DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)evt).customMessage();
- if (customEvt.customMessage() instanceof DynamicCacheChangeBatch) {
- DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customEvt.customMessage();
+ if (customMsg instanceof DynamicCacheChangeBatch) {
+ DynamicCacheChangeBatch batch = (DynamicCacheChangeBatch)customMsg;
Collection<DynamicCacheChangeRequest> valid = new ArrayList<>(batch.requests().size());
@@ -257,8 +257,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, valid, null);
}
}
- else if (customEvt.customMessage() instanceof CacheAffinityChangeMessage) {
- CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customEvt.customMessage();
+ else if (customMsg instanceof CacheAffinityChangeMessage) {
+ CacheAffinityChangeMessage msg = (CacheAffinityChangeMessage)customMsg;
if (msg.exchangeId() == null) {
if (msg.exchangeNeeded()) {
@@ -267,8 +267,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
exchFut = exchangeFuture(exchId, evt, cache, null, msg);
}
}
- else
- exchangeFuture(msg.exchangeId(), null, null, null, null).onAffinityChangeMessage(customEvt.eventNode(), msg);
+ else {
+ exchangeFuture(msg.exchangeId(), null, null, null, null)
+ .onAffinityChangeMessage(evt.eventNode(), msg);
+ }
+ }
+ else {
+ // Process event as custom discovery task if needed.
+ CachePartitionExchangeWorkerTask task =
+ cctx.cache().exchangeTaskForCustomDiscoveryMessage(customMsg);
+
+ if (task != null)
+ exchWorker.addCustomTask(task);
}
}
@@ -369,7 +379,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
if (reconnect)
reconnectExchangeFut = new GridFutureAdapter<>();
- exchWorker.futQ.addFirst(fut);
+ exchWorker.addFirstExchangeFuture(fut);
if (!cctx.kernalContext().clientNode()) {
for (int cnt = 0; cnt < cctx.gridConfig().getRebalanceThreadPoolSize(); cnt++) {
@@ -684,7 +694,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
* @return {@code True} if pending future queue is empty.
*/
public boolean hasPendingExchange() {
- return !exchWorker.futQ.isEmpty();
+ return exchWorker.hasPendingExchange();
+ }
+
+ /**
+ * Add custom task.
+ *
+ * @param task Task.
+ */
+ public void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ assert !task.isExchange();
+
+ exchWorker.addCustomTask(task);
}
/**
@@ -704,7 +725,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
public void forceDummyExchange(boolean reassign,
GridDhtPartitionsExchangeFuture exchFut) {
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId()));
}
@@ -716,7 +737,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) {
GridFutureAdapter<Boolean> fut = new GridFutureAdapter<>();
- exchWorker.addFuture(
+ exchWorker.addExchangeFuture(
new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut));
return fut;
@@ -1192,7 +1213,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private boolean addFuture(GridDhtPartitionsExchangeFuture fut) {
if (fut.onAdded()) {
- exchWorker.addFuture(fut);
+ exchWorker.addExchangeFuture(fut);
return true;
}
@@ -1345,10 +1366,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.warn(log, "Last exchange future: " + lastInitializedFut);
- U.warn(log, "Pending exchange futures:");
-
- for (GridDhtPartitionsExchangeFuture fut : exchWorker.futQ)
- U.warn(log, ">>> " + fut);
+ exchWorker.dumpExchangeDebugInfo();
if (!readyFuts.isEmpty()) {
U.warn(log, "Pending affinity ready futures:");
@@ -1547,28 +1565,6 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
/**
- * @param deque Deque to poll from.
- * @param time Time to wait.
- * @param w Worker.
- * @return Polled item.
- * @throws InterruptedException If interrupted.
- */
- @Nullable private <T> T poll(BlockingQueue<T> deque, long time, GridWorker w) throws InterruptedException {
- assert w != null;
-
- // There is currently a case where {@code interrupted}
- // flag on a thread gets flipped during stop which causes the pool to hang. This check
- // will always make sure that interrupted flag gets reset before going into wait conditions.
- // The true fix should actually make sure that interrupted flag does not get reset or that
- // interrupted exception gets propagated. Until we find a real fix, this method should
- // always work to make sure that there is no hanging during stop.
- if (w.isCancelled())
- Thread.currentThread().interrupt();
-
- return deque.poll(time, MILLISECONDS);
- }
-
- /**
* @param node Target node.
* @return {@code True} if can use compression for partition map messages.
*/
@@ -1592,32 +1588,94 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
*/
private class ExchangeWorker extends GridWorker {
/** Future queue. */
- private final LinkedBlockingDeque<GridDhtPartitionsExchangeFuture> futQ =
+ private final LinkedBlockingDeque<CachePartitionExchangeWorkerTask> futQ =
new LinkedBlockingDeque<>();
/** Busy flag used as performance optimization to stop current preloading. */
private volatile boolean busy;
/**
- *
+ * Constructor.
*/
private ExchangeWorker() {
super(cctx.igniteInstanceName(), "partition-exchanger", GridCachePartitionExchangeManager.this.log);
}
/**
+ * Add first exchange future.
+ *
+ * @param exchFut Exchange future.
+ */
+ void addFirstExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ futQ.addFirst(exchFut);
+ }
+
+ /**
* @param exchFut Exchange future.
*/
- void addFuture(GridDhtPartitionsExchangeFuture exchFut) {
+ void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut != null;
- if (!exchFut.dummy() || (futQ.isEmpty() && !busy))
+ if (!exchFut.dummy() || (!hasPendingExchange() && !busy))
futQ.offer(exchFut);
if (log.isDebugEnabled())
log.debug("Added exchange future to exchange worker: " + exchFut);
}
+ /**
+ * Add custom exchange task.
+ *
+ * @param task Task.
+ */
+ void addCustomTask(CachePartitionExchangeWorkerTask task) {
+ assert task != null;
+
+ assert !task.isExchange();
+
+ futQ.offer(task);
+ }
+
+ /**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ void processCustomTask(CachePartitionExchangeWorkerTask task) {
+ try {
+ cctx.cache().processCustomExchangeTask(task);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to process custom exchange task: " + task, e);
+ }
+ }
+
+ /**
+ * @return Whether pending exchange future exists.
+ */
+ boolean hasPendingExchange() {
+ if (!futQ.isEmpty()) {
+ for (CachePartitionExchangeWorkerTask task : futQ) {
+ if (task.isExchange())
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Dump debug info.
+ */
+ void dumpExchangeDebugInfo() {
+ U.warn(log, "Pending exchange futures:");
+
+ for (CachePartitionExchangeWorkerTask task: futQ) {
+ if (task.isExchange())
+ U.warn(log, ">>> " + task);
+ }
+ }
+
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
long timeout = cctx.gridConfig().getNetworkTimeout();
@@ -1625,7 +1683,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
int cnt = 0;
while (!isCancelled()) {
- GridDhtPartitionsExchangeFuture exchFut = null;
+ CachePartitionExchangeWorkerTask task = null;
cnt++;
@@ -1640,7 +1698,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
// If not first preloading and no more topology events present.
- if (!cctx.kernalContext().clientNode() && futQ.isEmpty() && preloadFinished)
+ if (!cctx.kernalContext().clientNode() && !hasPendingExchange() && preloadFinished)
timeout = cctx.gridConfig().getNetworkTimeout();
// After workers line up and before preloading starts we initialize all futures.
@@ -1656,11 +1714,24 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
// Take next exchange future.
- exchFut = poll(futQ, timeout, this);
+ if (isCancelled())
+ Thread.currentThread().interrupt();
- if (exchFut == null)
+ task = futQ.poll(timeout, MILLISECONDS);
+
+ if (task == null)
continue; // Main while loop.
+ if (!task.isExchange()) {
+ processCustomTask(task);
+
+ continue;
+ }
+
+ assert task instanceof GridDhtPartitionsExchangeFuture;
+
+ GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task;
+
busy = true;
Map<Integer, GridDhtPreloaderAssignments> assignsMap = null;
@@ -1727,7 +1798,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
changed |= cacheCtx.topology().afterExchange(exchFut);
}
- if (!cctx.kernalContext().clientNode() && changed && futQ.isEmpty())
+ if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange())
refreshPartitions();
}
else {
@@ -1824,7 +1895,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
U.log(log, "Rebalancing scheduled [order=" + rebList + "]");
- if (futQ.isEmpty()) {
+ if (!hasPendingExchange()) {
U.log(log, "Rebalancing started " +
"[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() +
", node=" + exchFut.discoveryEvent().eventNode().id() + ']');
@@ -1850,7 +1921,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for completion of partition map exchange " +
- "(preloading will not start): " + exchFut, e);
+ "(preloading will not start): " + task, e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index c7ac31a..a7d38a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -367,6 +367,25 @@ public class GridCacheProcessor extends GridProcessorAdapter {
}
/**
+ * Create exchange worker task for custom discovery message.
+ *
+ * @param msg Custom discovery message.
+ * @return Task or {@code null} if message doesn't require any special processing.
+ */
+ public CachePartitionExchangeWorkerTask exchangeTaskForCustomDiscoveryMessage(DiscoveryCustomMessage msg) {
+ return null;
+ }
+
+ /**
+ * Process custom exchange task.
+ *
+ * @param task Task.
+ */
+ public void processCustomExchangeTask(CachePartitionExchangeWorkerTask task) {
+ // No-op.
+ }
+
+ /**
* @param c Ignite configuration.
* @param cc Configuration to validate.
* @param cacheType Cache type.
http://git-wip-us.apache.org/repos/asf/ignite/blob/61c845d6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 46fb144..50937a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
+import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -84,7 +85,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYS
* Future for exchanging partition maps.
*/
public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion>
- implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture {
+ implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture, CachePartitionExchangeWorkerTask {
/** */
public static final int DUMP_PENDING_OBJECTS_THRESHOLD =
IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_DUMP_PENDING_OBJECTS_THRESHOLD, 10);
@@ -1677,6 +1678,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
}
/** {@inheritDoc} */
+ @Override public boolean isExchange() {
+ return true;
+ }
+
+ /** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
}