You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by gn...@apache.org on 2018/07/25 12:58:16 UTC

[3/3] mina-sshd git commit: [SSHD-836] Make final some methods from base Closeable implementations

[SSHD-836] Make final some methods from base Closeable implementations


Project: http://git-wip-us.apache.org/repos/asf/mina-sshd/repo
Commit: http://git-wip-us.apache.org/repos/asf/mina-sshd/commit/bba23bf7
Tree: http://git-wip-us.apache.org/repos/asf/mina-sshd/tree/bba23bf7
Diff: http://git-wip-us.apache.org/repos/asf/mina-sshd/diff/bba23bf7

Branch: refs/heads/master
Commit: bba23bf70bcd7e4d5a972806a9df62669e7cda81
Parents: 7b35bb3
Author: Guillaume Nodet <gn...@apache.org>
Authored: Wed Jul 25 14:38:05 2018 +0200
Committer: Guillaume Nodet <gn...@apache.org>
Committed: Wed Jul 25 14:38:05 2018 +0200

----------------------------------------------------------------------
 .../agent/local/ChannelAgentForwarding.java     |  9 ++-
 .../sshd/agent/unix/AgentForwardedChannel.java  |  9 ++-
 .../sshd/agent/unix/ChannelAgentForwarding.java |  9 ++-
 .../client/channel/AbstractClientChannel.java   |  2 +-
 .../sshd/client/channel/ChannelSession.java     | 12 ++-
 .../sshd/common/channel/AbstractChannel.java    | 43 ++++------
 .../sshd/common/io/nio2/Nio2Acceptor.java       | 16 ++--
 .../util/closeable/AbstractCloseable.java       |  6 +-
 .../util/closeable/AbstractInnerCloseable.java  |  4 +-
 .../sshd/server/channel/ChannelSession.java     |  8 +-
 .../sshd/server/forward/TcpipServerChannel.java | 84 ++++++++++----------
 11 files changed, 104 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
index e051f1e..0ca735b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/local/ChannelAgentForwarding.java
@@ -28,10 +28,10 @@ import org.apache.sshd.agent.SshAgentFactory;
 import org.apache.sshd.agent.common.AbstractAgentClient;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.session.Session;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -110,8 +110,11 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
-        return super.close(immediately).addListener(sshFuture -> closeImmediately0());
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .close(super.getInnerCloseable())
+                .run(toString(), this::closeImmediately0)
+                .build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
index 1b02333..1c7b95b 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/AgentForwardedChannel.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 
 import org.apache.sshd.client.channel.AbstractClientChannel;
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
 import org.apache.sshd.common.channel.Window;
@@ -68,9 +69,11 @@ public class AgentForwardedChannel extends AbstractClientChannel implements Runn
     }
 
     @Override
-    protected synchronized void doCloseImmediately() {
-        Socket.close(socket);
-        super.doCloseImmediately();
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .close(super.getInnerCloseable())
+                .run(toString(), () -> Socket.close(socket))
+                .build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
index 586faae..4a6a5ce 100644
--- a/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
+++ b/sshd-core/src/main/java/org/apache/sshd/agent/unix/ChannelAgentForwarding.java
@@ -27,9 +27,9 @@ import java.util.concurrent.Future;
 import org.apache.sshd.agent.SshAgent;
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelOutputStream;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.util.GenericUtils;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
@@ -149,8 +149,11 @@ public class ChannelAgentForwarding extends AbstractServerChannel {
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
-        return super.close(immediately).addListener(sshFuture -> closeImmediately0());
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .close(super.getInnerCloseable())
+                .run(toString(), this::closeImmediately0)
+                .build();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
index 1ed4518..c61e271 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/AbstractClientChannel.java
@@ -204,7 +204,7 @@ public abstract class AbstractClientChannel extends AbstractChannel implements C
                     IoUtils.closeQuietly(invertedIn, invertedOut, invertedErr);
                 })
                 .parallel(asyncIn, asyncOut, asyncErr)
-                .close(new GracefulChannelCloseable())
+                .close(super.getInnerCloseable())
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
index b3f8c67..1b3ff6f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/client/channel/ChannelSession.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.concurrent.Future;
 
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.SshConstants;
 import org.apache.sshd.common.channel.ChannelAsyncInputStream;
 import org.apache.sshd.common.channel.ChannelAsyncOutputStream;
@@ -125,7 +126,14 @@ public class ChannelSession extends AbstractClientChannel {
     }
 
     @Override
-    protected void doCloseImmediately() {
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .close(super.getInnerCloseable())
+                .run(toString(), this::closeImmediately0)
+                .build();
+    }
+
+    protected void closeImmediately0() {
         if ((pumper != null) && (pumperService != null) && (!pumperService.isShutdown())) {
             try {
                 if (!pumper.isDone()) {
@@ -147,8 +155,6 @@ public class ChannelSession extends AbstractClientChannel {
                 pumperService = null;
             }
         }
-
-        super.doCloseImmediately();
     }
 
     protected void pumpInputStream() {

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
index 6d3c78e..b1f018e 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/channel/AbstractChannel.java
@@ -537,7 +537,7 @@ public abstract class AbstractChannel
             log.debug("handleClose({}) SSH_MSG_CHANNEL_CLOSE", this);
         }
 
-        if (!eofSent.getAndSet(true)) {
+        if (!isEofSent()) {
             if (debugEnabled) {
                 log.debug("handleClose({}) prevent sending EOF", this);
             }
@@ -551,19 +551,15 @@ public abstract class AbstractChannel
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
-        if (!eofSent.getAndSet(true)) {
-            if (log.isDebugEnabled()) {
-                log.debug("close({}) prevent sending EOF", this);
-            }
-        }
-
-        return super.close(immediately);
-    }
-
-    @Override
     protected Closeable getInnerCloseable() {
-        return new GracefulChannelCloseable();
+        return builder()
+                .sequential(new GracefulChannelCloseable(), getExecutorService())
+                .run(toString(), () -> {
+                    if (service != null) {
+                        service.unregisterChannel(AbstractChannel.this);
+                    }
+                })
+                .build();
     }
 
     public class GracefulChannelCloseable extends IoBaseCloseable {
@@ -684,6 +680,10 @@ public abstract class AbstractChannel
 
     @Override
     protected void preClose() {
+        if (!isEofSent()) {
+            log.debug("close({}) prevent sending EOF", this);
+        }
+
         try {
             signalChannelClosed(null);
         } finally {
@@ -768,15 +768,6 @@ public abstract class AbstractChannel
     }
 
     @Override
-    protected void doCloseImmediately() {
-        if (service != null) {
-            service.unregisterChannel(AbstractChannel.this);
-        }
-
-        super.doCloseImmediately();
-    }
-
-    @Override
     public IoWriteFuture writePacket(Buffer buffer) throws IOException {
         Session s = getSession();
         if (!isClosing()) {
@@ -920,16 +911,16 @@ public abstract class AbstractChannel
     protected abstract void doWriteExtendedData(byte[] data, int off, long len) throws IOException;
 
     protected void sendEof() throws IOException {
-        if (eofSent.getAndSet(true)) {
+        if (isClosing()) {
             if (log.isDebugEnabled()) {
-                log.debug("sendEof({}) already sent", this);
+                log.debug("sendEof({}) already closing or closed", this);
             }
             return;
         }
 
-        if (isClosing()) {
+        if (eofSent.getAndSet(true)) {
             if (log.isDebugEnabled()) {
-                log.debug("sendEof({}) already closing or closed", this);
+                log.debug("sendEof({}) already sent", this);
             }
             return;
         }

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
index 281d540..73a3c5f 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/io/nio2/Nio2Acceptor.java
@@ -33,8 +33,8 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
-import org.apache.sshd.common.future.CloseFuture;
 import org.apache.sshd.common.io.IoAcceptor;
 import org.apache.sshd.common.io.IoHandler;
 import org.apache.sshd.common.util.ValidateUtils;
@@ -137,13 +137,20 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
+    protected void preClose() {
         unbind();
-        return super.close(immediately);
+        super.preClose();
     }
 
     @Override
-    public void doCloseImmediately() {
+    protected Closeable getInnerCloseable() {
+        return builder()
+            .close(super.getInnerCloseable())
+            .run(toString(), this::closeImmediately0)
+            .build();
+    }
+
+    protected void closeImmediately0() {
         Collection<SocketAddress> boundAddresses = getBoundAddresses();
         boolean debugEnabled = log.isDebugEnabled();
         for (SocketAddress address : boundAddresses) {
@@ -163,7 +170,6 @@ public class Nio2Acceptor extends Nio2Service implements IoAcceptor {
                 }
             }
         }
-        super.doCloseImmediately();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
index dffbd5b..6413ebb 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractCloseable.java
@@ -71,7 +71,7 @@ public abstract class AbstractCloseable extends IoBaseCloseable {
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
+    public final CloseFuture close(boolean immediately) {
         boolean debugEnabled = log.isDebugEnabled();
         if (immediately) {
             if (state.compareAndSet(State.Opened, State.Immediate)
@@ -123,12 +123,12 @@ public abstract class AbstractCloseable extends IoBaseCloseable {
     }
 
     @Override
-    public boolean isClosed() {
+    public final boolean isClosed() {
         return state.get() == State.Closed;
     }
 
     @Override
-    public boolean isClosing() {
+    public final boolean isClosing() {
         return state.get() != State.Opened;
     }
 

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java
index 853033f..6518d23 100644
--- a/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java
+++ b/sshd-core/src/main/java/org/apache/sshd/common/util/closeable/AbstractInnerCloseable.java
@@ -36,13 +36,13 @@ public abstract class AbstractInnerCloseable extends AbstractCloseable {
     protected abstract Closeable getInnerCloseable();
 
     @Override
-    protected CloseFuture doCloseGracefully() {
+    protected final CloseFuture doCloseGracefully() {
         return getInnerCloseable().close(false);
     }
 
     @Override
     @SuppressWarnings("synthetic-access")
-    protected void doCloseImmediately() {
+    protected final void doCloseImmediately() {
         getInnerCloseable().close(true).addListener(future -> AbstractInnerCloseable.super.doCloseImmediately());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
index 78f5861..16e16b7 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
@@ -123,8 +123,9 @@ public class ChannelSession extends AbstractServerChannel {
     @Override
     protected Closeable getInnerCloseable() {
         return builder()
-                .sequential(new CommandCloseable(), new GracefulChannelCloseable())
+                .sequential(new CommandCloseable(), super.getInnerCloseable())
                 .parallel(asyncOut, asyncErr)
+                .run(toString(), this::closeImmediately0)
                 .build();
     }
 
@@ -190,8 +191,7 @@ public class ChannelSession extends AbstractServerChannel {
         }
     }
 
-    @Override
-    protected void doCloseImmediately() {
+    protected void closeImmediately0() {
         boolean debugEnabled = log.isDebugEnabled();
         if (commandInstance != null) {
             try {
@@ -223,8 +223,6 @@ public class ChannelSession extends AbstractServerChannel {
                 }
             }
         }
-
-        super.doCloseImmediately();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/mina-sshd/blob/bba23bf7/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
----------------------------------------------------------------------
diff --git a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
index 663553b..e7eef13 100644
--- a/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
+++ b/sshd-core/src/main/java/org/apache/sshd/server/forward/TcpipServerChannel.java
@@ -27,6 +27,7 @@ import java.util.Objects;
 
 import org.apache.sshd.client.future.DefaultOpenFuture;
 import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.Closeable;
 import org.apache.sshd.common.FactoryManager;
 import org.apache.sshd.common.RuntimeSshException;
 import org.apache.sshd.common.SshConstants;
@@ -48,6 +49,7 @@ import org.apache.sshd.common.util.Readable;
 import org.apache.sshd.common.util.ValidateUtils;
 import org.apache.sshd.common.util.buffer.Buffer;
 import org.apache.sshd.common.util.buffer.ByteArrayBuffer;
+import org.apache.sshd.common.util.closeable.AbstractCloseable;
 import org.apache.sshd.common.util.net.SshdSocketAddress;
 import org.apache.sshd.common.util.threads.ExecutorService;
 import org.apache.sshd.common.util.threads.ExecutorServiceCarrier;
@@ -292,51 +294,45 @@ public class TcpipServerChannel extends AbstractServerChannel implements Forward
     }
 
     @Override
-    public CloseFuture close(boolean immediately) {
-        boolean debugEnabled = log.isDebugEnabled();
-        /*
-         * In case of graceful shutdown (e.g. when the remote channel is gently closed)
-         * we also need to close the ChannelOutputStream which flushes remaining buffer
-         * and sends SSH_MSG_CHANNEL_EOF back to the client.
-         */
-        if ((!immediately) && (out != null)) {
-            try {
-                if (debugEnabled) {
-                    log.debug("Closing channel output stream of {}", this);
-                }
-
-                out.close();
-            } catch (IOException | RuntimeException ignored) {
-                if (debugEnabled) {
-                    log.debug("{} while closing channel output stream of {}: {}",
-                        ignored.getClass().getSimpleName(), this, ignored.getMessage());
-                }
-            }
-        }
-
-        CloseFuture closingFeature = super.close(immediately);
-
-        // We also need to dispose of the connector, but unfortunately we
-        // are being invoked by the connector thread or the connector's
-        // own processor thread. Disposing of the connector within either
-        // causes deadlock. Instead create a thread to dispose of the
-        // connector in the background.
-        ExecutorService service = getExecutorService();
-
-        // allocate a temporary executor service if none provided
-        ExecutorService executors = (service == null)
-                ? ThreadUtils.newSingleThreadExecutor("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]")
-                : ThreadUtils.noClose(service);
+    protected Closeable getInnerCloseable() {
+        return builder()
+                .run(toString(), () -> {
+                    /*
+                     * In case of graceful shutdown (e.g. when the remote channel is gently closed)
+                     * we also need to close the ChannelOutputStream which flushes remaining buffer
+                     * and sends SSH_MSG_CHANNEL_EOF back to the client.
+                     */
+                    if (out != null) {
+                        try {
+                            log.debug("Closing channel output stream of {}", this);
+                            out.close();
+                        } catch (IOException | RuntimeException ignored) {
+                            log.debug("{} while closing channel output stream of {}: {}",
+                                    ignored.getClass().getSimpleName(), this, ignored.getMessage(), ignored);
+                        }
+                    }
+                })
+                .close(super.getInnerCloseable())
+                .close(new AbstractCloseable() {
+                    ExecutorService executor = ThreadUtils.newCachedThreadPool("TcpIpServerChannel-ConnectorCleanup[" + getSession() + "]");
+                    @Override
+                    protected CloseFuture doCloseGracefully() {
+                        executor.submit(() -> {
+                            connector.close(false);
+                        });
+                        return null;
+                    }
 
-        return builder().when(closingFeature).run(toString(), () -> {
-            executors.submit(() -> {
-                if (debugEnabled) {
-                    log.debug("disposing connector: {} for: {}", connector, TcpipServerChannel.this);
-                }
-                connector.close(immediately)
-                        .addListener(f -> executors.close(true));
-            });
-        }).build().close(false);
+                    @Override
+                    protected void doCloseImmediately() {
+                        executor.submit(() -> {
+                            connector.close(true)
+                                    .addListener(f -> executor.close(true));
+                        });
+                        super.doCloseImmediately();
+                    }
+                })
+                .build();
     }
 
     @Override