You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2018/04/13 09:33:33 UTC
[21/54] [abbrv] ignite git commit: ignite-7772 System workers critical failures handling
ignite-7772 System workers critical failures handling
Signed-off-by: Andrey Gura <ag...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c807ae95
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c807ae95
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c807ae95
Branch: refs/heads/ignite-6083
Commit: c807ae952c233cf1a8c0a63d543fafe19c40c6aa
Parents: 05d7092
Author: Andrey Kuznetsov <st...@gmail.com>
Authored: Tue Apr 10 17:30:12 2018 +0300
Committer: Andrey Gura <ag...@apache.org>
Committed: Tue Apr 10 17:30:12 2018 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 15 +-
.../GridClientConnectionManagerAdapter.java | 6 +
.../impl/GridTcpRouterNioListenerAdapter.java | 6 +
.../discovery/GridDiscoveryManager.java | 16 +-
.../GridCachePartitionExchangeManager.java | 12 +-
.../cache/GridCacheSharedTtlCleanupManager.java | 41 +++--
.../GridCacheDatabaseSharedManager.java | 60 +++++--
.../wal/FileWriteAheadLogManager.java | 157 ++++++++++++-------
.../wal/FsyncModeFileWriteAheadLogManager.java | 34 +++-
.../timeout/GridTimeoutProcessor.java | 102 +++++++-----
.../ignite/internal/util/StripedExecutor.java | 69 +++++---
.../ignite/internal/util/nio/GridNioServer.java | 43 ++++-
.../util/nio/GridNioServerListener.java | 6 +
.../util/nio/GridNioServerListenerAdapter.java | 6 +
.../communication/tcp/TcpCommunicationSpi.java | 41 ++++-
.../ignite/spi/discovery/tcp/ServerImpl.java | 51 +++++-
.../internal/util/StripedExecutorTest.java | 2 +-
17 files changed, 501 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 417ba1e..10a0752 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -137,6 +137,7 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME;
import static org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEMORY_POLICY_MAX_SIZE;
import static org.apache.ignite.configuration.MemoryConfiguration.DFLT_MEM_PLC_DEFAULT_NAME;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.IgniteComponentType.SPRING;
import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.RESTART_JVM;
@@ -1806,7 +1807,13 @@ public class IgnitionEx {
cfg.getStripedPoolSize(),
cfg.getIgniteInstanceName(),
"sys",
- log);
+ log,
+ new Thread.UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread thread, Throwable t) {
+ if (grid != null)
+ grid.context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t));
+ }
+ });
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
@@ -1846,6 +1853,12 @@ public class IgnitionEx {
cfg.getIgniteInstanceName(),
"data-streamer",
log,
+ new Thread.UncaughtExceptionHandler() {
+ @Override public void uncaughtException(Thread thread, Throwable t) {
+ if (grid != null)
+ grid.context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, t));
+ }
+ },
true);
// Note that we do not pre-start threads here as igfs pool may not be needed.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 829b188..fe0453f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -38,6 +38,7 @@ import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.client.GridClientClosedException;
import org.apache.ignite.internal.client.GridClientConfiguration;
import org.apache.ignite.internal.client.GridClientException;
@@ -656,6 +657,11 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
}
}
+ /** {@inheritDoc} */
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+ // No-op.
+ }
+
/**
* Handles client handshake response.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
index 22f5152..75aa6f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterNioListenerAdapter.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.client.GridClientFuture;
import org.apache.ignite.internal.client.GridClientFutureListener;
@@ -191,6 +192,11 @@ public abstract class GridTcpRouterNioListenerAdapter implements GridNioServerLi
}
/** {@inheritDoc} */
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public void onSessionWriteTimeout(GridNioSession ses) {
U.warn(log, "Closing NIO session because of write timeout.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 4c5690e..b0d3256 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -147,6 +147,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DATA_REGIONS_OFFHEAP_SIZE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
@@ -2669,13 +2671,21 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
body0();
}
catch (InterruptedException e) {
+ if (!isCancelled)
+ ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, e));
+
throw e;
}
catch (Throwable t) {
- U.error(log, "Unexpected exception in discovery worker thread (ignored).", t);
+ U.error(log, "Exception in discovery worker thread.", t);
+
+ if (t instanceof Error) {
+ FailureType type = t instanceof OutOfMemoryError ? CRITICAL_ERROR : SYSTEM_WORKER_TERMINATION;
- if (t instanceof Error)
- throw (Error)t;
+ ctx.failure().process(new FailureContext(type, t));
+
+ throw t;
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 77ffce3..e40493f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -123,6 +123,7 @@ import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
@@ -2274,11 +2275,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
try {
body0();
}
+ catch (InterruptedException | IgniteInterruptedCheckedException e) {
+ if (!stop)
+ err = e;
+ }
catch (Throwable e) {
err = e;
}
finally {
- if (!stop)
+ if (err == null && !stop)
+ err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
index 8f3d738..613e93b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedTtlCleanupManager.java
@@ -20,11 +20,15 @@ package org.apache.ignite.internal.processors.cache;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+
/**
* Periodically removes expired entities from caches with {@link CacheConfiguration#isEagerTtl()} flag set.
*/
@@ -122,19 +126,38 @@ public class GridCacheSharedTtlCleanupManager extends GridCacheSharedManagerAdap
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- boolean expiredRemains = false;
+ Throwable err = null;
+
+ try {
+ while (!isCancelled()) {
+ boolean expiredRemains = false;
+
+ for (GridCacheTtlManager mgr : mgrs) {
+ if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
+ expiredRemains = true;
- for (GridCacheTtlManager mgr : mgrs) {
- if (mgr.expire(CLEANUP_WORKER_ENTRIES_PROCESS_LIMIT))
- expiredRemains = true;
+ if (isCancelled())
+ return;
+ }
- if (isCancelled())
- return;
+ if (!expiredRemains)
+ U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
}
+ }
+ catch (Throwable t) {
+ if (!(t instanceof IgniteInterruptedCheckedException))
+ err = t;
- if (!expiredRemains)
- U.sleep(CLEANUP_WORKER_SLEEP_INTERVAL);
+ throw t;
+ }
+ finally {
+ if (err == null && !isCancelled)
+ err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 70fc688..caf27b7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -165,6 +165,8 @@ import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_MAX_CHECKPOINT_MEMORY_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
/**
@@ -2787,32 +2789,58 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
}
/** {@inheritDoc} */
- @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
- while (!isCancelled()) {
- waitCheckpointEvent();
+ @Override protected void body() {
+ Throwable err = null;
- GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
+ try {
+ while (!isCancelled()) {
+ waitCheckpointEvent();
- if (enableChangeApplied != null) {
- enableChangeApplied.onDone();
+ GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
- GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
- }
+ if (enableChangeApplied != null) {
+ enableChangeApplied.onDone();
- if (checkpointsEnabled)
- doCheckpoint();
- else {
- synchronized (this) {
- scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
+ GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
+ }
+
+ if (checkpointsEnabled)
+ doCheckpoint();
+ else {
+ synchronized (this) {
+ scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
+ }
}
}
}
+ catch (Throwable t) {
+ err = t;
+
+ scheduledCp.cpFinishFut.onDone(t);
+
+ throw t;
+ }
+ finally {
+ if (err == null && !(stopping && isCancelled))
+ err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
// Final run after the cancellation.
- if (checkpointsEnabled && !shutdownNow)
- doCheckpoint();
+ if (checkpointsEnabled && !shutdownNow) {
+ try {
+ doCheckpoint();
- scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+ scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
+ }
+ catch (Throwable e) {
+ scheduledCp.cpFinishFut.onDone(e);
+ }
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 2fff481..a40811b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -117,6 +117,8 @@ import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.util.IgniteUtils.findField;
@@ -682,7 +684,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (IgniteCheckedException e) {
U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
}
}
@@ -1234,7 +1236,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (IOException e) {
StorageException se = new StorageException("Unable to initialize WAL segment", e);
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se));
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
throw se;
}
@@ -1499,6 +1501,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
+ Throwable err = null;
+
try {
synchronized (this) {
while (curAbsWalIdx == -1 && !stopped)
@@ -1560,6 +1564,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !stopped)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
}
/**
@@ -1884,8 +1900,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
catch (IgniteCheckedException | IOException e) {
U.error(log, "Unexpected error during WAL compression", e);
-
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
@@ -2005,6 +2019,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override public void run() {
+ Throwable err = null;
+
while (!Thread.currentThread().isInterrupted() && !stopped) {
try {
long segmentToDecompress = segmentsQueue.take();
@@ -2034,10 +2050,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
- catch (IOException e) {
- U.error(log, "Unexpected error during WAL decompression", e);
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !stopped)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly");
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
}
@@ -3146,78 +3169,94 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** {@inheritDoc} */
@Override public void run() {
- while (!shutdown && !Thread.currentThread().isInterrupted()) {
- while (waiters.isEmpty()) {
- if (!shutdown)
- LockSupport.park();
- else {
- unparkWaiters(Long.MAX_VALUE);
-
- return;
- }
- }
+ Throwable err = null;
- Long pos = null;
+ try {
+ while (!shutdown && !Thread.currentThread().isInterrupted()) {
+ while (waiters.isEmpty()) {
+ if (!shutdown)
+ LockSupport.park();
+ else {
+ unparkWaiters(Long.MAX_VALUE);
- for (Long val : waiters.values()) {
- if (val > Long.MIN_VALUE)
- pos = val;
- }
+ return;
+ }
+ }
- if (pos == null)
- continue;
- else if (pos < UNCONDITIONAL_FLUSH) {
- try {
- assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
+ Long pos = null;
- if (pos == FILE_CLOSE)
- currHnd.fileIO.close();
- else if (pos == FILE_FORCE)
- currHnd.fileIO.force();
+ for (Long val : waiters.values()) {
+ if (val > Long.MIN_VALUE)
+ pos = val;
}
- catch (IOException e) {
- log.error("Exception in WAL writer thread: ", e);
- err = e;
+ if (pos == null)
+ continue;
+ else if (pos < UNCONDITIONAL_FLUSH) {
+ try {
+ assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
- unparkWaiters(Long.MAX_VALUE);
+ if (pos == FILE_CLOSE)
+ currHnd.fileIO.close();
+ else if (pos == FILE_FORCE)
+ currHnd.fileIO.force();
+ }
+ catch (IOException e) {
+ log.error("Exception in WAL writer thread: ", e);
- return;
- }
+ err = e;
- unparkWaiters(pos);
- }
+ unparkWaiters(Long.MAX_VALUE);
- List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
+ return;
+ }
- if (segs == null) {
- unparkWaiters(pos);
+ unparkWaiters(pos);
+ }
- continue;
- }
+ List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
- for (int i = 0; i < segs.size(); i++) {
- SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+ if (segs == null) {
+ unparkWaiters(pos);
- try {
- writeBuffer(seg.position(), seg.buffer());
+ continue;
}
- catch (Throwable e) {
- log.error("Exception in WAL writer thread: ", e);
- err = e;
- }
- finally {
- seg.release();
+ for (int i = 0; i < segs.size(); i++) {
+ SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
+
+ try {
+ writeBuffer(seg.position(), seg.buffer());
+ }
+ catch (Throwable e) {
+ log.error("Exception in WAL writer thread: ", e);
+
+ err = e;
+ }
+ finally {
+ seg.release();
- long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written;
+ long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written;
- unparkWaiters(p);
+ unparkWaiters(p);
+ }
}
}
+
+ unparkWaiters(Long.MAX_VALUE);
}
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !shutdown)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly");
- unparkWaiters(Long.MAX_VALUE);
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
}
/**
@@ -3283,7 +3322,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Throwable err = walWriter.err;
if (err != null)
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err));
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
if (expPos == UNCONDITIONAL_FLUSH)
expPos = (currentHandle().buf.tail());
@@ -3372,7 +3411,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
catch (IOException e) {
StorageException se = new StorageException("Unable to write", e);
- cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, se));
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
throw se;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 59196bb..c7d2c11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -110,6 +110,7 @@ import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
/**
* File WAL manager.
@@ -1338,6 +1339,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
}
}
+ Throwable err = null;
+
try {
synchronized (this) {
while (curAbsWalIdx == -1 && !stopped)
@@ -1399,6 +1402,18 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !stopped)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
}
/**
@@ -1721,8 +1736,6 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
}
catch (IgniteCheckedException | IOException e) {
U.error(log, "Unexpected error during WAL compression", e);
-
- cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -1814,6 +1827,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
/** {@inheritDoc} */
@Override public void run() {
+ Throwable err = null;
+
while (!Thread.currentThread().isInterrupted() && !stopped) {
try {
long segmentToDecompress = segmentsQueue.take();
@@ -1840,13 +1855,20 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda
decompressionFutures.remove(segmentToDecompress).onDone();
}
}
- catch (InterruptedException e){
+ catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
- catch (IOException e) {
- U.error(log, "Unexpected error during WAL decompression", e);
+ catch (Throwable t) {
+ err = t;
+ }
+ finally {
+ if (err == null && !stopped)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly");
- cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
+ if (err instanceof OutOfMemoryError)
+ cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
index ff6beb4..a09d6fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/timeout/GridTimeoutProcessor.java
@@ -21,6 +21,7 @@ import java.io.Closeable;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
@@ -32,6 +33,9 @@ import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
+
/**
* Detects timeout events and processes them.
*/
@@ -146,61 +150,81 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- while (!isCancelled()) {
- long now = U.currentTimeMillis();
+ Throwable err = null;
- for (Iterator<GridTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext();) {
- GridTimeoutObject timeoutObj = iter.next();
+ try {
+ while (!isCancelled()) {
+ long now = U.currentTimeMillis();
- if (timeoutObj.endTime() <= now) {
- try {
- boolean rmvd = timeoutObjs.remove(timeoutObj);
+ for (Iterator<GridTimeoutObject> iter = timeoutObjs.iterator(); iter.hasNext(); ) {
+ GridTimeoutObject timeoutObj = iter.next();
- if (log.isDebugEnabled())
- log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']');
+ if (timeoutObj.endTime() <= now) {
+ try {
+ boolean rmvd = timeoutObjs.remove(timeoutObj);
- if (rmvd)
- timeoutObj.onTimeout();
- }
- catch (Throwable e) {
- if (isCancelled() && !(e instanceof Error)){
if (log.isDebugEnabled())
- log.debug("Error when executing timeout callback: " + timeoutObj);
+ log.debug("Timeout has occurred [obj=" + timeoutObj + ", process=" + rmvd + ']');
- return;
+ if (rmvd)
+ timeoutObj.onTimeout();
}
+ catch (Throwable e) {
+ if (isCancelled() && !(e instanceof Error)) {
+ if (log.isDebugEnabled())
+ log.debug("Error when executing timeout callback: " + timeoutObj);
- U.error(log, "Error when executing timeout callback: " + timeoutObj, e);
+ return;
+ }
- if (e instanceof Error)
- throw e;
+ U.error(log, "Error when executing timeout callback: " + timeoutObj, e);
+
+ if (e instanceof Error)
+ throw e;
+ }
}
+ else
+ break;
}
- else
- break;
- }
-
- synchronized (mux) {
- while (!isCancelled()) {
- // Access of the first element must be inside of
- // synchronization block, so we don't miss out
- // on thread notification events sent from
- // 'addTimeoutObject(..)' method.
- GridTimeoutObject first = timeoutObjs.firstx();
-
- if (first != null) {
- long waitTime = first.endTime() - U.currentTimeMillis();
- if (waitTime > 0)
- mux.wait(waitTime);
+ synchronized (mux) {
+ while (!isCancelled()) {
+ // Access of the first element must be inside of
+ // synchronization block, so we don't miss out
+ // on thread notification events sent from
+ // 'addTimeoutObject(..)' method.
+ GridTimeoutObject first = timeoutObjs.firstx();
+
+ if (first != null) {
+ long waitTime = first.endTime() - U.currentTimeMillis();
+
+ if (waitTime > 0)
+ mux.wait(waitTime);
+ else
+ break;
+ }
else
- break;
+ mux.wait(5000);
}
- else
- mux.wait(5000);
}
}
}
+ catch (Throwable t) {
+ if (!(t instanceof InterruptedException))
+ err = t;
+
+ throw t;
+ }
+ finally {
+ if (err == null && !isCancelled)
+ err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly.");
+
+ if (err instanceof OutOfMemoryError)
+ ctx.failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ ctx.failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
+
}
}
@@ -284,4 +308,4 @@ public class GridTimeoutProcessor extends GridProcessorAdapter {
return S.toString(CancelableTask.class, this);
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
index 630d34c..c6383ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/StripedExecutor.java
@@ -64,9 +64,11 @@ public class StripedExecutor implements ExecutorService {
* @param igniteInstanceName Node name.
* @param poolName Pool name.
* @param log Logger.
+ * @param errHnd Uncaught exception handler passed to every stripe.
*/
- public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log) {
- this(cnt, igniteInstanceName, poolName, log, false);
+ public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd) {
+ this(cnt, igniteInstanceName, poolName, log, errHnd, false);
}
/**
@@ -74,9 +76,11 @@ public class StripedExecutor implements ExecutorService {
* @param igniteInstanceName Node name.
* @param poolName Pool name.
* @param log Logger.
+ * @param errHnd Uncaught exception handler passed to every stripe.
* @param stealTasks {@code True} to steal tasks.
*/
- public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log, boolean stealTasks) {
+ public StripedExecutor(int cnt, String igniteInstanceName, String poolName, final IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd, boolean stealTasks) {
A.ensure(cnt > 0, "cnt > 0");
boolean success = false;
@@ -91,15 +95,9 @@ public class StripedExecutor implements ExecutorService {
try {
for (int i = 0; i < cnt; i++) {
- stripes[i] = stealTasks ? new StripeConcurrentQueue(
- igniteInstanceName,
- poolName,
- i,
- log, stripes) : new StripeConcurrentQueue(
- igniteInstanceName,
- poolName,
- i,
- log);
+ stripes[i] = stealTasks
+ ? new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, stripes, errHnd)
+ : new StripeConcurrentQueue(igniteInstanceName, poolName, i, log, errHnd);
}
for (int i = 0; i < cnt; i++)
@@ -434,22 +432,28 @@ public class StripedExecutor implements ExecutorService {
/** Thread executing the loop. */
protected Thread thread;
+ /** Uncaught exception handler set on the stripe thread. */
+ private Thread.UncaughtExceptionHandler errHnd;
+
/**
* @param igniteInstanceName Ignite instance name.
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
+ * @param errHnd Uncaught exception handler to set on the stripe thread.
*/
public Stripe(
String igniteInstanceName,
String poolName,
int idx,
- IgniteLogger log
+ IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd
) {
this.igniteInstanceName = igniteInstanceName;
this.poolName = poolName;
this.idx = idx;
this.log = log;
+ this.errHnd = errHnd;
}
/**
@@ -463,6 +467,8 @@ public class StripedExecutor implements ExecutorService {
idx,
GridIoPolicy.UNDEFINED);
+ thread.setUncaughtExceptionHandler(errHnd);
+
thread.start();
}
@@ -518,9 +524,19 @@ public class StripedExecutor implements ExecutorService {
return;
}
catch (Throwable e) {
+ if (e instanceof OutOfMemoryError) {
+ // Re-throw so that the thread's uncaught exception handler processes the error.
+ throw e;
+ }
+
U.error(log, "Failed to execute runnable.", e);
}
}
+
+ if (!stopping) {
+ throw new IllegalStateException("Thread " + Thread.currentThread().getName() +
+ " is terminated unexpectedly");
+ }
}
/**
@@ -576,14 +592,16 @@ public class StripedExecutor implements ExecutorService {
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
+ * @param errHnd Exception handler.
*/
StripeConcurrentQueue(
String igniteInstanceName,
String poolName,
int idx,
- IgniteLogger log
+ IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd
) {
- this(igniteInstanceName, poolName, idx, log, null);
+ this(igniteInstanceName, poolName, idx, log, null, errHnd);
}
/**
@@ -591,19 +609,22 @@ public class StripedExecutor implements ExecutorService {
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
+ * @param errHnd Exception handler.
*/
StripeConcurrentQueue(
String igniteInstanceName,
String poolName,
int idx,
IgniteLogger log,
- Stripe[] others
+ Stripe[] others,
+ Thread.UncaughtExceptionHandler errHnd
) {
super(
igniteInstanceName,
poolName,
idx,
- log);
+ log,
+ errHnd);
this.others = others;
@@ -702,17 +723,20 @@ public class StripedExecutor implements ExecutorService {
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
+ * @param errHnd Exception handler.
*/
public StripeConcurrentQueueNoPark(
String igniteInstanceName,
String poolName,
int idx,
- IgniteLogger log
+ IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd
) {
super(igniteInstanceName,
poolName,
idx,
- log);
+ log,
+ errHnd);
}
/** {@inheritDoc} */
@@ -758,17 +782,20 @@ public class StripedExecutor implements ExecutorService {
* @param poolName Pool name.
* @param idx Stripe index.
* @param log Logger.
+ * @param errHnd Exception handler.
*/
public StripeConcurrentBlockingQueue(
String igniteInstanceName,
String poolName,
int idx,
- IgniteLogger log
+ IgniteLogger log,
+ Thread.UncaughtExceptionHandler errHnd
) {
super(igniteInstanceName,
poolName,
idx,
- log);
+ log,
+ errHnd);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 0fcde0e..3597a05 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -77,6 +77,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.MSG_WRITER;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.NIO_OPERATION;
@@ -1749,6 +1751,8 @@ public class GridNioServer<T> {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ Throwable err = null;
+
try {
boolean reset = false;
@@ -1774,9 +1778,24 @@ public class GridNioServer<T> {
catch (Throwable e) {
U.error(log, "Caught unhandled exception in NIO worker thread (restart the node).", e);
+ err = e;
+
if (e instanceof Error)
throw e;
}
+ finally {
+ if (err instanceof OutOfMemoryError)
+ lsnr.onFailure(CRITICAL_ERROR, err);
+ else if (!closed) {
+ if (err == null)
+ lsnr.onFailure(SYSTEM_WORKER_TERMINATION,
+ new IllegalStateException("Thread " + name() + " is terminated unexpectedly"));
+ else if (err instanceof InterruptedException)
+ lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+ }
+ else if (err != null)
+ lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+ }
}
/**
@@ -2790,6 +2809,8 @@ public class GridNioServer<T> {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
+ Throwable err = null;
+
try {
boolean reset = false;
@@ -2812,8 +2833,28 @@ public class GridNioServer<T> {
}
}
}
+ catch (Throwable t) {
+ if (!(t instanceof IgniteInterruptedCheckedException))
+ err = t;
+
+ throw t;
+ }
finally {
- closeSelector(); // Safety.
+ try {
+ closeSelector(); // Safety.
+ }
+ catch (RuntimeException ignore) {
+ // Ignored, so that a failure to close the selector does not skip the failure reporting below.
+ }
+
+ if (err == null && !closed)
+ err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
+
+ if (err instanceof OutOfMemoryError)
+ lsnr.onFailure(CRITICAL_ERROR, err);
+ else if (err != null)
+ lsnr.onFailure(SYSTEM_WORKER_TERMINATION, err);
+
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
index db28792..14c5a74 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListener.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.failure.FailureType;
import org.jetbrains.annotations.Nullable;
/**
@@ -69,4 +70,9 @@ public interface GridNioServerListener<T> {
* @param ses Session that is idle.
*/
public void onSessionIdleTimeout(GridNioSession ses);
+
+ /**
+ * Called when critical failure occurs in server implementation.
+ */
+ public void onFailure(FailureType failureType, Throwable failure);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
index 5d222c1..b6b20b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServerListenerAdapter.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.util.nio;
+import org.apache.ignite.failure.FailureType;
+
/**
* Server listener adapter providing empty methods implementation for rarely used methods.
*/
@@ -35,4 +37,8 @@ public abstract class GridNioServerListenerAdapter<T> implements GridNioServerLi
@Override public void onMessageSent(GridNioSession ses, T msg) {
// No-op.
}
+
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+ // No-op.
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 4a0710e..9e7b592 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -62,7 +62,10 @@ import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
@@ -151,6 +154,8 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.util.nio.GridNioSessionMetaKey.SSL_META;
import static org.apache.ignite.spi.communication.tcp.internal.TcpCommunicationConnectionCheckFuture.SES_FUT_META;
import static org.apache.ignite.spi.communication.tcp.messages.RecoveryLastReceivedMessage.ALREADY_CONNECTED;
@@ -798,6 +803,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
}
+ /** {@inheritDoc} */
+ @Override public void onFailure(FailureType failureType, Throwable failure) {
+ ((IgniteEx)ignite).context().failure().process(new FailureContext(failureType, failure));
+ }
+
/**
* @param recovery Recovery descriptor.
* @param ses Session.
@@ -4190,13 +4200,32 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isDebugEnabled())
log.debug("Tcp communication worker has been started.");
- while (!isInterrupted()) {
- DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+ Throwable err = null;
- if (disconnectData != null)
- processDisconnect(disconnectData);
- else
- processIdle();
+ try {
+ while (!isInterrupted()) {
+ DisconnectedSessionInfo disconnectData = q.poll(idleConnTimeout, TimeUnit.MILLISECONDS);
+
+ if (disconnectData != null)
+ processDisconnect(disconnectData);
+ else
+ processIdle();
+ }
+ }
+ catch (Throwable t) {
+ if (!(t instanceof InterruptedException))
+ err = t;
+
+ throw t;
+ }
+ finally {
+ if (err == null && !stopping)
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly.");
+
+ if (err instanceof OutOfMemoryError)
+ ((IgniteEx)ignite).context().failure().process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ ((IgniteEx)ignite).context().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 4aa1316..7bf37e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -50,6 +50,7 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
@@ -66,6 +67,8 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheMetrics;
import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.failure.FailureContext;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNodeAttributes;
@@ -73,6 +76,7 @@ import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.discovery.CustomMessageWrapper;
import org.apache.ignite.internal.managers.discovery.DiscoveryServerOnlyCustomMessage;
+import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.security.SecurityContext;
import org.apache.ignite.internal.processors.security.SecurityUtils;
import org.apache.ignite.internal.util.GridBoundedLinkedHashSet;
@@ -137,7 +141,6 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryServerOnlyCustom
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
-import java.util.concurrent.ConcurrentHashMap;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
@@ -149,6 +152,8 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.events.EventType.EVT_NODE_SEGMENTED;
+import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
+import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
@@ -2609,12 +2614,20 @@ class ServerImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
+ Throwable err = null;
+
try {
super.body();
}
+ catch (InterruptedException e) {
+ if (!spi.isNodeStopping0())
+ err = e;
+
+ throw e;
+ }
catch (Throwable e) {
if (!spi.isNodeStopping0() && spiStateCopy() != DISCONNECTING) {
- final Ignite ignite = spi.ignite();
+ final Ignite ignite = spi.ignite();
if (ignite != null) {
U.error(log, "TcpDiscoverSpi's message worker thread failed abnormally. " +
@@ -2637,9 +2650,22 @@ class ServerImpl extends TcpDiscoveryImpl {
}
}
+ err = e;
+
// Must be processed by IgniteSpiThread as well.
throw e;
}
+ finally {
+ if (err == null && !spi.isNodeStopping0())
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly.");
+
+ FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure();
+
+ if (err instanceof OutOfMemoryError)
+ failure.process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+ }
}
/**
@@ -5597,7 +5623,9 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/** {@inheritDoc} */
- @Override protected void body() throws InterruptedException {
+ @Override protected void body() {
+ Throwable err = null;
+
try {
while (!isInterrupted()) {
Socket sock = srvrSock.accept();
@@ -5630,13 +5658,30 @@ class ServerImpl extends TcpDiscoveryImpl {
onException("Failed to accept TCP connection.", e);
if (!isInterrupted()) {
+ err = e;
+
if (U.isMacInvalidArgumentError(e))
U.error(log, "Failed to accept TCP connection\n\t" + U.MAC_INVALID_ARG_MSG, e);
else
U.error(log, "Failed to accept TCP connection.", e);
}
}
+ catch (Throwable t) {
+ err = t;
+
+ throw t;
+ }
finally {
+ if (err == null && !spi.isNodeStopping0())
+ err = new IllegalStateException("Thread " + getName() + " is terminated unexpectedly.");
+
+ FailureProcessor failure = ((IgniteEx)spi.ignite()).context().failure();
+
+ if (err instanceof OutOfMemoryError)
+ failure.process(new FailureContext(CRITICAL_ERROR, err));
+ else if (err != null)
+ failure.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
+
U.closeQuiet(srvrSock);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c807ae95/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
index 543907f..3fca7af 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/StripedExecutorTest.java
@@ -29,7 +29,7 @@ public class StripedExecutorTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override public void beforeTest() {
- stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger());
+ stripedExecSvc = new StripedExecutor(3, "foo name", "pool name", new JavaLogger(), (thread, t) -> {});
}
/** {@inheritDoc} */