You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2020/03/19 12:34:20 UTC

[ignite] branch master updated: IGNITE-12774 Handle "Too many open files" exception - Fixes #7516.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 795617f  IGNITE-12774 Handle "Too many open files" exception - Fixes #7516.
795617f is described below

commit 795617fc941a35b0f6cf2faed8029c5f15588957
Author: Sergey Antonov <an...@gmail.com>
AuthorDate: Thu Mar 19 15:27:39 2020 +0300

    IGNITE-12774 Handle "Too many open files" exception - Fixes #7516.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../internal/IgniteTooManyOpenFilesException.java  |  66 +++++++++++
 .../spi/communication/tcp/TcpCommunicationSpi.java |  50 +++++++-
 .../TooManyOpenFilesTcpCommunicationSpiTest.java   | 130 +++++++++++++++++++++
 .../IgniteSpiCommunicationSelfTestSuite.java       |   3 +
 4 files changed, 248 insertions(+), 1 deletion(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java
new file mode 100644
index 0000000..3c4ac00
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteTooManyOpenFilesException.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import java.net.SocketException;
+import org.apache.ignite.IgniteException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Exception raised when a {@link SocketException} with the "Too many open files" message is detected.
+ * It exists as a separate type so that this condition can be recognized and the node failed.
+ */
+public class IgniteTooManyOpenFilesException extends IgniteException {
+    /** Serial version UID. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates an empty exception.
+     */
+    public IgniteTooManyOpenFilesException() {
+        super();
+    }
+
+    /**
+     * Creates a new exception with the given error message.
+     *
+     * @param msg Error message.
+     */
+    public IgniteTooManyOpenFilesException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with the given throwable as a cause and source of the error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public IgniteTooManyOpenFilesException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with the given error message and optional nested exception.
+     *
+     * @param msg Error message.
+     * @param cause Optional nested exception (can be {@code null}).
+     */
+    public IgniteTooManyOpenFilesException(String msg, @Nullable Throwable cause) {
+        super(msg, cause);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 7708789..dac29e3 100755
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -24,6 +24,7 @@ import java.net.ConnectException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -71,6 +72,7 @@ import org.apache.ignite.internal.IgniteFeatures;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.IgniteTooManyOpenFilesException;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
@@ -161,6 +163,7 @@ import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static java.util.Objects.nonNull;
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
@@ -3103,6 +3106,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     catch (Throwable e) {
                         fut.onDone(e);
 
+                        if (e instanceof IgniteTooManyOpenFilesException)
+                            throw e;
+
                         if (e instanceof Error)
                             throw (Error)e;
                     }
@@ -3529,7 +3535,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     if (getSpiContext().node(node.id()) == null)
                         throw new ClusterTopologyCheckedException("Failed to send message (node left topology): " + node);
 
-                    SocketChannel ch = SocketChannel.open();
+                    SocketChannel ch = openSocketChannel();
 
                     ch.configureBlocking(true);
 
@@ -3709,6 +3715,9 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                     if (log.isDebugEnabled())
                         log.debug("Client creation failed [addr=" + addr + ", err=" + e + ']');
 
+                    if (X.hasCause(e, "Too many open files", SocketException.class))
+                        throw new IgniteTooManyOpenFilesException(e);
+
                     // check if timeout occured in case of unrecoverable exception
                     if (connTimeoutStgy.checkTimeout()) {
                         U.warn(log, "Connection timed out (will stop attempts to perform the connect) " +
@@ -3767,6 +3776,40 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
     }
 
     /**
+     * Opens a socket channel. Subclasses may override this method to intercept channel creation.
+     *
+     * @return A new socket channel.
+     * @throws IOException If an I/O error occurs.
+     */
+    protected SocketChannel openSocketChannel() throws IOException {
+        return SocketChannel.open();
+    }
+
+    /**
+     * Closes established clients and pending client futures for the given node.
+     * NOTE: It is recommended only for tests.
+     *
+     * @param nodeId ID of the node for which to close connections.
+     * @throws IgniteCheckedException If failed.
+     */
+    void closeConnections(UUID nodeId) throws IgniteCheckedException {
+        GridCommunicationClient[] clients = this.clients.remove(nodeId);
+        if (nonNull(clients)) {
+            for (GridCommunicationClient client : clients)
+                client.forceClose();
+        }
+
+        for (ConnectionKey connKey : clientFuts.keySet()) {
+            // A UUID never equals a ConnectionKey, so match on the key's node ID instead.
+            if (!nodeId.equals(connKey.nodeId()))
+                continue;
+            GridFutureAdapter<GridCommunicationClient> fut = clientFuts.remove(connKey);
+            if (nonNull(fut))
+                fut.get().forceClose();
+        }
+    }
+
+    /**
      * Check is passed  socket address belong to current node. This method should return true only if the passed
      * in address represent an address which will result in a connection to the local node.
      *
@@ -4872,6 +4915,11 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter implements Communicati
                 if (log.isDebugEnabled())
                     log.debug("Recovery reconnect failed, node stopping [rmtNode=" + recoveryDesc.node().id() + ']');
             }
+            catch (IgniteTooManyOpenFilesException e) {
+                onException(e.getMessage(), e);
+
+                throw e;
+            }
             catch (IgniteCheckedException | IgniteException e) {
                 try {
                     if (recoveryDesc.nodeAlive(getSpiContext().node(node.id())) && getSpiContext().pingNode(node.id())) {
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java
new file mode 100644
index 0000000..bebffa9
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/TooManyOpenFilesTcpCommunicationSpiTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.spi.communication.tcp;
+
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.cluster.ClusterTopologyException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.junit.Test;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ * Tests handling of the "Too many open files" error in {@link TcpCommunicationSpi}.
+ */
+public class TooManyOpenFilesTcpCommunicationSpiTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setCommunicationSpi(new TooManyOpenFilesTcpCommunicationSpi())
+            .setConsistentId(igniteInstanceName)
+            .setCacheConfiguration(
+                new CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                    .setAtomicityMode(TRANSACTIONAL)
+                    .setBackups(1)
+                    .setCacheMode(PARTITIONED)
+            );
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that a node fails when the "Too many open files" error occurs
+     * in {@link TcpCommunicationSpi}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTooManyOpenFilesErr() throws Exception {
+        IgniteEx crd = startGrids(3);
+        crd.cluster().state(ClusterState.ACTIVE);
+
+        IgniteEx stopNode = grid(2);
+
+        TooManyOpenFilesTcpCommunicationSpi stopNodeSpi = (TooManyOpenFilesTcpCommunicationSpi)
+            stopNode.context().config().getCommunicationSpi();
+
+        IgniteEx txNode = grid(1);
+
+        try (Transaction tx = txNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 60_000, 4)) {
+            IgniteCache<Object, Object> cache = txNode.cache(DEFAULT_CACHE_NAME);
+
+            cache.put(0, 1);
+
+            stopNodeSpi.throwException.set(true);
+            stopNodeSpi.closeConnections(txNode.localNode().id());
+
+            cache.put(1, 2);
+            cache.put(2, 3);
+            cache.put(3, 4);
+
+            // Hangs here (per the original note; presumably without the fix - TODO confirm).
+            tx.commit();
+        }
+        catch (ClusterTopologyException e) {
+            log.error("Error wait commit", e);
+        }
+
+        assertTrue(waitForCondition(((IgniteKernal)stopNode)::isStopping, 60_000));
+    }
+
+    /**
+     * {@link TcpCommunicationSpi} subclass that emulates the "Too many open files" error
+     * when opening a socket channel.
+     */
+    private static class TooManyOpenFilesTcpCommunicationSpi extends TcpCommunicationSpi {
+        /** Flag for throwing an exception "Too many open files". */
+        private final AtomicBoolean throwException = new AtomicBoolean();
+
+        /** {@inheritDoc} */
+        @Override protected SocketChannel openSocketChannel() throws IOException {
+            if (throwException.get())
+                throw new SocketException("Too many open files");
+
+            return super.openSocketChannel();
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
index 1bb1724..7ad704b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSpiCommunicationSelfTestSuite.java
@@ -46,6 +46,7 @@ import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiFreezingClient
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiHalfOpenedConnectionTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpiSkipMessageSendTest;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationStatisticsTest;
+import org.apache.ignite.spi.communication.tcp.TooManyOpenFilesTcpCommunicationSpiTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -97,6 +98,8 @@ import org.junit.runners.Suite;
     IgniteTcpCommunicationHandshakeWaitSslTest.class,
     IgniteTcpCommunicationConnectOnInitTest.class,
 
+    TooManyOpenFilesTcpCommunicationSpiTest.class
+
     //GridCacheDhtLockBackupSelfTest.class,
 })
 public class IgniteSpiCommunicationSelfTestSuite {