You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/02 17:03:18 UTC

[camel] branch master updated: Slack: Re-use http client and consume response (#4990)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new dbd50cf  Slack: Re-use http client and consume response (#4990)
dbd50cf is described below

commit dbd50cfb14ca91901b92d9e1c6e13957a8c7d7cc
Author: perttuk <pe...@users.noreply.github.com>
AuthorDate: Tue Feb 2 19:02:39 2021 +0200

    Slack: Re-use http client and consume response (#4990)
    
    * Re-use http client and consume response
    
    * Re-use http client
    
    * Close http client also
    
    * Convert to async producer
    
    * Consume response
---
 components/camel-slack/pom.xml                     |  4 ++
 .../camel/component/slack/SlackConsumer.java       | 14 +++--
 .../camel/component/slack/SlackProducer.java       | 60 +++++++++++++++++-----
 3 files changed, 62 insertions(+), 16 deletions(-)

diff --git a/components/camel-slack/pom.xml b/components/camel-slack/pom.xml
index 9fe6eaa..f40ed28 100644
--- a/components/camel-slack/pom.xml
+++ b/components/camel-slack/pom.xml
@@ -62,6 +62,10 @@
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpasyncclient</artifactId>
+        </dependency>
 
         <!-- test dependencies -->
         <dependency>
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
index 2550559..396e66e 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -35,9 +35,9 @@ import org.apache.camel.util.json.JsonArray;
 import org.apache.camel.util.json.JsonObject;
 import org.apache.camel.util.json.Jsoner;
 import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.entity.UrlEncodedFormEntity;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.message.BasicNameValuePair;
 
@@ -48,6 +48,7 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
     private SlackEndpoint slackEndpoint;
     private String timestamp;
     private String channelId;
+    private CloseableHttpClient client;
 
     public SlackConsumer(SlackEndpoint endpoint, Processor processor) throws IOException, DeserializationException {
         super(endpoint, processor);
@@ -56,15 +57,23 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     protected void doStart() throws Exception {
+        this.client = HttpClientBuilder.create().useSystemProperties().build();
         super.doStart();
         this.channelId = getChannelId(slackEndpoint.getChannel());
     }
 
     @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
     protected int poll() throws Exception {
         Queue<Exchange> exchanges;
 
-        HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
         HttpPost httpPost = new HttpPost(slackEndpoint.getServerUrl() + "/api/conversations.history");
         List<BasicNameValuePair> params = new ArrayList<>();
         params.add(new BasicNameValuePair(SlackConstants.SLACK_CHANNEL_FIELD, channelId));
@@ -131,7 +140,6 @@ public class SlackConsumer extends ScheduledBatchPollingConsumer {
     }
 
     private String getChannelId(String channel) throws IOException, DeserializationException {
-        HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
         HttpPost httpPost = new HttpPost(slackEndpoint.getServerUrl() + "/api/conversations.list");
 
         List<BasicNameValuePair> params = new ArrayList<>();
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
index 43f0637..cac89c6 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackProducer.java
@@ -21,21 +21,26 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.slack.helper.SlackMessage;
-import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.support.DefaultAsyncProducer;
 import org.apache.camel.support.ExchangeHelper;
 import org.apache.camel.util.json.JsonObject;
 import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.concurrent.FutureCallback;
 import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.apache.http.util.EntityUtils;
 
-public class SlackProducer extends DefaultProducer {
+public class SlackProducer extends DefaultAsyncProducer {
 
-    private SlackEndpoint slackEndpoint;
+    private final SlackEndpoint slackEndpoint;
+
+    private CloseableHttpAsyncClient client;
 
     public SlackProducer(SlackEndpoint endpoint) {
         super(endpoint);
@@ -43,10 +48,23 @@ public class SlackProducer extends DefaultProducer {
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
+    protected void doStart() throws Exception {
+        this.client = HttpAsyncClientBuilder.create().useSystemProperties().build();
+        super.doStart();
+    }
 
-        // Create an HttpClient and Post object
-        HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (client != null) {
+            client.close();
+        }
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+
+        // Create Post object
         HttpPost httpPost = new HttpPost(slackEndpoint.getWebhookUrl());
 
         // Build Helper object
@@ -73,12 +91,28 @@ public class SlackProducer extends DefaultProducer {
         // Do the post
         httpPost.setEntity(body);
 
-        HttpResponse response = client.execute(httpPost);
+        client.execute(httpPost, new FutureCallback<HttpResponse>() {
+            @Override
+            public void completed(HttpResponse response) {
+                // 2xx is OK, anything else we regard as failure
+                if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() > 299) {
+                    exchange.setException(new CamelExchangeException("Error POSTing to Slack API: " + response.toString(), exchange));
+                }
+                EntityUtils.consumeQuietly(response.getEntity());
+                callback.done(false);
+            }
+            @Override
+            public void failed(Exception ex) {
+                exchange.setException(ex);
+                callback.done(false);
+            }
+            @Override
+            public void cancelled() {
+                callback.done(false);
+            }
+        });
 
-        // 2xx is OK, anything else we regard as failure
-        if (response.getStatusLine().getStatusCode() < 200 || response.getStatusLine().getStatusCode() > 299) {
-            throw new CamelExchangeException("Error POSTing to Slack API: " + response.toString(), exchange);
-        }
+        return false;
     }
 
     /**