You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/09 08:47:48 UTC

[03/50] incubator-ignite git commit: # ignite-883

# ignite-883


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/6bfc78ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/6bfc78ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/6bfc78ea

Branch: refs/heads/ignite-484-1
Commit: 6bfc78ea6a4559b4ee9059893e0a7f9d4195ad3c
Parents: a3eb572
Author: sboikov <sb...@gridgain.com>
Authored: Thu Jun 4 10:04:18 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Thu Jun 4 10:04:18 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    | 18 ++++++++--
 .../datastructures/DataStructuresProcessor.java | 36 +++++++++++++++++---
 2 files changed, 48 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3fc50f..27d2dc7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,6 +167,14 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private Timer updateNtfTimer;
 
+    /** Handle of the scheduled starvation check task ({@code null} if the check is not enabled). */
+    @GridToStringExclude
+    private GridTimeoutProcessor.CancelableTask starveTask;
+
+    /** Handle of the scheduled metrics log task ({@code null} if metrics log frequency is not positive). */
+    @GridToStringExclude
+    private GridTimeoutProcessor.CancelableTask metricsLogTask;
+
     /** Indicate error on grid stop. */
     @GridToStringExclude
     private boolean errOnStop;
@@ -859,7 +867,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            ctx.timeout().schedule(new Runnable() {
+            starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
@@ -886,7 +894,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            ctx.timeout().schedule(new Runnable() {
+            metricsLogTask = ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
                 @Override public void run() {
@@ -1700,6 +1708,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
+            if (starveTask != null)
+                starveTask.close();
+
+            if (metricsLogTask != null)
+                metricsLogTask.close();
+
             boolean interrupted = false;
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6bfc78ea/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
index 27f6a29..2138639 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/DataStructuresProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.datastructures;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
 import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
@@ -145,7 +146,7 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
             dsCacheCtx = atomicsCache.context();
 
             qryId = dsCacheCtx.continuousQueries().executeInternalQuery(new DataStructuresEntryListener(),
-                null,
+                new DataStructuresEntryFilter(),
                 dsCacheCtx.isReplicated() && dsCacheCtx.affinityNode(),
                 false);
         }
@@ -1051,10 +1052,37 @@ public final class DataStructuresProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private class DataStructuresEntryListener implements CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+    static class DataStructuresEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} Accepts create/update events only for latch values and accepts every remove event. */
+        @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
+            if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED)
+                return evt.getValue() instanceof GridCacheCountDownLatchValue; // Other value types are rejected.
+            else {
+                assert evt.getEventType() == EventType.REMOVED : evt; // No other event type is expected here.
+
+                return true; // Removals are accepted regardless of the value.
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(DataStructuresEntryFilter.class, this);
+        }
+    }
+
+    /**
+     *
+     */
+    private class DataStructuresEntryListener implements
+        CacheEntryUpdatedListener<GridCacheInternalKey, GridCacheInternal> {
+        /** {@inheritDoc} */
+        @Override public void onUpdated(
+            Iterable<CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal>> evts)
+            throws CacheEntryListenerException
+        {
             for (CacheEntryEvent<? extends GridCacheInternalKey, ? extends GridCacheInternal> evt : evts) {
                 if (evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED) {
                     GridCacheInternal val0 = evt.getValue();