You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/08/15 03:49:17 UTC
[30/35] incubator-ignite git commit: Fixed threads cleanup in continuous processor
Fixed threads cleanup in continuous processor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ba3abcec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ba3abcec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ba3abcec
Branch: refs/heads/ignite-264
Commit: ba3abceca10a1745253a3c28e7a6fe6f5833d266
Parents: 6697b0c
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Thu Aug 13 15:50:39 2015 -0700
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Thu Aug 13 15:50:39 2015 -0700
----------------------------------------------------------------------
.../continuous/GridContinuousProcessor.java | 16 +++++++++++-----
.../GridCacheContinuousQueryAbstractSelfTest.java | 2 +-
2 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba3abcec/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index 5f1c4bb..a360e35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.managers.eventstorage.*;
import org.apache.ignite.internal.processors.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.timeout.*;
-import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
@@ -72,7 +71,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
private final ConcurrentMap<UUID, StopFuture> stopFuts = new ConcurrentHashMap8<>();
/** Threads started by this processor. */
- private final Collection<IgniteThread> threads = new GridConcurrentHashSet<>();
+ private final Map<UUID, IgniteThread> bufCheckThreads = new ConcurrentHashMap8<>();
/** */
private final ConcurrentMap<IgniteUuid, SyncMessageAckFuture> syncMsgFuts = new ConcurrentHashMap8<>();
@@ -311,8 +310,10 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.io().removeMessageListener(TOPIC_CONTINUOUS);
- U.interrupt(threads);
- U.joinThreads(threads, log);
+ for (IgniteThread thread : bufCheckThreads.values()) {
+ U.interrupt(thread);
+ U.join(thread);
+ }
if (log.isDebugEnabled())
log.debug("Continuous processor stopped.");
@@ -915,7 +916,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
- threads.add(checker);
+ bufCheckThreads.put(routineId, checker);
checker.start();
}
@@ -947,6 +948,11 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
ctx.io().removeMessageListener(hnd.orderedTopic());
hnd.unregister(routineId, ctx);
+
+ IgniteThread checker = bufCheckThreads.remove(routineId);
+
+ if (checker != null)
+ checker.interrupt();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ba3abcec/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 4681071..7b628b4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -29,7 +29,6 @@ import org.apache.ignite.internal.processors.datastructures.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.optimized.*;
import org.apache.ignite.spi.discovery.tcp.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
@@ -177,6 +176,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "rmtInfos")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "startFuts")).size());
assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "stopFuts")).size());
+ assertEquals(String.valueOf(i), 0, ((Map)U.field(proc, "bufCheckThreads")).size());
CacheContinuousQueryManager mgr = grid(i).context().cache().internalCache().context().continuousQueries();