You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/26 16:49:25 UTC
[3/7] incubator-ignite git commit: IGNITE-709 Fix tests.
IGNITE-709 Fix tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e575164b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e575164b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e575164b
Branch: refs/heads/ignite-23
Commit: e575164bbfb8f4f5b7c56c3470b234fff52cf8a3
Parents: d59a712
Author: sevdokimov <se...@gridgain.com>
Authored: Tue May 26 14:41:18 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Tue May 26 14:41:18 2015 +0300
----------------------------------------------------------------------
.../internal/GridEventConsumeHandler.java | 106 ++++++++++---------
.../continuous/GridContinuousProcessor.java | 37 ++++++-
.../apache/ignite/internal/GridSelfTest.java | 4 +-
3 files changed, 93 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index bb8366a..f33fa39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -137,80 +137,86 @@ class GridEventConsumeHandler implements GridContinuousHandler {
private boolean notificationInProgress;
@Override public void onEvent(Event evt) {
- synchronized (notificationQueue) {
- notificationQueue.add(new T3<>(nodeId, routineId, evt));
+ if (filter != null && !filter.apply(evt))
+ return;
- if (!notificationInProgress) {
- ctx.getSystemExecutorService().submit(new Runnable() {
- @Override public void run() {
- while (true) {
- T3<UUID, UUID, Event> t3;
-
- synchronized (notificationQueue) {
- t3 = notificationQueue.poll();
+ if (loc) {
+ if (!cb.apply(nodeId, evt))
+ ctx.continuous().stopRoutine(routineId);
+ }
+ else {
+ if (ctx.discovery().node(nodeId) == null)
+ return;
- if (t3 == null) {
- notificationInProgress = false;
+ synchronized (notificationQueue) {
+ notificationQueue.add(new T3<>(nodeId, routineId, evt));
- return;
- }
- }
+ if (!notificationInProgress) {
+ ctx.getSystemExecutorService().submit(new Runnable() {
+ @Override public void run() {
+ if (!ctx.continuous().lockStopping())
+ return;
try {
- Event evt = t3.get3();
+ while (true) {
+ T3<UUID, UUID, Event> t3;
- if (filter != null && !filter.apply(evt))
- continue;
+ synchronized (notificationQueue) {
+ t3 = notificationQueue.poll();
- if (loc) {
- if (!cb.apply(nodeId, evt)) {
- ctx.continuous().stopRoutine(routineId);
+ if (t3 == null) {
+ notificationInProgress = false;
- return;
+ return;
+ }
}
- continue;
- }
+ try {
+ Event evt = t3.get3();
- ClusterNode node = ctx.discovery().node(t3.get1());
+ EventWrapper wrapper = new EventWrapper(evt);
- if (node == null)
- continue;
+ if (evt instanceof CacheEvent) {
+ String cacheName = ((CacheEvent)evt).cacheName();
- EventWrapper wrapper = new EventWrapper(evt);
+ ClusterNode node = ctx.discovery().node(t3.get1());
- if (evt instanceof CacheEvent) {
- String cacheName = ((CacheEvent)evt).cacheName();
+ if (node == null)
+ continue;
- if (ctx.config().isPeerClassLoadingEnabled()
- && ctx.discovery().cacheNode(node, cacheName)) {
- wrapper.p2pMarshal(ctx.config().getMarshaller());
+ if (ctx.config().isPeerClassLoadingEnabled()
+ && ctx.discovery().cacheNode(node, cacheName)) {
+ wrapper.p2pMarshal(ctx.config().getMarshaller());
- wrapper.cacheName = cacheName;
+ wrapper.cacheName = cacheName;
- GridCacheDeploymentManager depMgr =
- ctx.cache().internalCache(cacheName).context().deploy();
+ GridCacheDeploymentManager depMgr = ctx.cache()
+ .internalCache(cacheName).context().deploy();
- depMgr.prepare(wrapper);
+ depMgr.prepare(wrapper);
+ }
+ }
+
+ ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false,
+ false);
+ }
+ catch (ClusterTopologyCheckedException ignored) {
+ // No-op.
+ }
+ catch (Throwable e) {
+ U.error(ctx.log(GridEventConsumeHandler.class),
+ "Failed to send event notification to node: " + nodeId, e);
}
}
-
- ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null,
- false, false);
- }
- catch (ClusterTopologyCheckedException
- | IgniteInterruptedCheckedException ignored) {
- // No-op.
}
- catch (Throwable e) {
- U.error(ctx.log(GridEventConsumeHandler.class),
- "Failed to send event notification to node: " + nodeId, e);
+ finally {
+ ctx.continuous().unlockStopping();
}
}
- }
- });
+ });
- notificationInProgress = true;
+ notificationInProgress = true;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 71a2a66..67b32a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -93,7 +93,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private int retryCnt = 3;
/** */
- private volatile boolean processorStopped;
+ private final ReentrantReadWriteLock processorStopLock = new ReentrantReadWriteLock();
+
+ /** */
+ private boolean processorStopped;
/**
* @param ctx Kernal context.
@@ -259,9 +262,39 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
log.debug("Continuous processor started.");
}
+ /**
+ * @return {@code true} if lock successful, {@code false} if processor already stopped.
+ */
+ @SuppressWarnings("LockAcquiredButNotSafelyReleased")
+ public boolean lockStopping() {
+ processorStopLock.readLock().lock();
+
+ if (processorStopped) {
+ processorStopLock.readLock().unlock();
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ *
+ */
+ public void unlockStopping() {
+ processorStopLock.readLock().unlock();
+ }
+
/** {@inheritDoc} */
@Override public void onKernalStop(boolean cancel) {
- processorStopped = true;
+ processorStopLock.writeLock().lock();
+
+ try {
+ processorStopped = true;
+ }
+ finally {
+ processorStopLock.writeLock().unlock();
+ }
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e575164b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
index b4dce6c..7f5ee54 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridSelfTest.java
@@ -118,8 +118,8 @@ public class GridSelfTest extends GridProjectionAbstractTest {
g.message().remoteListen(null, new MessagingListenActor<String>() {
@Override protected void receive(UUID nodeId, String rcvMsg) throws Throwable {
- assert locNodeId.equals(nodeId);
- assert msg.equals(rcvMsg);
+ assertEquals(locNodeId, nodeId);
+ assertEquals(msg, rcvMsg);
stop(rcvMsg);
}