You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2015/11/16 17:28:40 UTC
camel git commit: CAMEL-9319 SshClient resource leak when used from ProducerTemplate
Repository: camel
Updated Branches:
refs/heads/master e1758f39a -> d2770c28a
CAMEL-9319 SshClient resource leak when used from ProducerTemplate
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d2770c28
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d2770c28
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d2770c28
Branch: refs/heads/master
Commit: d2770c28ae7de0af6398530761f07e0f155f1d28
Parents: e1758f3
Author: Andrea Cosentino <an...@gmail.com>
Authored: Mon Nov 16 17:09:04 2015 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Nov 16 17:09:05 2015 +0100
----------------------------------------------------------------------
.../apache/camel/component/ssh/SshConsumer.java | 23 +++-
.../apache/camel/component/ssh/SshEndpoint.java | 105 +--------------
.../apache/camel/component/ssh/SshHelper.java | 127 +++++++++++++++++++
.../apache/camel/component/ssh/SshProducer.java | 23 +++-
4 files changed, 172 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
index 330972e..682268a 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshConsumer.java
@@ -19,19 +19,40 @@ package org.apache.camel.component.ssh;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
+import org.apache.sshd.SshClient;
public class SshConsumer extends ScheduledPollConsumer {
private final SshEndpoint endpoint;
+
+ private SshClient client;
public SshConsumer(SshEndpoint endpoint, Processor processor) {
super(endpoint, processor);
this.endpoint = endpoint;
}
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ client = SshClient.setUpDefaultClient();
+ client.start();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (client != null) {
+ client.stop();
+ client = null;
+ }
+
+ super.doStop();
+ }
@Override
protected int poll() throws Exception {
String command = endpoint.getPollCommand();
- SshResult result = endpoint.sendExecCommand(command);
+ SshResult result = SshHelper.sendExecCommand(command, endpoint, client);
Exchange exchange = endpoint.createExchange();
exchange.getIn().setBody(result.getStdout());
http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
index 8235e1b..af709f4 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshEndpoint.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
public class SshEndpoint extends ScheduledPollEndpoint {
protected final Logger log = LoggerFactory.getLogger(getClass());
- private SshClient client;
@UriParam
private SshConfiguration sshConfiguration;
@@ -75,109 +74,7 @@ public class SshEndpoint extends ScheduledPollEndpoint {
@Override
public boolean isSingleton() {
// SshClient is not thread-safe to be shared
- return false;
- }
-
- public SshResult sendExecCommand(String command) throws Exception {
- SshResult result = null;
-
- if (getConfiguration() == null) {
- throw new IllegalStateException("Configuration must be set");
- }
-
- ConnectFuture connectFuture = client.connect(null, getHost(), getPort());
-
- // Wait getTimeout milliseconds for connect operation to complete
- connectFuture.await(getTimeout());
-
- if (!connectFuture.isDone() || !connectFuture.isConnected()) {
- final String msg = "Failed to connect to " + getHost() + ":" + getPort() + " within timeout " + getTimeout() + "ms";
- log.debug(msg);
- throw new RuntimeCamelException(msg);
- }
-
- log.debug("Connected to {}:{}", getHost(), getPort());
-
- ClientChannel channel = null;
- ClientSession session = null;
-
- try {
- AuthFuture authResult;
- session = connectFuture.getSession();
-
- KeyPairProvider keyPairProvider;
- final String certResource = getCertResource();
- if (certResource != null) {
- log.debug("Attempting to authenticate using ResourceKey '{}'...", certResource);
- keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, getCamelContext().getClassResolver());
- } else {
- keyPairProvider = getKeyPairProvider();
- }
-
- if (keyPairProvider != null) {
- log.debug("Attempting to authenticate username '{}' using Key...", getUsername());
- KeyPair pair = keyPairProvider.loadKey(getKeyType());
- authResult = session.authPublicKey(getUsername(), pair);
- } else {
- log.debug("Attempting to authenticate username '{}' using Password...", getUsername());
- authResult = session.authPassword(getUsername(), getPassword());
- }
-
- authResult.await(getTimeout());
-
- if (!authResult.isDone() || authResult.isFailure()) {
- log.debug("Failed to authenticate");
- throw new RuntimeCamelException("Failed to authenticate username " + getUsername());
- }
-
- channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command);
-
- ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0});
- channel.setIn(in);
-
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- channel.setOut(out);
-
- ByteArrayOutputStream err = new ByteArrayOutputStream();
- channel.setErr(err);
- OpenFuture openFuture = channel.open();
- openFuture.await(getTimeout());
- if (openFuture.isOpened()) {
- channel.waitFor(ClientChannel.CLOSED, 0);
- result = new SshResult(command, channel.getExitStatus(),
- new ByteArrayInputStream(out.toByteArray()),
- new ByteArrayInputStream(err.toByteArray()));
-
- }
- return result;
- } finally {
- if (channel != null) {
- channel.close(true);
- }
- // need to make sure the session is closed
- if (session != null) {
- session.close(false);
- }
- }
-
- }
-
- @Override
- protected void doStart() throws Exception {
- super.doStart();
-
- client = SshClient.setUpDefaultClient();
- client.start();
- }
-
- @Override
- protected void doStop() throws Exception {
- if (client != null) {
- client.stop();
- client = null;
- }
-
- super.doStop();
+ return true;
}
public SshConfiguration getConfiguration() {
http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
new file mode 100644
index 0000000..6c9ea02
--- /dev/null
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshHelper.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.ssh;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.security.KeyPair;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.sshd.ClientChannel;
+import org.apache.sshd.ClientSession;
+import org.apache.sshd.SshClient;
+import org.apache.sshd.client.future.AuthFuture;
+import org.apache.sshd.client.future.ConnectFuture;
+import org.apache.sshd.client.future.OpenFuture;
+import org.apache.sshd.common.KeyPairProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class SshHelper {
+
+ protected static final Logger LOG = LoggerFactory.getLogger(SshHelper.class);
+
+ private SshHelper() {
+ }
+
+ public static SshResult sendExecCommand(String command, SshEndpoint endpoint, SshClient client) throws Exception {
+ SshResult result = null;
+
+ SshConfiguration configuration = endpoint.getConfiguration();
+
+ if (configuration == null) {
+ throw new IllegalStateException("Configuration must be set");
+ }
+
+ ConnectFuture connectFuture = client.connect(null, configuration.getHost(), configuration.getPort());
+
+ // Wait getTimeout milliseconds for connect operation to complete
+ connectFuture.await(configuration.getTimeout());
+
+ if (!connectFuture.isDone() || !connectFuture.isConnected()) {
+ final String msg = "Failed to connect to " + configuration.getHost() + ":" + configuration.getPort() + " within timeout " + configuration.getTimeout() + "ms";
+ LOG.debug(msg);
+ throw new RuntimeCamelException(msg);
+ }
+
+ LOG.debug("Connected to {}:{}", configuration.getHost(), configuration.getPort());
+
+ ClientChannel channel = null;
+ ClientSession session = null;
+
+ try {
+ AuthFuture authResult;
+ session = connectFuture.getSession();
+
+ KeyPairProvider keyPairProvider;
+ final String certResource = configuration.getCertResource();
+ if (certResource != null) {
+ LOG.debug("Attempting to authenticate using ResourceKey '{}'...", certResource);
+ keyPairProvider = new ResourceHelperKeyPairProvider(new String[]{certResource}, endpoint.getCamelContext().getClassResolver());
+ } else {
+ keyPairProvider = configuration.getKeyPairProvider();
+ }
+
+ if (keyPairProvider != null) {
+ LOG.debug("Attempting to authenticate username '{}' using Key...", configuration.getUsername());
+ KeyPair pair = keyPairProvider.loadKey(configuration.getKeyType());
+ authResult = session.authPublicKey(configuration.getUsername(), pair);
+ } else {
+ LOG.debug("Attempting to authenticate username '{}' using Password...", configuration.getUsername());
+ authResult = session.authPassword(configuration.getUsername(), configuration.getPassword());
+ }
+
+ authResult.await(configuration.getTimeout());
+
+ if (!authResult.isDone() || authResult.isFailure()) {
+ LOG.debug("Failed to authenticate");
+ throw new RuntimeCamelException("Failed to authenticate username " + configuration.getUsername());
+ }
+
+ channel = session.createChannel(ClientChannel.CHANNEL_EXEC, command);
+
+ ByteArrayInputStream in = new ByteArrayInputStream(new byte[]{0});
+ channel.setIn(in);
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ channel.setOut(out);
+
+ ByteArrayOutputStream err = new ByteArrayOutputStream();
+ channel.setErr(err);
+ OpenFuture openFuture = channel.open();
+ openFuture.await(configuration.getTimeout());
+ if (openFuture.isOpened()) {
+ channel.waitFor(ClientChannel.CLOSED, 0);
+ result = new SshResult(command, channel.getExitStatus(),
+ new ByteArrayInputStream(out.toByteArray()),
+ new ByteArrayInputStream(err.toByteArray()));
+
+ }
+ return result;
+ } finally {
+ if (channel != null) {
+ channel.close(true);
+ }
+ // need to make sure the session is closed
+ if (session != null) {
+ session.close(false);
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d2770c28/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
index f29c854..a2867cb 100644
--- a/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
+++ b/components/camel-ssh/src/main/java/org/apache/camel/component/ssh/SshProducer.java
@@ -20,14 +20,35 @@ import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultProducer;
+import org.apache.sshd.SshClient;
public class SshProducer extends DefaultProducer {
private SshEndpoint endpoint;
+
+ private SshClient client;
public SshProducer(SshEndpoint endpoint) {
super(endpoint);
this.endpoint = endpoint;
}
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ client = SshClient.setUpDefaultClient();
+ client.start();
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (client != null) {
+ client.stop();
+ client = null;
+ }
+
+ super.doStop();
+ }
@Override
public void process(Exchange exchange) throws Exception {
@@ -35,7 +56,7 @@ public class SshProducer extends DefaultProducer {
String command = in.getMandatoryBody(String.class);
try {
- SshResult result = endpoint.sendExecCommand(command);
+ SshResult result = SshHelper.sendExecCommand(command, endpoint, client);
exchange.getOut().setBody(result.getStdout());
exchange.getOut().setHeader(SshResult.EXIT_VALUE, result.getExitValue());
exchange.getOut().setHeader(SshResult.STDERR, result.getStderr());