You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:42 UTC
[09/20] incubator-ignite git commit: IGNITE-709 Bug fix avoid hang
GridContinuousProcessor
IGNITE-709 Bug fix avoid hang GridContinuousProcessor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a154d2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a154d2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a154d2c9
Branch: refs/heads/ignite-23
Commit: a154d2c9582c378b04121f535c3fa59b0499dbaa
Parents: 9480e8d
Author: sevdokimov <se...@jetbrains.com>
Authored: Mon May 25 09:42:59 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Mon May 25 09:42:59 2015 +0300
----------------------------------------------------------------------
.../continuous/GridContinuousProcessor.java | 44 +++++++++++++++++++-
1 file changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a154d2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d67a45a..d5c2488 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -92,6 +92,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/** Number of retries using to send messages. */
private int retryCnt = 3;
+ private ExecutorService sendNotificationThreadPool;
+
/**
* @param ctx Kernal context.
*/
@@ -109,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
marsh = ctx.config().getMarshaller();
+ sendNotificationThreadPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), new IgniteThreadFactory(ctx.gridName(), "notification-sender"));
+
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@SuppressWarnings({"fallthrough", "TooBroadScope"})
@Override public void onEvent(Event evt) {
@@ -268,6 +273,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
if (log.isDebugEnabled())
log.debug("Continuous processor stopped.");
+
+ sendNotificationThreadPool.shutdownNow();
}
/** {@inheritDoc} */
@@ -578,7 +585,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
Collection<Object> toSnd = info.add(obj);
if (toSnd != null)
- sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+ sendNotificationAsync(nodeId, routineId, null, toSnd, orderedTopic, msg);
}
}
}
@@ -609,6 +616,41 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ * @param nodeId Node ID.
+ * @param routineId Routine ID.
+ * @param futId Future ID.
+ * @param toSnd Notification object to send.
+ * @param orderedTopic Topic for ordered notifications.
+ * If {@code null}, non-ordered message will be sent.
+ * @throws IgniteCheckedException In case of error.
+ */
+ private void sendNotificationAsync(final UUID nodeId,
+ final UUID routineId,
+ @Nullable final IgniteUuid futId,
+ final Collection<Object> toSnd,
+ @Nullable final Object orderedTopic,
+ final boolean msg) {
+ assert nodeId != null;
+ assert routineId != null;
+ assert toSnd != null;
+ assert !toSnd.isEmpty();
+
+ sendNotificationThreadPool.execute(new Runnable() {
+ @Override public void run() {
+ try {
+ sendWithRetries(nodeId,
+ new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
+ orderedTopic);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send event notification to node: " + nodeId, e);
+ }
+ }
+ });
+
+ }
+
+ /**
* @param node Sender.
* @param req Start request.
*/