You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/02 17:03:18 UTC
[camel] branch master updated: Slack: Re-use http client and
consume response (#4990)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new dbd50cf Slack: Re-use http client and consume response (#4990)
dbd50cf is described below
commit dbd50cfb14ca91901b92d9e1c6e13957a8c7d7cc
Author: perttuk <pe...@users.noreply.github.com>
AuthorDate: Tue Feb 2 19:02:39 2021 +0200
Slack: Re-use http client and consume response (#4990)
* Re-use http client and consume response
* Re-use http client
* Close http client also
* Convert to async producer
* Consume response
---
components/camel-slack/pom.xml | 4 ++
.../camel/component/slack/SlackConsumer.java | 14 +++--
.../camel/component/slack/SlackProducer.java | 60 +++++++++++++++++-----
3 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/components/camel-slack/pom.xml b/components/camel-slack/pom.xml
index 9fe6eaa..f40ed28 100644
--- a/components/camel-slack/pom.xml
+++ b/components/camel-slack/pom.xml
@@ -62,6 +62,10 @@
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpasyncclient</artifactId>
+ </dependency>
<!-- test dependencies -->
<dependency>
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
index 2550559..396e66e 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -35,9 +35,9 @@ import org.apache.camel.util.json.JsonArray;
import org.apache.camel.util.json.JsonObject;
import org.apache.camel.util.json.Jsoner;
import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
@@ -48,6 +48,7 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
private SlackEndpoint slackEndpoint;
private String timestamp;
private String channelId;
+ private CloseableHttpClient client;
public SlackConsumer(SlackEndpoint endpoint, Processor processor) throws IOException, DeserializationException {
super(endpoint, processor);
@@ -56,15 +57,23 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
@Override
protected void doStart() throws Exception {
+ this.client = HttpClientBuilder.create().useSystemProperties().build();
super.doStart();
this.channelId = getChannelId(slackEndpoint.getChannel());
}
@Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
protected int poll() throws Exception {
Queue<Exchange> exchanges;
- HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
HttpPost httpPost = new HttpPost(slackEndpoint.getServerUrl() + "/api/conversations.history");
List<BasicNameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair(SlackConstants.SLACK_CHANNEL_FIELD, channelId));
@@ -131,7 +140,6 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
}
private String getChannelId(String channel) throws IOException, DeserializationException {
- HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
HttpPost httpPost = new HttpPost(slackEndpoint.getServerUrl() + "/api/conversations.list");
List<BasicNameValuePair> params = new ArrayList<>();
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
index 43f0637..cac89c6 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
@@ -21,21 +21,26 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.component.slack.helper.SlackMessage;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.util.json.JsonObject;
import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.util.EntityUtils;
-public class SlackProducer extends DefaultProducer {
+public class SlackProducer extends DefaultAsyncProducer {
- private SlackEndpoint slackEndpoint;
+ private final SlackEndpoint slackEndpoint;
+
+ private CloseableHttpAsyncClient client;
public SlackProducer(SlackEndpoint endpoint) {
super(endpoint);
@@ -43,10 +48,23 @@ public class SlackProducer extends DefaultProducer {
}
@Override
- public void process(Exchange exchange) throws Exception {
+ protected void doStart() throws Exception {
+ this.client = HttpAsyncClientBuilder.create().useSystemProperties().build();
+ super.doStart();
+ }
- // Create an HttpClient and Post object
- HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
+ @Override
+ protected void doStop() throws Exception {
+ super.doStop();
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+
+ // Create Post object
HttpPost httpPost = new HttpPost(slackEndpoint.getWebhookUrl());
// Build Helper object
@@ -73,12 +91,28 @@ public class SlackProducer extends DefaultProducer {
// Do the post
httpPost.setEntity(body);
- HttpResponse response = client.execute(httpPost);
+ client.execute(httpPost, new FutureCallback<HttpResponse>() {
+ @Override
+ public void completed(HttpResponse response) {
+ // 2xx is OK, anything else we regard as failure
+ if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() > 299) {
+ exchange.setException(new CamelExchangeException("Error POSTing to Slack API: " + response.toString(), exchange));
+ }
+ EntityUtils.consumeQuietly(response.getEntity());
+ callback.done(false);
+ }
+ @Override
+ public void failed(Exception ex) {
+ exchange.setException(ex);
+ callback.done(false);
+ }
+ @Override
+ public void cancelled() {
+ callback.done(false);
+ }
+ });
- // 2xx is OK, anything else we regard as failure
- if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() > 299) {
- throw new CamelExchangeException("Error POSTing to Slack API: " + response.toString(), exchange);
- }
+ return false;
}
/**