You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/25 20:50:42 UTC

[09/20] incubator-ignite git commit: IGNITE-709 Bug fix avoid hang GridContinuousProcessor

IGNITE-709 Bug fix avoid hang GridContinuousProcessor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a154d2c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a154d2c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a154d2c9

Branch: refs/heads/ignite-23
Commit: a154d2c9582c378b04121f535c3fa59b0499dbaa
Parents: 9480e8d
Author: sevdokimov <se...@jetbrains.com>
Authored: Mon May 25 09:42:59 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Mon May 25 09:42:59 2015 +0300

----------------------------------------------------------------------
 .../continuous/GridContinuousProcessor.java     | 44 +++++++++++++++++++-
 1 file changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a154d2c9/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index d67a45a..d5c2488 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -92,6 +92,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     /** Number of retries using to send messages. */
     private int retryCnt = 3;
 
+    private ExecutorService sendNotificationThreadPool;
+
     /**
      * @param ctx Kernal context.
      */
@@ -109,6 +111,9 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         marsh = ctx.config().getMarshaller();
 
+        sendNotificationThreadPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
+            new LinkedBlockingQueue<Runnable>(), new IgniteThreadFactory(ctx.gridName(), "notification-sender"));
+
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @SuppressWarnings({"fallthrough", "TooBroadScope"})
             @Override public void onEvent(Event evt) {
@@ -268,6 +273,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         if (log.isDebugEnabled())
             log.debug("Continuous processor stopped.");
+
+        sendNotificationThreadPool.shutdownNow();
     }
 
     /** {@inheritDoc} */
@@ -578,7 +585,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
                 Collection<Object> toSnd = info.add(obj);
 
                 if (toSnd != null)
-                    sendNotification(nodeId, routineId, null, toSnd, orderedTopic, msg);
+                    sendNotificationAsync(nodeId, routineId, null, toSnd, orderedTopic, msg);
             }
         }
     }
@@ -609,6 +616,41 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Submits to {@code sendNotificationThreadPool} a task that sends the notification; a send failure is only logged.
+     * @param nodeId Node ID.
+     * @param routineId Routine ID.
+     * @param futId Future ID.
+     * @param toSnd Notification object to send.
+     * @param orderedTopic Topic for ordered notifications, or {@code null} to send a non-ordered message.
+     * @param msg Flag passed through unchanged to the {@code GridContinuousMessage} that is sent.
+     */
+    private void sendNotificationAsync(final UUID nodeId,
+        final UUID routineId,
+        @Nullable final IgniteUuid futId,
+        final Collection<Object> toSnd,
+        @Nullable final Object orderedTopic,
+        final boolean msg) {
+        assert nodeId != null;
+        assert routineId != null;
+        assert toSnd != null;
+        assert !toSnd.isEmpty();
+
+        sendNotificationThreadPool.execute(new Runnable() {
+            @Override public void run() {
+                try {
+                    sendWithRetries(nodeId,
+                        new GridContinuousMessage(MSG_EVT_NOTIFICATION, routineId, futId, toSnd, msg),
+                        orderedTopic);
+                }
+                catch (IgniteCheckedException e) {
+                    U.error(log, "Failed to send event notification to node: " + nodeId, e);
+                }
+            }
+        });
+
+    }
+
+    /**
      * @param node Sender.
      * @param req Start request.
      */