You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/01 17:10:37 UTC
[2/2] incubator-ignite git commit: # IGNITE-883 Avoid creation a lot
of threads.
# IGNITE-883 Avoid creation a lot of threads.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/f9f89cbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/f9f89cbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/f9f89cbe
Branch: refs/heads/ignite-883
Commit: f9f89cbe3401eb1656209a8deba3a42c13cc00cf
Parents: 04081ab
Author: sevdokimov <se...@gridgain.com>
Authored: Mon Jun 1 18:10:28 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Mon Jun 1 18:10:28 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/IgniteKernal.java | 28 +----
.../org/apache/ignite/internal/IgnitionEx.java | 63 +++++++----
.../discovery/GridDiscoveryManager.java | 28 ++---
.../timeout/GridTimeoutProcessor.java | 107 ++++++++++++++++++-
.../tcp/TcpClientDiscoverySpiSelfTest.java | 10 ++
5 files changed, 169 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 8a7dc70..e3fc50f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -167,14 +167,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private Timer updateNtfTimer;
- /** */
- @GridToStringExclude
- private Timer starveTimer;
-
- /** */
- @GridToStringExclude
- private Timer metricsLogTimer;
-
/** Indicate error on grid stop. */
@GridToStringExclude
private boolean errOnStop;
@@ -867,13 +859,11 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (starveCheck) {
final long interval = F.isEmpty(intervalStr) ? PERIODIC_STARVATION_CHECK_FREQ : Long.parseLong(intervalStr);
- starveTimer = new Timer("ignite-starvation-checker");
-
- starveTimer.scheduleAtFixedRate(new GridTimerTask() {
+ ctx.timeout().schedule(new Runnable() {
/** Last completed task count. */
private long lastCompletedCnt;
- @Override protected void safeRun() {
+ @Override public void run() {
if (!(execSvc instanceof ThreadPoolExecutor))
return;
@@ -896,13 +886,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
long metricsLogFreq = cfg.getMetricsLogFrequency();
if (metricsLogFreq > 0) {
- metricsLogTimer = new Timer("ignite-metrics-logger");
-
- metricsLogTimer.scheduleAtFixedRate(new GridTimerTask() {
- /** */
+ ctx.timeout().schedule(new Runnable() {
private final DecimalFormat dblFmt = new DecimalFormat("#.##");
- @Override protected void safeRun() {
+ @Override public void run() {
if (log.isInfoEnabled()) {
ClusterMetrics m = cluster().localNode().metrics();
@@ -1713,13 +1700,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
if (updateNtfTimer != null)
updateNtfTimer.cancel();
- if (starveTimer != null)
- starveTimer.cancel();
-
- // Cancel metrics log timer.
- if (metricsLogTimer != null)
- metricsLogTimer.cancel();
-
boolean interrupted = false;
while (true) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 6e4efb5..7df95c7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1447,27 +1447,48 @@ public class IgnitionEx {
ensureMultiInstanceSupport(myCfg.getSwapSpaceSpi());
}
- execSvc = new IgniteThreadPoolExecutor(
- "pub-" + cfg.getGridName(),
- cfg.getPublicThreadPoolSize(),
- cfg.getPublicThreadPoolSize(),
- DFLT_PUBLIC_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
-
- // Note that since we use 'LinkedBlockingQueue', number of
- // maximum threads has no effect.
- sysExecSvc = new IgniteThreadPoolExecutor(
- "sys-" + cfg.getGridName(),
- cfg.getSystemThreadPoolSize(),
- cfg.getSystemThreadPoolSize(),
- DFLT_SYSTEM_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
-
- // Pre-start all threads as they are guaranteed to be needed.
- ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+ boolean isClientMode = Boolean.TRUE.equals(myCfg.isClientMode());
+
+ if (isClientMode) {
+ execSvc = new IgniteThreadPoolExecutor(
+ "pub-" + cfg.getGridName(),
+ 0,
+ cfg.getPublicThreadPoolSize(),
+ 2000,
+ new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+ // Note that since we use 'LinkedBlockingQueue', number of
+ // maximum threads has no effect.
+ sysExecSvc = new IgniteThreadPoolExecutor(
+ "sys-" + cfg.getGridName(),
+ 1,
+ cfg.getSystemThreadPoolSize(),
+ 5000,
+ new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+ }
+ else {
+ execSvc = new IgniteThreadPoolExecutor(
+ "pub-" + cfg.getGridName(),
+ cfg.getPublicThreadPoolSize(),
+ cfg.getPublicThreadPoolSize(),
+ DFLT_PUBLIC_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_PUBLIC_THREADPOOL_QUEUE_CAP));
+
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor) execSvc).prestartAllCoreThreads();
+
+ // Note that since we use 'LinkedBlockingQueue', number of
+ // maximum threads has no effect.
+ sysExecSvc = new IgniteThreadPoolExecutor(
+ "sys-" + cfg.getGridName(),
+ cfg.getSystemThreadPoolSize(),
+ cfg.getSystemThreadPoolSize(),
+ DFLT_SYSTEM_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>(DFLT_SYSTEM_THREADPOOL_QUEUE_CAP));
+
+ // Pre-start all threads as they are guaranteed to be needed.
+ ((ThreadPoolExecutor) sysExecSvc).prestartAllCoreThreads();
+ }
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4ef602e..9b8280e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.jobmetrics.*;
import org.apache.ignite.internal.processors.security.*;
+import org.apache.ignite.internal.processors.timeout.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
@@ -165,7 +166,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
private final GridLocalMetrics metrics = createMetrics();
/** Metrics update worker. */
- private final MetricsUpdater metricsUpdater = new MetricsUpdater();
+ private GridTimeoutProcessor.CancelableTask metricsUpdateTask;
/** Custom event listener. */
private ConcurrentMap<Class<?>, List<CustomEventListener<DiscoveryCustomMessage>>> customEvtLsnrs =
@@ -325,7 +326,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
checkSegmentOnStart();
}
- new IgniteThread(metricsUpdater).start();
+ metricsUpdateTask = ctx.timeout().schedule(new MetricsUpdater(), METRICS_UPDATE_FREQ, METRICS_UPDATE_FREQ);
spi.setMetricsProvider(createMetricsProvider());
@@ -987,11 +988,11 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
getSpi().setListener(null);
// Stop discovery worker and metrics updater.
+ U.closeQuiet(metricsUpdateTask);
+
U.cancel(discoWrk);
- U.cancel(metricsUpdater);
U.join(discoWrk, log);
- U.join(metricsUpdater, log);
// Stop SPI itself.
stopSpi();
@@ -1879,28 +1880,17 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/**
*
*/
- private class MetricsUpdater extends GridWorker {
+ private class MetricsUpdater implements Runnable {
/** */
private long prevGcTime = -1;
/** */
private long prevCpuTime = -1;
- /**
- *
- */
- private MetricsUpdater() {
- super(ctx.gridName(), "metrics-updater", GridDiscoveryManager.this.log);
- }
-
/** {@inheritDoc} */
- @Override protected void body() throws IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- U.sleep(METRICS_UPDATE_FREQ);
-
- gcCpuLoad = getGcCpuLoad();
- cpuLoad = getCpuLoad();
- }
+ @Override public void run() {
+ gcCpuLoad = getGcCpuLoad();
+ cpuLoad = getCpuLoad();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index 81ff72b..e9b7717 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -24,8 +24,10 @@ import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
import org.apache.ignite.thread.*;
+import java.io.*;
import java.util.*;
/**
@@ -40,10 +42,12 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
new GridConcurrentSkipListSet<>(new Comparator<GridTimeoutObject>() {
/** {@inheritDoc} */
@Override public int compare(GridTimeoutObject o1, GridTimeoutObject o2) {
- long time1 = o1.endTime();
- long time2 = o2.endTime();
+ int res = Long.compare(o1.endTime(), o2.endTime());
- return time1 < time2 ? -1 : time1 > time2 ? 1 : o1.timeoutId().compareTo(o2.timeoutId());
+ if (res != 0)
+ return res;
+
+ return o1.timeoutId().compareTo(o2.timeoutId());
}
});
@@ -98,6 +102,26 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
}
/**
+ * Schedules the specified task for execution after the specified
+ * delay and, optionally, repeatedly with the specified period.
+ *
+ * @param task Task to execute.
+ * @param delay Delay before the first execution in milliseconds.
+ * @param period Period between executions in milliseconds, or -1 to execute the task only once.
+ * @return Handle that can be closed to cancel the task.
+ */
+ public CancelableTask schedule(Runnable task, long delay, long period) {
+ assert delay >= 0;
+ assert period > 0 || period == -1;
+
+ CancelableTask obj = new CancelableTask(task, U.currentTimeMillis() + delay, period);
+
+ addTimeoutObject(obj);
+
+ return obj;
+ }
+
+ /**
* @param timeoutObj Timeout object.
*/
public void removeTimeoutObject(GridTimeoutObject timeoutObj) {
@@ -173,4 +197,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
X.println(">>> Timeout processor memory stats [grid=" + ctx.gridName() + ']');
X.println(">>> timeoutObjsSize: " + timeoutObjs.size());
}
+
+ /**
+ * Timeout object that runs a task once or periodically and is cancelled by closing it.
+ */
+ public class CancelableTask implements GridTimeoutObject, Closeable {
+ /** */
+ private final IgniteUuid id = new IgniteUuid();
+
+ /** */
+ private long endTime;
+
+ /** */
+ private final long period;
+
+ /** */
+ private volatile boolean cancel;
+
+ /** */
+ private final Runnable task;
+
+ /**
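+ * @param task Task to execute.
+ * @param firstTime Time of the first execution in milliseconds.
+ * @param period Period between executions in milliseconds; the task is rescheduled only if it is positive.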
+ */
+ CancelableTask(Runnable task, long firstTime, long period) {
+ this.task = task;
+ endTime = firstTime;
+ this.period = period;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public synchronized void onTimeout() {
+ if (cancel)
+ return;
+
+ long startTime = U.currentTimeMillis();
+
+ try {
+ task.run();
+ }
+ finally {
+ long executionTime = U.currentTimeMillis() - startTime;
+
+ if (executionTime > 10) {
+ U.warn(log, "Timer task take a lot of time, tasks submitted to GridTimeoutProcessor must work " +
+ "quickly [executionTime=" + executionTime + ']');
+ }
+
+ if (!cancel && period > 0) {
+ endTime = U.currentTimeMillis() + period;
+
+ addTimeoutObject(this);
+ }
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ cancel = true;
+
+ synchronized (this) {
+ // Wait for a running execution to finish, so that the task is guaranteed not to be executed anymore.
+ removeTimeoutObject(this);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/f9f89cbe/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
index f7be340..eed076b 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiSelfTest.java
@@ -845,6 +845,16 @@ public class TcpClientDiscoverySpiSelfTest extends GridCommonAbstractTest {
/**
* @throws Exception If any error occurs.
*/
+ public void testThreads() throws Exception {
+ startServerNodes(1);
+ startClientNodes(1);
+
+ System.out.println("End");
+ }
+
+ /**
+ * @throws Exception If any error occurs.
+ */
public void testGridStartTime() throws Exception {
startServerNodes(2);