You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/06/08 12:39:49 UTC

[03/21] incubator-ignite git commit: # IGNITE-883 Create and use GridTimerProcessor.schedule()

# IGNITE-883 Create and use GridTimerProcessor.schedule()


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/997d65e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/997d65e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/997d65e2

Branch: refs/heads/ignite-sprint-5
Commit: 997d65e2aeb5d1c7a62a95120876238b0de6d3d4
Parents: bfae889
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Jun 1 18:10:28 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Jun 1 19:50:46 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/IgniteKernal.java    |  28 +----
 .../discovery/GridDiscoveryManager.java         |  28 ++---
 .../timeout/GridTimeoutProcessor.java           | 107 ++++++++++++++++++-
 3 files changed, 117 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..e3fc50f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,14 +167,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
     @GridToStringExclude
     private Timer updateNtfTimer;
 
-    /** */
-    @GridToStringExclude
-    private Timer starveTimer;
-
-    /** */
-    @GridToStringExclude
-    private Timer metricsLogTimer;
-
     /** Indicate error on grid stop. */
     @GridToStringExclude
     private boolean errOnStop;
@@ -867,13 +859,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         if (starveCheck) {
             final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
 
-            starveTimer = new Timer("ignite-starvation-checker");
-
-            starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+            ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
                 private long lastCompletedCnt;
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (!(execSvc instanceof ThreadPoolExecutor))
                         return;
 
@@ -896,13 +886,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         long metricsLogFreq = cfg.getMetricsLogFrequency();
 
         if (metricsLogFreq > 0) {
-            metricsLogTimer = new Timer("ignite-metrics-logger");
-
-            metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
-                /** */
+            ctx.timeout().schedule(new Runnable() {
                 private final DecimalFormat dblFmt = new DecimalFormat("#.##");
 
-                @Override protected void safeRun() {
+                @Override public void run() {
                     if (log.isInfoEnabled()) {
                         ClusterMetrics m = cluster().localNode().metrics();
 
@@ -1713,13 +1700,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             if (updateNtfTimer != null)
                 updateNtfTimer.cancel();
 
-            if (starveTimer != null)
-                starveTimer.cancel();
-
-            // Cancel metrics log timer.
-            if (metricsLogTimer != null)
-                metricsLogTimer.cancel();
-
             boolean interrupted = false;
 
             while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4ef602e..9b8280e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
 import org.apache.ignite.internal.processors.cache.*;
 import org.apache.ignite.internal.processors.jobmetrics.*;
 import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
 import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.lang.*;
@@ -165,7 +166,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     private final GridLocalMetrics metrics = createMetrics();
 
     /** Metrics update worker. */
-    private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+    private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
 
     /** Custom event listener. */
     private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
@@ -325,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
             checkSegmentOnStart();
         }
 
-        new IgniteThread(metricsUpdater).start();
+        metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
 
         spi.setMetricsProvider(createMetricsProvider());
 
@@ -987,11 +988,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
         getSpi().setListener(null);
 
         // Stop discovery worker and metrics updater.
+        U.closeQuiet(metricsUpdateTask);
+
         U.cancel(discoWrk);
-        U.cancel(metricsUpdater);
 
         U.join(discoWrk, log);
-        U.join(metricsUpdater, log);
 
         // Stop SPI itself.
         stopSpi();
@@ -1879,28 +1880,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /**
      *
      */
-    private class MetricsUpdater extends GridWorker {
+    private class MetricsUpdater implements Runnable {
         /** */
         private long prevGcTime = -1;
 
         /** */
         private long prevCpuTime = -1;
 
-        /**
-         *
-         */
-        private MetricsUpdater() {
-            super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
-        }
-
         /** {@inheritDoc} */
-        @Override protected void body() throws IgniteInterruptedCheckedException {
-            while (!isCancelled()) {
-                U.sleep(METRICS_UPDATE_FREQ);
-
-                gcCpuLoad = getGcCpuLoad();
-                cpuLoad = getCpuLoad();
-            }
+        @Override public void run() {
+            gcCpuLoad = getGcCpuLoad();
+            cpuLoad = getCpuLoad();
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/997d65e2/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e9b7717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -24,8 +24,10 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.thread.*;
 
+import java.io.*;
 import java.util.*;
 
 /**
@@ -40,10 +42,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
             /** {@inheritDoc} */
             @Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
-                long time1 = o1.endTime();
-                long time2 = o2.endTime();
+                int res = Long.compare(o1.endTime(), o2.endTime());
 
-                return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+                if (res != 0)
+                    return res;
+
+                return o1.timeoutId().compareTo(o2.timeoutId());
             }
         });
 
@@ -98,6 +102,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Schedule the specified timer task for execution at the specified
+     * time with the specified period, in milliseconds.
+     *
+     * @param task Task to execute.
+     * @param delay Delay to first execution in milliseconds.
+     * @param period Period for execution in milliseconds or -1.
+     * @return Cancelable to cancel task.
+     */
+    public CancelableTask schedule(Runnable task, long delay, long period) {
+        assert delay >= 0;
+        assert period > 0 || period == -1;
+
+        CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+        addTimeoutObject(obj);
+
+        return obj;
+    }
+
+    /**
      * @param timeoutObj Timeout object.
      */
     public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +197,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
         X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
         X.println(">>>   timeoutObjsSize: " + timeoutObjs.size());
     }
+
+    /**
+     *
+     */
+    public class CancelableTask implements GridTimeoutObject, Closeable {
+        /** */
+        private final IgniteUuid id = new IgniteUuid();
+
+        /** */
+        private long endTime;
+
+        /** */
+        private final long period;
+
+        /** */
+        private volatile boolean cancel;
+
+        /** */
+        private final Runnable task;
+
+        /**
+         * @param firstTime First time.
+         * @param period Period.
+         * @param task Task to execute.
+         */
+        CancelableTask(Runnable task, long firstTime, long period) {
+            this.task = task;
+            endTime = firstTime;
+            this.period = period;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized void onTimeout() {
+            if (cancel)
+                return;
+
+            long startTime = U.currentTimeMillis();
+
+            try {
+                task.run();
+            }
+            finally {
+                long executionTime = U.currentTimeMillis() - startTime;
+
+                if (executionTime > 10) {
+                    U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
+                        "quickly [executionTime=" + executionTime + ']');
+                }
+
+                if (!cancel && period > 0) {
+                    endTime = U.currentTimeMillis() + period;
+
+                    addTimeoutObject(this);
+                }
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() {
+            cancel = true;
+
+            synchronized (this) {
+                // Just waiting for task execution end to make sure that task will not be executed anymore.
+                removeTimeoutObject(this);
+            }
+        }
+    }
 }