You are viewing a plain-text version of this content. The canonical version is linked from the original archive page (the hyperlink was lost in this plain-text rendering).
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/11 15:30:28 UTC
ignite git commit: IGNITE-6892 OOM should be covered by failure handling
Repository: ignite
Updated Branches:
refs/heads/master 32fc6c3c1 -> d1be9b855
IGNITE-6892 OOM should be covered by failure handling
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d1be9b85
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d1be9b85
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d1be9b85
Branch: refs/heads/master
Commit: d1be9b85507eb3358327e93b81031f92e660531b
Parents: 32fc6c3
Author: Aleksey Plekhanov <pl...@gmail.com>
Authored: Wed Apr 11 18:24:51 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Wed Apr 11 18:24:51 2018 +0300
----------------------------------------------------------------------
.../apache/ignite/IgniteSystemProperties.java | 8 +
.../org/apache/ignite/internal/IgnitionEx.java | 50 +++-
.../discovery/GridDiscoveryManager.java | 3 +
.../processors/cache/WalStateManager.java | 8 +-
.../continuous/GridContinuousProcessor.java | 3 +
.../datastreamer/DataStreamProcessor.java | 3 +
.../processors/failure/FailureProcessor.java | 11 +
.../internal/processors/job/GridJobWorker.java | 8 +-
.../service/GridServiceProcessor.java | 15 +-
.../thread/IgniteStripedThreadPoolExecutor.java | 8 +-
.../ignite/thread/IgniteThreadFactory.java | 30 ++-
.../ignite/thread/IgniteThreadPoolExecutor.java | 12 +-
.../ignite/thread/OomExceptionHandler.java | 44 ++++
.../ignite/failure/OomFailureHandlerTest.java | 255 +++++++++++++++++++
.../ignite/testsuites/IgniteBasicTestSuite.java | 2 +
15 files changed, 430 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 662338c..437f49f 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -863,6 +863,14 @@ public final class IgniteSystemProperties {
public static final String IGNITE_BPLUS_TREE_LOCK_RETRIES = "IGNITE_BPLUS_TREE_LOCK_RETRIES";
/**
+ * Amount of memory reserved in the heap at node start, which can be dropped to increase the chances of success when
+ * handling OutOfMemoryError.
+ *
+ * Default is {@code 64kb}.
+ */
+ public static final String IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE = "IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE";
+
+ /**
* The threshold of uneven distribution above which partition distribution will be logged.
*
* The default is '50', that means: warn about nodes with 50+% difference.
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 10a0752..b3c3ee8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.reflect.Constructor;
import java.net.MalformedURLException;
@@ -88,6 +89,7 @@ import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -1764,6 +1766,13 @@ public class IgnitionEx {
validateThreadPoolSize(cfg.getPublicThreadPoolSize(), "public");
+ UncaughtExceptionHandler oomeHnd = new UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread t, Throwable e) {
+ if (grid != null && X.hasCause(e, OutOfMemoryError.class))
+ grid.context().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+ };
+
execSvc = new IgniteThreadPoolExecutor(
"pub",
cfg.getIgniteInstanceName(),
@@ -1771,7 +1780,8 @@ public class IgnitionEx {
cfg.getPublicThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.PUBLIC_POOL);
+ GridIoPolicy.PUBLIC_POOL,
+ oomeHnd);
execSvc.allowCoreThreadTimeOut(true);
@@ -1784,7 +1794,8 @@ public class IgnitionEx {
cfg.getServiceThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.SERVICE_POOL);
+ GridIoPolicy.SERVICE_POOL,
+ oomeHnd);
svcExecSvc.allowCoreThreadTimeOut(true);
@@ -1797,7 +1808,8 @@ public class IgnitionEx {
cfg.getSystemThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.SYSTEM_POOL);
+ GridIoPolicy.SYSTEM_POOL,
+ oomeHnd);
sysExecSvc.allowCoreThreadTimeOut(true);
@@ -1828,7 +1840,8 @@ public class IgnitionEx {
cfg.getManagementThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.MANAGEMENT_POOL);
+ GridIoPolicy.MANAGEMENT_POOL,
+ oomeHnd);
mgmtExecSvc.allowCoreThreadTimeOut(true);
@@ -1844,7 +1857,8 @@ public class IgnitionEx {
cfg.getPeerClassLoadingThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.P2P_POOL);
+ GridIoPolicy.P2P_POOL,
+ oomeHnd);
p2pExecSvc.allowCoreThreadTimeOut(true);
@@ -1879,7 +1893,8 @@ public class IgnitionEx {
callbackExecSvc = new IgniteStripedThreadPoolExecutor(
cfg.getAsyncCallbackPoolSize(),
cfg.getIgniteInstanceName(),
- "callback");
+ "callback",
+ oomeHnd);
if (myCfg.getConnectorConfiguration() != null) {
validateThreadPoolSize(myCfg.getConnectorConfiguration().getThreadPoolSize(), "connector");
@@ -1890,7 +1905,9 @@ public class IgnitionEx {
myCfg.getConnectorConfiguration().getThreadPoolSize(),
myCfg.getConnectorConfiguration().getThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>()
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.UNDEFINED,
+ oomeHnd
);
restExecSvc.allowCoreThreadTimeOut(true);
@@ -1905,7 +1922,8 @@ public class IgnitionEx {
myCfg.getUtilityCacheThreadPoolSize(),
myCfg.getUtilityCacheKeepAliveTime(),
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.UTILITY_CACHE_POOL);
+ GridIoPolicy.UTILITY_CACHE_POOL,
+ oomeHnd);
utilityCacheExecSvc.allowCoreThreadTimeOut(true);
@@ -1916,7 +1934,8 @@ public class IgnitionEx {
1,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.AFFINITY_POOL);
+ GridIoPolicy.AFFINITY_POOL,
+ oomeHnd);
affExecSvc.allowCoreThreadTimeOut(true);
@@ -1930,7 +1949,8 @@ public class IgnitionEx {
cpus * 2,
3000L,
new LinkedBlockingQueue<Runnable>(1000),
- GridIoPolicy.IDX_POOL
+ GridIoPolicy.IDX_POOL,
+ oomeHnd
);
}
@@ -1943,7 +1963,8 @@ public class IgnitionEx {
cfg.getQueryThreadPoolSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.QUERY_POOL);
+ GridIoPolicy.QUERY_POOL,
+ oomeHnd);
qryExecSvc.allowCoreThreadTimeOut(true);
@@ -1954,7 +1975,8 @@ public class IgnitionEx {
2,
DFLT_THREAD_KEEP_ALIVE_TIME,
new LinkedBlockingQueue<Runnable>(),
- GridIoPolicy.SCHEMA_POOL);
+ GridIoPolicy.SCHEMA_POOL,
+ oomeHnd);
schemaExecSvc.allowCoreThreadTimeOut(true);
@@ -1970,7 +1992,9 @@ public class IgnitionEx {
execCfg.getSize(),
execCfg.getSize(),
DFLT_THREAD_KEEP_ALIVE_TIME,
- new LinkedBlockingQueue<Runnable>());
+ new LinkedBlockingQueue<Runnable>(),
+ GridIoPolicy.UNDEFINED,
+ oomeHnd);
customExecSvcs.put(execCfg.getName(), exec);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 400bb5f..77c9657 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -130,6 +130,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -924,6 +925,8 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
segChkThread = new IgniteThread(segChkWrk);
+ segChkThread.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
+
segChkThread.start();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 0ac699f..64a6819 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -38,6 +38,7 @@ import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.thread.OomExceptionHandler;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
@@ -473,7 +474,12 @@ public class WalStateManager extends GridCacheSharedManagerAdapter {
// not-yet-flushed dirty pages have been logged.
WalStateChangeWorker worker = new WalStateChangeWorker(msg, cpFut);
- new IgniteThread(worker).start();
+ IgniteThread thread = new IgniteThread(worker);
+
+ thread.setUncaughtExceptionHandler(new OomExceptionHandler(
+ cctx.kernalContext()));
+
+ thread.start();
}
else {
// Disable: not-yet-flushed operations are not logged, so wait for them
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index cebe4b1..2d48b7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -88,6 +88,7 @@ import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -1727,6 +1728,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
+ checker.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
+
bufCheckThreads.put(routineId, checker);
checker.start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 8b984c0..e63d7d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.stream.StreamReceiver;
import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.thread.OomExceptionHandler;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
@@ -125,6 +126,8 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter {
}
});
+ flusher.setUncaughtExceptionHandler(new OomExceptionHandler(ctx));
+
flusher.start();
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
index 615fb9f..0234e84 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/failure/FailureProcessor.java
@@ -19,12 +19,14 @@ package org.apache.ignite.internal.processors.failure;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureHandler;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandler;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
@@ -40,6 +42,9 @@ public class FailureProcessor extends GridProcessorAdapter {
/** Failure context. */
private volatile FailureContext failureCtx;
+ /** Reserve buffer, which can be dropped to handle OOME. */
+ private volatile byte[] reserveBuf;
+
/**
* @param ctx Context.
*/
@@ -56,6 +61,9 @@ public class FailureProcessor extends GridProcessorAdapter {
if (hnd == null)
hnd = getDefaultFailureHandler();
+ reserveBuf = new byte[IgniteSystemProperties.getInteger(
+ IgniteSystemProperties.IGNITE_FAILURE_HANDLER_RESERVE_BUFFER_SIZE, 64 * 1024)];
+
assert hnd != null;
this.hnd = hnd;
@@ -102,6 +110,9 @@ public class FailureProcessor extends GridProcessorAdapter {
U.error(ignite.log(), "Critical failure. Will be handled accordingly to configured handler [hnd=" +
hnd.getClass() + ", failureCtx=" + failureCtx + ']', failureCtx.error());
+ if (reserveBuf != null && X.hasCause(failureCtx.error(), OutOfMemoryError.class))
+ reserveBuf = null;
+
boolean invalidated = hnd.onFailure(ignite, failureCtx);
if (invalidated) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 6d2e621..f7c07f5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -36,6 +36,8 @@ import org.apache.ignite.compute.ComputeJobContext;
import org.apache.ignite.compute.ComputeJobMasterLeaveAware;
import org.apache.ignite.compute.ComputeUserUndeclaredException;
import org.apache.ignite.events.JobEvent;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.igfs.IgfsOutOfSpaceException;
import org.apache.ignite.internal.GridInternalException;
import org.apache.ignite.internal.GridJobContextImpl;
@@ -603,9 +605,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
X.hasCause(e, ClusterTopologyCheckedException.class))
// Should be throttled, because GridServiceProxy continuously retry getting service.
LT.error(log, e, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']');
- else
+ else {
U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e);
+ if (X.hasCause(e, OutOfMemoryError.class))
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ }
+
ex = e;
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index ff68e72..63f5027 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.service;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -103,6 +104,7 @@ import org.apache.ignite.services.ServiceConfiguration;
import org.apache.ignite.services.ServiceDeploymentException;
import org.apache.ignite.services.ServiceDescriptor;
import org.apache.ignite.thread.IgniteThreadFactory;
+import org.apache.ignite.thread.OomExceptionHandler;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
import java.util.concurrent.ConcurrentHashMap;
@@ -112,7 +114,6 @@ import static org.apache.ignite.IgniteSystemProperties.getString;
import static org.apache.ignite.configuration.DeploymentMode.ISOLATED;
import static org.apache.ignite.configuration.DeploymentMode.PRIVATE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_SERVICES_COMPATIBILITY_MODE;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.UTILITY_CACHE_NAME;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED;
import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
@@ -154,8 +155,12 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
/** Busy lock. */
private volatile GridSpinBusyLock busyLock = new GridSpinBusyLock();
+ /** Uncaught exception handler for thread pools. */
+ private final UncaughtExceptionHandler oomeHnd = new OomExceptionHandler(ctx);
+
/** Thread factory. */
- private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service");
+ private ThreadFactory threadFactory = new IgniteThreadFactory(ctx.igniteInstanceName(), "service",
+ oomeHnd);
/** Thread local for service name. */
private ThreadLocal<String> svcName = new ThreadLocal<>();
@@ -175,7 +180,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
public GridServiceProcessor(GridKernalContext ctx) {
super(ctx);
- depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy"));
+ depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(),
+ "srvc-deploy", oomeHnd));
String servicesCompatibilityMode = getString(IGNITE_SERVICES_COMPATIBILITY_MODE);
@@ -373,7 +379,8 @@ public class GridServiceProcessor extends GridProcessorAdapter implements Ignite
busyLock = new GridSpinBusyLock();
- depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(), "srvc-deploy"));
+ depExe = Executors.newSingleThreadExecutor(new IgniteThreadFactory(ctx.igniteInstanceName(),
+ "srvc-deploy", oomeHnd));
start();
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
index 3cd7484..418812f 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteStripedThreadPoolExecutor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.thread;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -45,10 +46,11 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
* @param igniteInstanceName Node name.
* @param threadNamePrefix Thread name prefix.
*/
- public IgniteStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix) {
+ public IgniteStripedThreadPoolExecutor(int concurrentLvl, String igniteInstanceName, String threadNamePrefix,
+ UncaughtExceptionHandler eHnd) {
execs = new ExecutorService[concurrentLvl];
- ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix);
+ ThreadFactory factory = new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, eHnd);
for (int i = 0; i < concurrentLvl; i++)
execs[i] = Executors.newSingleThreadExecutor(factory);
@@ -173,4 +175,4 @@ public class IgniteStripedThreadPoolExecutor implements ExecutorService {
@Override public String toString() {
return S.toString(IgniteStripedThreadPoolExecutor.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
index 062c973..23bf14d 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadFactory.java
@@ -17,9 +17,9 @@
package org.apache.ignite.thread;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.NotNull;
@@ -41,6 +41,9 @@ public class IgniteThreadFactory implements ThreadFactory {
/** */
private final byte plc;
+ /** Exception handler. */
+ private final UncaughtExceptionHandler eHnd;
+
/**
* Constructs new thread factory for given grid. All threads will belong
* to the same default thread group.
@@ -49,7 +52,19 @@ public class IgniteThreadFactory implements ThreadFactory {
* @param threadName Thread name.
*/
public IgniteThreadFactory(String igniteInstanceName, String threadName) {
- this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED);
+ this(igniteInstanceName, threadName, null);
+ }
+
+ /**
+ * Constructs new thread factory for given grid. All threads will belong
+ * to the same default thread group.
+ *
+ * @param igniteInstanceName Ignite instance name.
+ * @param threadName Thread name.
+ * @param eHnd Uncaught exception handler.
+ */
+ public IgniteThreadFactory(String igniteInstanceName, String threadName, UncaughtExceptionHandler eHnd) {
+ this(igniteInstanceName, threadName, GridIoPolicy.UNDEFINED, eHnd);
}
/**
@@ -59,16 +74,23 @@ public class IgniteThreadFactory implements ThreadFactory {
* @param igniteInstanceName Ignite instance name.
* @param threadName Thread name.
* @param plc {@link GridIoPolicy} for thread pool.
+ * @param eHnd Uncaught exception handler.
*/
- public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc) {
+ public IgniteThreadFactory(String igniteInstanceName, String threadName, byte plc, UncaughtExceptionHandler eHnd) {
this.igniteInstanceName = igniteInstanceName;
this.threadName = threadName;
this.plc = plc;
+ this.eHnd = eHnd;
}
/** {@inheritDoc} */
@Override public Thread newThread(@NotNull Runnable r) {
- return new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc);
+ Thread thread = new IgniteThread(igniteInstanceName, threadName, r, idxGen.incrementAndGet(), -1, plc);
+
+ if (eHnd != null)
+ thread.setUncaughtExceptionHandler(eHnd);
+
+ return thread;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
index 83c64c3..fed77ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/thread/IgniteThreadPoolExecutor.java
@@ -17,6 +17,7 @@
package org.apache.ignite.thread;
+import java.lang.Thread.UncaughtExceptionHandler;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -53,7 +54,8 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
maxPoolSize,
keepAliveTime,
workQ,
- GridIoPolicy.UNDEFINED);
+ GridIoPolicy.UNDEFINED,
+ null);
}
/**
@@ -68,6 +70,7 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
* @param workQ The queue to use for holding tasks before they are executed. This queue will hold only
* runnable tasks submitted by the {@link #execute(Runnable)} method.
* @param plc {@link GridIoPolicy} for thread pool.
+ * @param eHnd Uncaught exception handler for thread pool.
*/
public IgniteThreadPoolExecutor(
String threadNamePrefix,
@@ -76,14 +79,15 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
int maxPoolSize,
long keepAliveTime,
BlockingQueue<Runnable> workQ,
- byte plc) {
+ byte plc,
+ UncaughtExceptionHandler eHnd) {
super(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
workQ,
- new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc)
+ new IgniteThreadFactory(igniteInstanceName, threadNamePrefix, plc, eHnd)
);
}
@@ -114,4 +118,4 @@ public class IgniteThreadPoolExecutor extends ThreadPoolExecutor {
new AbortPolicy()
);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
new file mode 100644
index 0000000..3a62ad8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/thread/OomExceptionHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.thread;
+
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.util.typedef.X;
+
+/**
+ * Uncaught exception handler for system threads: an {@link OutOfMemoryError} anywhere in the cause chain is passed to the failure processor as a critical error; any other throwable is delegated to the thread's group so it is still reported as by default.
+ */
+public class OomExceptionHandler implements Thread.UncaughtExceptionHandler {
+ /** Kernal context used to reach the failure processor. */
+ private final GridKernalContext ctx;
+
+ /** @param ctx Kernal context. */
+ public OomExceptionHandler(GridKernalContext ctx) {
+ this.ctx = ctx;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void uncaughtException(Thread t, Throwable e) {
+ if (X.hasCause(e, OutOfMemoryError.class))
+ ctx.failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ else // Setting this handler replaces the thread group as handler, so keep the default reporting for non-OOM errors.
+ t.getThreadGroup().uncaughtException(t, e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
new file mode 100644
index 0000000..2af94b8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/failure/OomFailureHandlerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.failure;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.services.Service;
+import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Out of memory error failure handler test.
+ * <p>
+ * Each test throws an {@link OutOfMemoryError} from user code that the test routes to the second node (compute
+ * job, entry processor, service method or {@code execute()}, local event listener) and then checks that the
+ * failure handler was triggered on that node and not on the first one.
+ */
+public class OomFailureHandlerTest extends AbstractFailureHandlerTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration()
+            .setName(DEFAULT_CACHE_NAME)
+            .setCacheMode(CacheMode.PARTITIONED)
+            .setBackups(0)
+        );
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Test OOME in IgniteCompute.
+     *
+     * @throws Exception If failed.
+     */
+    public void testComputeOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        try {
+            IgniteFuture<Boolean> res = ignite0.compute(ignite0.cluster().forNodeId(ignite1.cluster().localNode().id()))
+                .callAsync(new IgniteCallable<Boolean>() {
+                    @Override public Boolean call() throws Exception {
+                        throw new OutOfMemoryError();
+                    }
+                });
+
+            res.get();
+        }
+        catch (Throwable ignore) {
+            // Expected: the job fails with OOME; the real check is the failure handler state below.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in EntryProcessor.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEntryProcessorOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache0 = ignite0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Key is primary on the second node, so the processor is expected to run there.
+        Integer key = primaryKey(cache1);
+
+        cache1.put(key, key);
+
+        try {
+            IgniteFuture<Object> fut = cache0.invokeAsync(key, new EntryProcessor<Integer, Integer, Object>() {
+                @Override public Object process(MutableEntry<Integer, Integer> entry,
+                    Object... arguments) throws EntryProcessorException {
+                    throw new OutOfMemoryError();
+                }
+            });
+
+            fut.get();
+        }
+        catch (Throwable ignore) {
+            // Expected: the invocation fails with OOME; the real check is the failure handler state below.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in service method invocation.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServiceInvokeOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Affinity key owned by the second node pins the singleton service there.
+        Integer key = primaryKey(cache1);
+
+        ignite0.services().deployKeyAffinitySingleton("fail-invoke-service", new FailServiceImpl(false),
+            DEFAULT_CACHE_NAME, key);
+
+        FailService svc = ignite0.services().serviceProxy("fail-invoke-service", FailService.class, false);
+
+        try {
+            svc.fail();
+        }
+        catch (Throwable ignore) {
+            // Expected: the remote call fails with OOME; the real check is the failure handler state below.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in service execute.
+     *
+     * @throws Exception If failed.
+     */
+    public void testServiceExecuteOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Affinity key owned by the second node pins the singleton service there.
+        Integer key = primaryKey(cache1);
+
+        ignite0.services().deployKeyAffinitySingleton("fail-execute-service", new FailServiceImpl(true),
+            DEFAULT_CACHE_NAME, key);
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Test OOME in event listener.
+     *
+     * @throws Exception If failed.
+     */
+    public void testEventListenerOomError() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+        IgniteEx ignite1 = startGrid(1);
+
+        IgniteCache<Integer, Integer> cache0 = ignite0.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IgniteCache<Integer, Integer> cache1 = ignite1.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        ignite1.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                throw new OutOfMemoryError();
+            }
+        }, EventType.EVT_CACHE_OBJECT_PUT);
+
+        Integer key = primaryKey(cache1);
+
+        try {
+            cache0.put(key, key);
+        }
+        catch (Throwable ignore) {
+            // Expected: the put may fail with OOME; the real check is the failure handler state below.
+        }
+
+        assertFailureState(ignite0, ignite1);
+    }
+
+    /**
+     * Waits up to 5 seconds for the failure handler of {@code igniteFail} to be triggered, then checks that the
+     * failure handler of {@code igniteWork} was not triggered.
+     *
+     * @param igniteWork Working ignite instance.
+     * @param igniteFail Failed ignite instance.
+     * @throws IgniteInterruptedCheckedException If interrupted while waiting.
+     */
+    private static void assertFailureState(Ignite igniteWork, Ignite igniteFail) throws IgniteInterruptedCheckedException {
+        assertTrue("Failure handler was not triggered on the failed node.",
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return dummyFailureHandler(igniteFail).failure();
+                }
+            }, 5000L));
+
+        assertFalse("Failure handler was unexpectedly triggered on the working node.",
+            dummyFailureHandler(igniteWork).failure());
+    }
+
+    /**
+     * Service whose single method always throws {@link OutOfMemoryError}.
+     */
+    private interface FailService extends Service {
+        /**
+         * Throws {@link OutOfMemoryError}.
+         */
+        void fail();
+    }
+
+    /**
+     * {@link FailService} implementation that can additionally throw {@link OutOfMemoryError} from
+     * {@link #execute(ServiceContext)}.
+     */
+    private static class FailServiceImpl implements FailService {
+        /** Fail on execute. */
+        private final boolean failOnExec;
+
+        /**
+         * @param failOnExec If {@code true}, {@link #execute(ServiceContext)} throws {@link OutOfMemoryError}.
+         */
+        private FailServiceImpl(boolean failOnExec) {
+            this.failOnExec = failOnExec;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void fail() {
+            throw new OutOfMemoryError();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancel(ServiceContext ctx) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void init(ServiceContext ctx) throws Exception {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void execute(ServiceContext ctx) throws Exception {
+            if (failOnExec)
+                throw new OutOfMemoryError();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d1be9b85/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
index c4b7d92..c388f1d 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java
@@ -22,6 +22,7 @@ import junit.framework.TestSuite;
import org.apache.ignite.GridSuppressedExceptionSelfTest;
import org.apache.ignite.failure.FailureHandlerTriggeredTest;
import org.apache.ignite.failure.IoomFailureHandlerTest;
+import org.apache.ignite.failure.OomFailureHandlerTest;
import org.apache.ignite.failure.StopNodeFailureHandlerTest;
import org.apache.ignite.failure.StopNodeOrHaltFailureHandlerTest;
import org.apache.ignite.internal.ClassSetTest;
@@ -199,6 +200,7 @@ public class IgniteBasicTestSuite extends TestSuite {
suite.addTestSuite(StopNodeFailureHandlerTest.class);
suite.addTestSuite(StopNodeOrHaltFailureHandlerTest.class);
suite.addTestSuite(IoomFailureHandlerTest.class);
+ suite.addTestSuite(OomFailureHandlerTest.class);
return suite;
}