You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ak...@apache.org on 2015/06/09 07:20:50 UTC
[08/29] incubator-ignite git commit: # ignite-883
# ignite-883
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bfc78ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bfc78ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bfc78ea
Branch: refs/heads/ignite-992
Commit: 6bfc78ea6a4559b4ee9059893e0a7f9d4195ad3c
Parents: a3eb572
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 10:04:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 10:04:18 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 18 ++++++++--
.../datastructures/DataStructuresProcessor.java | 36 +++++++++++++++++---
2 files changed, 48 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3fc50f..27d2dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private Timer updateNtfTimer;
+ /** Handle of the scheduled periodic starvation check task; closed when the grid stops. */
+ @GridToStringExclude
+ private GridTimeoutProcessor.CancelableTask starveTask;
+
+ /** Handle of the scheduled periodic metrics log task; closed when the grid stops. */
+ @GridToStringExclude
+ private GridTimeoutProcessor.CancelableTask metricsLogTask;
+
/** Indicate error on grid stop. */
@GridToStringExclude
private boolean errOnStop;
@@ -859,7 +867,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (starveCheck) {
final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
- ctx.timeout().schedule(new Runnable() {
+ starveTask = ctx.timeout().schedule(new Runnable() {
/** Last completed task count. */
private long lastCompletedCnt;
@@ -886,7 +894,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
long metricsLogFreq = cfg.getMetricsLogFrequency();
if (metricsLogFreq > 0) {
- ctx.timeout().schedule(new Runnable() {
+ metricsLogTask = ctx.timeout().schedule(new Runnable() {
private final DecimalFormat dblFmt = new DecimalFormat("#.##");
@Override public void run() {
@@ -1700,6 +1708,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (updateNtfTimer != null)
updateNtfTimer.cancel();
+ if (starveTask != null)
+ starveTask.close();
+
+ if (metricsLogTask != null)
+ metricsLogTask.close();
+
boolean interrupted = false;
while (true) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 27f6a29..2138639 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.datastructures;
import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
@@ -145,7 +146,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
dsCacheCtx = atomicsCache.context();
qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
- null,
+ new DataStructuresEntryFilter(),
dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
false);
}
@@ -1051,10 +1052,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
/**
*
*/
- private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+ static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> { // Filter passed to the internal data structures continuous query.
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** {@inheritDoc} Accepts every removal; accepts a creation or update only for a count-down latch value. */
- @Override public void onUpdated(Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
- throws CacheEntryListenerException {
+ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+ if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
+ return evt.getValue() instanceof GridCacheCountDownLatchValue; // Only latch values pass on create/update.
+ else {
+ assert evt.getEventType() == EventType.REMOVED : evt; // Any other event type is expected to be a removal.
+
+ return true; // Removals always pass the filter.
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(DataStructuresEntryFilter.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private class DataStructuresEntryListener implements
+ CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+ /** {@inheritDoc} */
+ @Override public void onUpdated(
+ Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+ throws CacheEntryListenerException
+ {
for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
GridCacheInternal val0 = evt.getValue();