You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/26 16:49:25 UTC

[3/7] incubator-ignite git commit: IGNITE-709 Fix tests.

IGNITE-709 Fix tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e575164b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e575164b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e575164b

Branch: refs/heads/ignite-23
Commit: e575164bbfb8f4f5b7c56c3470b234fff52cf8a3
Parents: d59a712
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 26 14:41:18 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 26 14:41:18 2015 +0300

----------------------------------------------------------------------
 .../internal/GridEventConsumeHandler.java       | 106 ++++++++++---------
 .../continuous/GridContinuousProcessor.java     |  37 ++++++-
 .../apache/ignite/internal/GridSelfTest.java    |   4 +-
 3 files changed, 93 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index bb8366a..f33fa39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -137,80 +137,86 @@ class GridEventConsumeHandler implements GridContinuousHandler {
             private boolean notificationInProgress;
 
             @Override public void onEvent(Event evt) {
-                synchronized (notificationQueue) {
-                    notificationQueue.add(new T3<>(nodeId, routineId, evt));
+                if (filter != null && !filter.apply(evt))
+                    return;
 
-                    if (!notificationInProgress) {
-                        ctx.getSystemExecutorService().submit(new Runnable() {
-                            @Override public void run() {
-                                while (true) {
-                                    T3<UUID, UUID, Event> t3;
-
-                                    synchronized (notificationQueue) {
-                                        t3 = notificationQueue.poll();
+                if (loc) {
+                    if (!cb.apply(nodeId, evt))
+                        ctx.continuous().stopRoutine(routineId);
+                }
+                else {
+                    if (ctx.discovery().node(nodeId) == null)
+                        return;
 
-                                        if (t3 == null) {
-                                            notificationInProgress = false;
+                    synchronized (notificationQueue) {
+                        notificationQueue.add(new T3<>(nodeId, routineId, evt));
 
-                                            return;
-                                        }
-                                    }
+                        if (!notificationInProgress) {
+                            ctx.getSystemExecutorService().submit(new Runnable() {
+                                @Override public void run() {
+                                    if (!ctx.continuous().lockStopping())
+                                        return;
 
                                     try {
-                                        Event evt = t3.get3();
+                                        while (true) {
+                                            T3<UUID, UUID, Event> t3;
 
-                                        if (filter != null && !filter.apply(evt))
-                                            continue;
+                                            synchronized (notificationQueue) {
+                                                t3 = notificationQueue.poll();
 
-                                        if (loc) {
-                                            if (!cb.apply(nodeId, evt)) {
-                                                ctx.continuous().stopRoutine(routineId);
+                                                if (t3 == null) {
+                                                    notificationInProgress = false;
 
-                                                return;
+                                                    return;
+                                                }
                                             }
 
-                                            continue;
-                                        }
+                                            try {
+                                                Event evt = t3.get3();
 
-                                        ClusterNode node = ctx.discovery().node(t3.get1());
+                                                EventWrapper wrapper = new EventWrapper(evt);
 
-                                        if (node == null)
-                                            continue;
+                                                if (evt instanceof CacheEvent) {
+                                                    String cacheName = ((CacheEvent)evt).cacheName();
 
-                                        EventWrapper wrapper = new EventWrapper(evt);
+                                                    ClusterNode node = ctx.discovery().node(t3.get1());
 
-                                        if (evt instanceof CacheEvent) {
-                                            String cacheName = ((CacheEvent)evt).cacheName();
+                                                    if (node == null)
+                                                        continue;
 
-                                            if (ctx.config().isPeerClassLoadingEnabled()
-                                                && ctx.discovery().cacheNode(node, cacheName)) {
-                                                wrapper.p2pMarshal(ctx.config().getMarshaller());
+                                                    if (ctx.config().isPeerClassLoadingEnabled()
+                                                        && ctx.discovery().cacheNode(node, cacheName)) {
+                                                        wrapper.p2pMarshal(ctx.config().getMarshaller());
 
-                                                wrapper.cacheName = cacheName;
+                                                        wrapper.cacheName = cacheName;
 
-                                                GridCacheDeploymentManager depMgr =
-                                                    ctx.cache().internalCache(cacheName).context().deploy();
+                                                        GridCacheDeploymentManager depMgr = ctx.cache()
+                                                            .internalCache(cacheName).context().deploy();
 
-                                                depMgr.prepare(wrapper);
+                                                        depMgr.prepare(wrapper);
+                                                    }
+                                                }
+
+                                                ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
+                                                    false);
+                                            }
+                                            catch (ClusterTopologyCheckedException ignored) {
+                                                // No-op.
+                                            }
+                                            catch (Throwable e) {
+                                                U.error(ctx.log(GridEventConsumeHandler.class),
+                                                    "Failed to send event notification to node: " + nodeId, e);
                                             }
                                         }
-
-                                        ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
-                                            false, false);
-                                    }
-                                    catch (ClusterTopologyCheckedException
-                                        | IgniteInterruptedCheckedException ignored) {
-                                        // No-op.
                                     }
-                                    catch (Throwable e) {
-                                        U.error(ctx.log(GridEventConsumeHandler.class),
-                                            "Failed to send event notification to node: " + nodeId, e);
+                                    finally {
+                                        ctx.continuous().unlockStopping();
                                     }
                                 }
-                            }
-                        });
+                            });
 
-                        notificationInProgress = true;
+                            notificationInProgress = true;
+                        }
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 71a2a66..67b32a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -93,7 +93,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     private int retryCnt = 3;
 
     /** */
-    private volatile boolean processorStopped;
+    private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock();
+
+    /** */
+    private boolean processorStopped;
 
     /**
      * @param ctx Kernal context.
@@ -259,9 +262,39 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             log.debug("Continuous processor started.");
     }
 
+    /**
+     * @return {@code true} if lock successful, {@code false} if processor already stopped.
+     */
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+    public boolean lockStopping() {
+        processorStopLock.readLock().lock();
+
+        if (processorStopped) {
+            processorStopLock.readLock().unlock();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Releases the read lock acquired by a successful {@link #lockStopping()} call.
+     */
+    public void unlockStopping() {
+        processorStopLock.readLock().unlock();
+    }
+
     /** {@inheritDoc} */
     @Override public void onKernalStop(boolean cancel) {
-        processorStopped = true;
+        processorStopLock.writeLock().lock();
+
+        try {
+            processorStopped = true;
+        }
+        finally {
+            processorStopLock.writeLock().unlock();
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
index b4dce6c..7f5ee54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
@@ -118,8 +118,8 @@ public class GridSelfTest extends GridProjectionAbstractTest {
 
         g.message().remoteListen(null, new MessagingListenActor<String>() {
             @Override protected void receive(UUID nodeId, String rcvMsg) throws Throwable {
-                assert locNodeId.equals(nodeId);
-                assert msg.equals(rcvMsg);
+                assertEquals(locNodeId, nodeId);
+                assertEquals(msg, rcvMsg);
 
                 stop(rcvMsg);
             }