You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/19 12:34:20 UTC
[ignite] branch master updated: IGNITE-12774 Handle "Too many open
files" exception - Fixes #7516.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 795617f IGNITE-12774 Handle "Too many open files" exception - Fixes #7516.
795617f is described below
commit 795617fc941a35b0f6cf2faed8029c5f15588957
Author: Sergey Antonov <an...@gmail.com>
AuthorDate: Thu Mar 19 15:27:39 2020 +0300
IGNITE-12774 Handle "Too many open files" exception - Fixes #7516.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../internal/IgniteTooManyOpenFilesException.java | 66 +++++++++++
.../spi/communication/tcp/TcpCommunicationSpi.java | 50 +++++++-
.../TooManyOpenFilesTcpCommunicationSpiTest.java | 130 +++++++++++++++++++++
.../IgniteSpiCommunicationSelfTestSuite.java | 3 +
4 files changed, 248 insertions(+), 1 deletion(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java
new file mode 100644
index 0000000..3c4ac00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.net.SocketException;
+import org.apache.ignite.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Unchecked exception signalling that a {@link SocketException} reporting "Too many open files" was encountered.
+ * A dedicated type is used so that this condition can be told apart from ordinary connection failures and
+ * escalated to failing the node instead of being retried.
+ */
+public class IgniteTooManyOpenFilesException extends IgniteException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an exception carrying neither a message nor a cause.
+     */
+    public IgniteTooManyOpenFilesException() {
+        // Relies on the implicit call to IgniteException's no-arg constructor.
+    }
+
+    /**
+     * Creates an exception carrying only a message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteTooManyOpenFilesException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception whose message is derived from the given cause.
+     *
+     * @param cause Non-null underlying cause.
+     */
+    public IgniteTooManyOpenFilesException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception carrying a message and an optional cause.
+     *
+     * @param msg Error message.
+     * @param cause Underlying cause, may be {@code null}.
+     */
+    public IgniteTooManyOpenFilesException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7708789..dac29e3 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -24,6 +24,7 @@ import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.IgniteFeatures;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -161,6 +163,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.util.Objects.nonNull;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
@@ -3103,6 +3106,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
catch (Throwable e) {
fut.onDone(e);
+ if (e instanceof IgniteTooManyOpenFilesException)
+ throw e;
+
if (e instanceof Error)
throw (Error)e;
}
@@ -3529,7 +3535,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (getSpiContext().node(node.id()) == null)
throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
- SocketChannel ch = SocketChannel.open();
+ SocketChannel ch = openSocketChannel();
ch.configureBlocking(true);
@@ -3709,6 +3715,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isDebugEnabled())
log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
+ if (X.hasCause(e, "Too many open files", SocketException.class))
+ throw new IgniteTooManyOpenFilesException(e);
+
// check if timeout occured in case of unrecoverable exception
if (connTimeoutStgy.checkTimeout()) {
U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
@@ -3767,6 +3776,40 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
}
/**
+     * Creates a new, unconnected socket channel. The method is {@code protected} so that subclasses can
+     * replace channel creation, e.g. to simulate an error such as "Too many open files" in tests.
+     *
+     * @return Freshly opened socket channel.
+     * @throws IOException If the channel cannot be opened.
+     */
+    protected SocketChannel openSocketChannel() throws IOException {
+        SocketChannel ch = SocketChannel.open();
+
+        return ch;
+    }
+
+    /**
+     * Forcibly closes all connections to the given node. This covers both the clients already registered for
+     * the node and the clients produced by pending connection futures for that node.
+     * NOTE: Intended for tests only.
+     *
+     * @param nodeId ID of the node whose connections should be closed.
+     * @throws IgniteCheckedException If waiting for a pending connection future fails.
+     */
+    void closeConnections(UUID nodeId) throws IgniteCheckedException {
+        GridCommunicationClient[] clients = this.clients.remove(nodeId);
+
+        if (nonNull(clients)) {
+            for (GridCommunicationClient client : clients) {
+                // NOTE(review): slots of the per-node client array may be null; guard against NPE.
+                if (nonNull(client))
+                    client.forceClose();
+            }
+        }
+
+        for (ConnectionKey connKey : clientFuts.keySet()) {
+            // clientFuts is keyed by ConnectionKey, not UUID: compare the node ID stored in the key.
+            // Comparing the UUID with the key itself is always false and would skip every future.
+            if (!nodeId.equals(connKey.nodeId()))
+                continue;
+
+            GridFutureAdapter<GridCommunicationClient> fut = clientFuts.remove(connKey);
+
+            if (nonNull(fut)) {
+                // Waits for the pending connection attempt; guard in case it completed without a client.
+                GridCommunicationClient client = fut.get();
+
+                if (nonNull(client))
+                    client.forceClose();
+            }
+        }
+    }
+
+ /**
* Check is passed socket address belong to current node. This method should return true only if the passed
* in address represent an address which will result in a connection to the local node.
*
@@ -4872,6 +4915,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
if (log.isDebugEnabled())
log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
}
+ catch (IgniteTooManyOpenFilesException e) {
+ onException(e.getMessage(), e);
+
+ throw e;
+ }
catch (IgniteCheckedException | IgniteException e) {
try {
if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java
new file mode 100644
index 0000000..bebffa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Checks how {@link TcpCommunicationSpi} reacts when opening a socket fails with "Too many open files".
+ */
+public class TooManyOpenFilesTcpCommunicationSpiTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+        cfg.setCommunicationSpi(new TooManyOpenFilesTcpCommunicationSpi());
+        cfg.setConsistentId(igniteInstanceName);
+
+        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setAtomicityMode(TRANSACTIONAL);
+        ccfg.setBackups(1);
+        ccfg.setCacheMode(PARTITIONED);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Verifies that a node hitting "Too many open files" in {@link TcpCommunicationSpi} is stopped
+     * by the configured failure handler.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTooManyOpenFilesErr() throws Exception {
+        IgniteEx coordinator = startGrids(3);
+
+        coordinator.cluster().state(ClusterState.ACTIVE);
+
+        // Node on which the socket error is simulated; it is expected to stop.
+        IgniteEx victim = grid(2);
+
+        TooManyOpenFilesTcpCommunicationSpi victimSpi =
+            (TooManyOpenFilesTcpCommunicationSpi)victim.context().config().getCommunicationSpi();
+
+        // Node that starts and commits the transaction.
+        IgniteEx initiator = grid(1);
+
+        try (Transaction tx = initiator.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 4)) {
+            IgniteCache<Object, Object> cache = initiator.cache(DEFAULT_CACHE_NAME);
+
+            cache.put(0, 1);
+
+            // From now on the victim cannot open sockets; dropping its existing connections to the
+            // initiator forces it to try to open new ones.
+            victimSpi.throwException.set(true);
+            victimSpi.closeConnections(initiator.localNode().id());
+
+            for (int key = 1; key <= 3; key++)
+                cache.put(key, key + 1);
+
+            // NOTE: the commit was reported to hang here when the failing node was not stopped.
+            tx.commit();
+        }
+        catch (ClusterTopologyException e) {
+            log.error("Error wait commit", e);
+        }
+
+        IgniteKernal victimKernal = (IgniteKernal)victim;
+
+        assertTrue(waitForCondition(victimKernal::isStopping, 60_000));
+    }
+
+    /**
+     * Communication SPI that can be switched into a mode in which opening a socket channel fails with
+     * "Too many open files".
+     */
+    private static class TooManyOpenFilesTcpCommunicationSpi extends TcpCommunicationSpi {
+        /** When {@code true}, {@link #openSocketChannel()} throws instead of opening a channel. */
+        private final AtomicBoolean throwException = new AtomicBoolean();
+
+        /** {@inheritDoc} */
+        @Override protected SocketChannel openSocketChannel() throws IOException {
+            if (!throwException.get())
+                return super.openSocketChannel();
+
+            throw new SocketException("Too many open files");
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1bb1724..7ad704b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -46,6 +46,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFreezingClient
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
+import org.apache.ignite.spi.communication.tcp.TooManyOpenFilesTcpCommunicationSpiTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -97,6 +98,8 @@ import org.junit.runners.Suite;
IgniteTcpCommunicationHandshakeWaitSslTest.class,
IgniteTcpCommunicationConnectOnInitTest.class,
+ TooManyOpenFilesTcpCommunicationSpiTest.class
+
//GridCacheDhtLockBackupSelfTest.class,
})
public class IgniteSpiCommunicationSelfTestSuite {