You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/11/16 17:28:40 UTC

camel git commit: CAMEL-9319 SshClient resource leak when used from ProducerTemplate

Repository: camel
Updated Branches:
  refs/heads/master e1758f39a -> d2770c28a


CAMEL-9319 SshClient resource leak when used from ProducerTemplate


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2770c28
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2770c28
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2770c28

Branch: refs/heads/master
Commit: d2770c28ae7de0af6398530761f07e0f155f1d28
Parents: e1758f3
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Nov 16 17:09:04 2015 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Nov 16 17:09:05 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/component/ssh/SshConsumer.java |  23 +++-
 .../apache/camel/component/ssh/SshEndpoint.java | 105 +--------------
 .../apache/camel/component/ssh/SshHelper.java   | 127 +++++++++++++++++++
 .../apache/camel/component/ssh/SshProducer.java |  23 +++-
 4 files changed, 172 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
index 330972e..682268a 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
@@ -19,19 +19,40 @@ package org.apache.camel.component.ssh;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.sshd.SshClient;
 
 public class SshConsumer extends ScheduledPollConsumer {
     private final SshEndpoint endpoint;
+    
+    private SshClient client;
 
     public SshConsumer(SshEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
+    
+    @Override
+    protected void doStart() throws Exception {
+        // Create and start the client before super.doStart(), which starts the poll scheduler,
+        // so that poll() can never hand a null client to SshHelper.sendExecCommand
+        client = SshClient.setUpDefaultClient();
+        client.start();
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // Stop the poll scheduler first so that no poll() runs against a stopped or null client
+        super.doStop();
+        if (client != null) {
+            client.stop();
+            client = null;
+        }
+    }
 
     @Override
     protected int poll() throws Exception {
         String command = endpoint.getPollCommand();
-        SshResult result = endpoint.sendExecCommand(command);
+        SshResult result = SshHelper.sendExecCommand(command, endpoint, client);
 
         Exchange exchange = endpoint.createExchange();
         exchange.getIn().setBody(result.getStdout());

http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
index 8235e1b..af709f4 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 public class SshEndpoint extends ScheduledPollEndpoint {
     protected final Logger log = LoggerFactory.getLogger(getClass());
 
-    private SshClient client;
     @UriParam
     private SshConfiguration sshConfiguration;
 
@@ -75,109 +74,7 @@ public class SshEndpoint extends ScheduledPollEndpoint {
     @Override
     public boolean isSingleton() {
         // SshClient is not thread-safe to be shared
-        return false;
-    }
-
-    public SshResult sendExecCommand(String command) throws Exception {
-        SshResult result = null;
-
-        if (getConfiguration() == null) {
-            throw new IllegalStateException("Configuration must be set");
-        }
-
-        ConnectFuture connectFuture = client.connect(null, getHost(), getPort());
-
-        // Wait getTimeout milliseconds for connect operation to complete
-        connectFuture.await(getTimeout());
-
-        if (!connectFuture.isDone() || !connectFuture.isConnected()) {
-            final String msg = "Failed to connect to " + getHost() + ":" + getPort() + " within timeout " + getTimeout() + "ms";
-            log.debug(msg);
-            throw new RuntimeCamelException(msg);
-        }
-
-        log.debug("Connected to {}:{}", getHost(), getPort());
-
-        ClientChannel channel = null;
-        ClientSession session = null;
-        
-        try {
-            AuthFuture authResult;
-            session = connectFuture.getSession();
-    
-            KeyPairProvider keyPairProvider;
-            final String certResource = getCertResource();
-            if (certResource != null) {
-                log.debug("Attempting to authenticate using ResourceKey '{}'...", certResource);
-                keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, getCamelContext().getClassResolver());
-            } else {
-                keyPairProvider = getKeyPairProvider();
-            }
-    
-            if (keyPairProvider != null) {
-                log.debug("Attempting to authenticate username '{}' using Key...", getUsername());
-                KeyPair pair = keyPairProvider.loadKey(getKeyType());
-                authResult = session.authPublicKey(getUsername(), pair);
-            } else {
-                log.debug("Attempting to authenticate username '{}' using Password...", getUsername());
-                authResult = session.authPassword(getUsername(), getPassword());
-            }
-    
-            authResult.await(getTimeout());
-    
-            if (!authResult.isDone() || authResult.isFailure()) {
-                log.debug("Failed to authenticate");
-                throw new RuntimeCamelException("Failed to authenticate username " + getUsername());
-            }
-        
-            channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command);
-
-            ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0});
-            channel.setIn(in);
-    
-            ByteArrayOutputStream out = new ByteArrayOutputStream();
-            channel.setOut(out);
-    
-            ByteArrayOutputStream err = new ByteArrayOutputStream();
-            channel.setErr(err);
-            OpenFuture openFuture = channel.open();
-            openFuture.await(getTimeout());
-            if (openFuture.isOpened()) {
-                channel.waitFor(ClientChannel.CLOSED, 0);
-                result = new SshResult(command, channel.getExitStatus(),
-                        new ByteArrayInputStream(out.toByteArray()),
-                        new ByteArrayInputStream(err.toByteArray()));
-    
-            }
-            return result;
-        } finally {
-            if (channel != null) {
-                channel.close(true);
-            }
-            // need to make sure the session is closed 
-            if (session != null) {
-                session.close(false);
-            }
-        }
-        
-    }
-
-    @Override
-    protected void doStart() throws Exception {
-        super.doStart();
-
-        client = SshClient.setUpDefaultClient();
-        client.start();
-    }
-
-    @Override
-    protected void doStop() throws Exception {
-        if (client != null) {
-            client.stop();
-            client = null;
-        }
-
-        super.doStop();
+        return true;
     }
 
     public SshConfiguration getConfiguration() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
new file mode 100644
index 0000000..6c9ea02
--- /dev/null
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ssh;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.security.KeyPair;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.sshd.ClientChannel;
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.SshClient;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.KeyPairProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Static helper that runs a single exec command over SSH on behalf of an {@link SshEndpoint}. */
+public final class SshHelper {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(SshHelper.class);
+
+    private SshHelper() {
+    }
+
+    /**
+     * Connects with the given client, authenticates, runs the command and captures its output.
+     * The channel and session opened for the command are closed before this method returns.
+     *
+     * @param command  the command to execute on the remote host
+     * @param endpoint supplies the {@link SshConfiguration} and the Camel context
+     * @param client   the client used to connect; it is not stopped by this method
+     * @return the exit status, stdout and stderr of the command; never {@code null}
+     * @throws IllegalStateException if the endpoint has no configuration
+     * @throws RuntimeCamelException if connecting, authenticating or opening the channel fails
+     */
+    public static SshResult sendExecCommand(String command, SshEndpoint endpoint, SshClient client) throws Exception {
+        SshConfiguration configuration = endpoint.getConfiguration();
+        if (configuration == null) {
+            throw new IllegalStateException("Configuration must be set");
+        }
+
+        ConnectFuture connectFuture = client.connect(null, configuration.getHost(), configuration.getPort());
+        // Wait getTimeout milliseconds for connect operation to complete
+        connectFuture.await(configuration.getTimeout());
+        if (!connectFuture.isDone() || !connectFuture.isConnected()) {
+            final String msg = "Failed to connect to " + configuration.getHost() + ":" + configuration.getPort() + " within timeout " + configuration.getTimeout() + "ms";
+            LOG.debug(msg);
+            throw new RuntimeCamelException(msg);
+        }
+        LOG.debug("Connected to {}:{}", configuration.getHost(), configuration.getPort());
+
+        ClientChannel channel = null;
+        ClientSession session = null;
+        try {
+            AuthFuture authResult;
+            session = connectFuture.getSession();
+
+            KeyPairProvider keyPairProvider;
+            final String certResource = configuration.getCertResource();
+            if (certResource != null) {
+                LOG.debug("Attempting to authenticate using ResourceKey '{}'...", certResource);
+                keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, endpoint.getCamelContext().getClassResolver());
+            } else {
+                keyPairProvider = configuration.getKeyPairProvider();
+            }
+
+            if (keyPairProvider != null) {
+                LOG.debug("Attempting to authenticate username '{}' using Key...", configuration.getUsername());
+                KeyPair pair = keyPairProvider.loadKey(configuration.getKeyType());
+                authResult = session.authPublicKey(configuration.getUsername(), pair);
+            } else {
+                LOG.debug("Attempting to authenticate username '{}' using Password...", configuration.getUsername());
+                authResult = session.authPassword(configuration.getUsername(), configuration.getPassword());
+            }
+
+            authResult.await(configuration.getTimeout());
+            if (!authResult.isDone() || authResult.isFailure()) {
+                LOG.debug("Failed to authenticate");
+                throw new RuntimeCamelException("Failed to authenticate username " + configuration.getUsername());
+            }
+
+            channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command);
+            channel.setIn(new ByteArrayInputStream(new byte[]{0}));
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            channel.setOut(out);
+            ByteArrayOutputStream err = new ByteArrayOutputStream();
+            channel.setErr(err);
+
+            OpenFuture openFuture = channel.open();
+            openFuture.await(configuration.getTimeout());
+            if (!openFuture.isOpened()) {
+                // Fail explicitly instead of returning null: callers dereference the result unchecked
+                throw new RuntimeCamelException("Failed to open exec channel to " + configuration.getHost() + ":" + configuration.getPort());
+            }
+
+            channel.waitFor(ClientChannel.CLOSED, 0);
+            return new SshResult(command, channel.getExitStatus(),
+                    new ByteArrayInputStream(out.toByteArray()),
+                    new ByteArrayInputStream(err.toByteArray()));
+        } finally {
+            if (channel != null) {
+                channel.close(true);
+            }
+            // need to make sure the session is closed
+            if (session != null) {
+                session.close(false);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
index f29c854..a2867cb 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
@@ -20,14 +20,35 @@ import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.impl.DefaultProducer;
+import org.apache.sshd.SshClient;
 
 public class SshProducer extends DefaultProducer {
     private SshEndpoint endpoint;
+    
+    private SshClient client;
 
     public SshProducer(SshEndpoint endpoint) {
         super(endpoint);
         this.endpoint = endpoint;
     }
+    
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        // Each producer creates and starts its own SshClient
+        this.client = SshClient.setUpDefaultClient();
+        this.client.start();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // Stop and forget the client created in doStart(), if any, before the producer stops
+        if (this.client != null) {
+            this.client.stop();
+            this.client = null;
+        }
+        super.doStop();
+    }
 
     @Override
     public void process(Exchange exchange) throws Exception {
@@ -35,7 +56,7 @@ public class SshProducer extends DefaultProducer {
         String command = in.getMandatoryBody(String.class);
 
         try {
-            SshResult result = endpoint.sendExecCommand(command);
+            SshResult result = SshHelper.sendExecCommand(command, endpoint, client);
             exchange.getOut().setBody(result.getStdout());
             exchange.getOut().setHeader(SshResult.EXIT_VALUE, result.getExitValue());
             exchange.getOut().setHeader(SshResult.STDERR, result.getStderr());