You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2018/08/22 08:45:37 UTC

[camel] branch master updated: CAMEL-12747 - Camel-Slack: Add consumer

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f313a7  CAMEL-12747 - Camel-Slack: Add consumer
4f313a7 is described below

commit 4f313a7753ed0a4c7f2e5210dc9e6552d2fd5696
Author: Andrea Cosentino <an...@gmail.com>
AuthorDate: Wed Aug 22 10:21:31 2018 +0200

    CAMEL-12747 - Camel-Slack: Add consumer
---
 components/camel-slack/pom.xml                     |   1 +
 .../camel-slack/src/main/docs/slack-component.adoc |  21 ++-
 .../camel/component/slack/SlackConsumer.java       | 173 +++++++++++++++++++++
 .../camel/component/slack/SlackEndpoint.java       |  84 ++++++++--
 .../camel/component/slack/SlackConsumerTest.java   |  46 ++++++
 5 files changed, 312 insertions(+), 13 deletions(-)

diff --git a/components/camel-slack/pom.xml b/components/camel-slack/pom.xml
index 497d14e..4adee8e 100644
--- a/components/camel-slack/pom.xml
+++ b/components/camel-slack/pom.xml
@@ -61,6 +61,7 @@
       <artifactId>camel-test-blueprint</artifactId>
       <scope>test</scope>
     </dependency>  
+    <!-- logging -->   
     <dependency>
       <groupId>org.apache.logging.log4j</groupId>
       <artifactId>log4j-api</artifactId>
diff --git a/components/camel-slack/src/main/docs/slack-component.adoc b/components/camel-slack/src/main/docs/slack-component.adoc
index e642819..d7ee10a 100644
--- a/components/camel-slack/src/main/docs/slack-component.adoc
+++ b/components/camel-slack/src/main/docs/slack-component.adoc
@@ -50,7 +50,7 @@ The Slack component supports 2 options, which are listed below.
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *webhookUrl* (producer) | The incoming webhook URL |  | String
+| *webhookUrl* (common) | The incoming webhook URL |  | String
 | *resolveProperty Placeholders* (advanced) | Whether the component should resolve property placeholders on itself when starting. Only properties which are of String type can use property placeholders. | true | boolean
 |===
 // component options: END
@@ -77,12 +77,17 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (5 parameters):
+==== Query Parameters (10 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which mean any exceptions occurred while the consumer is trying to pickup incoming messages, or the likes, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, that will be logged at WARN or ERROR level and ignored. | false | boolean
+| *maxResults* (consumer) | The Max Result for the poll | 10 | String
+| *token* (consumer) | The token to use |  | String
+| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
 | *iconEmoji* (producer) | Use a Slack emoji as an avatar |  | String
 | *iconUrl* (producer) | The avatar that the component will use when sending message to a channel or user. |  | String
 | *username* (producer) | This is the username that the bot will have when sending messages to a channel or user. |  | String
@@ -147,6 +152,18 @@ A CamelContext with Blueprint could be as:
 </blueprint>
 ---------------------------------------------------------------------------------------------------------------------------
 
+### Consumer
+
+You can also use a consumer to receive messages from a channel.
+
+[source,java]
+---------------------------------------------------------------------------------------------------------------------------
+from("slack://general?token=RAW(<YOUR_TOKEN>)&maxResults=1")
+    .to("mock:result");
+---------------------------------------------------------------------------------------------------------------------------
+
+This way you'll get the last message from the general channel. The consumer keeps track of the timestamp of the last message consumed and, on the next poll, checks for messages starting from that timestamp.
+
 ### See Also
 
 * Configuring Camel
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
new file mode 100644
index 0000000..7528c8a
--- /dev/null
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackConsumer.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.slack;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SlackConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SlackConsumer.class);
+
+    private SlackEndpoint slackEndpoint; // endpoint supplying channel, token and maxResults
+    private String timestamp; // "ts" of the first message of the latest non-empty poll; null until then
+
+    public SlackConsumer(SlackEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.slackEndpoint = endpoint; // kept typed so the Slack-specific getters are reachable
+    }
+
+    /**
+     * Requests the channel history from Slack and hands the returned messages to processBatch.
+     *
+     * @return the value returned by processBatch for the created exchanges
+     * @throws IllegalStateException if the parsed response has no "messages" entry
+     */
+    @Override
+    protected int poll() throws Exception {
+        List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
+        params.add(new BasicNameValuePair("channel", getChannelId(slackEndpoint.getChannel())));
+        if (ObjectHelper.isNotEmpty(timestamp)) {
+            params.add(new BasicNameValuePair("oldest", timestamp));
+        }
+        params.add(new BasicNameValuePair("count", slackEndpoint.getMaxResults()));
+        params.add(new BasicNameValuePair("token", slackEndpoint.getToken()));
+        HttpPost httpPost = new HttpPost("https://slack.com/api/channels.history");
+        httpPost.setEntity(new UrlEncodedFormEntity(params));
+        HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
+        String jsonString = readResponse(client.execute(httpPost));
+        List list = (List)((JSONObject)new JSONParser().parse(jsonString)).get("messages");
+        if (list == null) {
+            // createExchanges would otherwise fail on this with a bare NullPointerException
+            throw new IllegalStateException("No messages in Slack channels.history response: " + jsonString);
+        }
+        return processBatch(CastUtils.cast(createExchanges(list)));
+    }
+
+    /**
+     * Wraps every message in an exchange, preserving list order.
+     *
+     * The "ts" of the first message is stored in the timestamp field, which poll()
+     * sends as "oldest" with its next request.
+     */
+    private Queue<Exchange> createExchanges(List list) {
+        Queue<Exchange> answer = new LinkedList<>();
+        if (!list.isEmpty()) {
+            timestamp = (String)((JSONObject)list.get(0)).get("ts");
+        }
+        for (Object object : list) {
+            answer.add(slackEndpoint.createExchange((JSONObject)object));
+        }
+        return answer;
+    }
+
+    /**
+     * Hands the queued exchanges to the async processor in order, setting the batch index, size
+     * and completion flag on each; returns the queue size on entry even if the loop stops early.
+     */
+    @Override
+    public int processBatch(Queue<Object> exchanges) throws Exception {
+        final int total = exchanges.size();
+        final AsyncCallback callback = new AsyncCallback() {
+            @Override
+            public void done(boolean doneSync) {
+                LOG.trace("Processing exchange done");
+            }
+        };
+        int index = 0;
+        while (index < total && isBatchAllowed()) {
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+            pendingExchanges = total - index - 1;
+            getAsyncProcessor().process(exchange, callback);
+            index++;
+        }
+        return total;
+    }
+
+    /**
+     * Resolves a channel name to its id using the Slack channels.list method.
+     *
+     * @param channel the channel name to look up
+     * @return the "id" of the first listed channel whose "name" equals the given one
+     * @throws IOException if the request fails or the response cannot be read
+     * @throws ParseException if the response is not valid JSON
+     * @throws IllegalArgumentException if no listed channel matches
+     */
+    private String getChannelId(String channel) throws IOException, ParseException {
+        List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
+        params.add(new BasicNameValuePair("token", slackEndpoint.getToken()));
+        HttpPost httpPost = new HttpPost("https://slack.com/api/channels.list");
+        httpPost.setEntity(new UrlEncodedFormEntity(params));
+
+        HttpClient client = HttpClientBuilder.create().useSystemProperties().build();
+        String jsonString = readResponse(client.execute(httpPost));
+        List list = (List)((JSONObject)new JSONParser().parse(jsonString)).get("channels");
+        // a reply without "channels" is treated like an unknown channel
+        if (list != null) {
+            for (Object object : list) {
+                JSONObject singleChannel = (JSONObject)object;
+                Object name = singleChannel.get("name");
+                if (name != null && name.equals(channel) && singleChannel.get("id") != null) {
+                    return (String)singleChannel.get("id");
+                }
+            }
+        }
+        // never fall back to returning the response body: poll() would send it to Slack as a channel id
+        throw new IllegalArgumentException("Slack channel not found: " + channel);
+    }
+
+    /** Reads the whole response body as UTF-8 text and closes the content stream afterwards. */
+    private String readResponse(HttpResponse response) throws IOException {
+        ByteArrayOutputStream result = new ByteArrayOutputStream();
+        try (InputStream s = response.getEntity().getContent()) {
+            byte[] buffer = new byte[1024];
+            for (int length = s.read(buffer); length != -1; length = s.read(buffer)) {
+                result.write(buffer, 0, length);
+            }
+        }
+        return result.toString(StandardCharsets.UTF_8.name());
+    }
+
+}
diff --git a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
index 46d91a6..2b63f13 100644
--- a/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
+++ b/components/camel-slack/src/main/java/org/apache/camel/component/slack/SlackEndpoint.java
@@ -17,30 +17,42 @@
 package org.apache.camel.component.slack;
 
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.component.slack.helper.SlackMessage;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.ObjectHelper;
+import org.json.simple.JSONObject;
 
 /**
  * The slack component allows you to send messages to Slack.
  */
-@UriEndpoint(firstVersion = "2.16.0", scheme = "slack", title = "Slack", syntax = "slack:channel", producerOnly = true, label = "social")
+@UriEndpoint(firstVersion = "2.16.0", scheme = "slack", title = "Slack", syntax = "slack:channel", label = "social")
 public class SlackEndpoint extends DefaultEndpoint {
 
-    @UriPath @Metadata(required = "true")
+    @UriPath
+    @Metadata(required = "true")
     private String channel;
-    @UriParam
+    @UriParam(label = "producer")
     private String webhookUrl;
-    @UriParam(secret = true)
+    @UriParam(label = "producer", secret = true)
     private String username;
-    @UriParam
+    @UriParam(label = "producer")
     private String iconUrl;
-    @UriParam
+    @UriParam(label = "producer")
     private String iconEmoji;
+    @UriParam(label = "consumer", secret = true)
+    private String token;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private String maxResults = "10";
 
     /**
      * Constructor for SlackEndpoint
@@ -63,7 +75,11 @@ public class SlackEndpoint extends DefaultEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        throw new UnsupportedOperationException("You cannot consume slack messages from this endpoint: " + getEndpointUri());
+        if (ObjectHelper.isEmpty(token)) {
+            throw new RuntimeCamelException("Missing required endpoint configuration: token must be defined for Slack consumer");
+        }
+        // the token is present, so a polling consumer can be created for this endpoint
+        return new SlackConsumer(this, processor);
     }
 
     @Override
@@ -87,7 +103,8 @@ public class SlackEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * The channel name (syntax #name) or slackuser (syntax @userName) to send a message directly to an user.
+     * The channel name (syntax #name) or slackuser (syntax @userName) to send a
+     * message directly to an user.
      */
     public void setChannel(String channel) {
         this.channel = channel;
@@ -98,7 +115,8 @@ public class SlackEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * This is the username that the bot will have when sending messages to a channel or user.
+     * This is the username that the bot will have when sending messages to a
+     * channel or user.
      */
     public void setUsername(String username) {
         this.username = username;
@@ -109,7 +127,8 @@ public class SlackEndpoint extends DefaultEndpoint {
     }
 
     /**
-     * The avatar that the component will use when sending message to a channel or user.
+     * The avatar that the component will use when sending message to a channel
+     * or user.
      */
     public void setIconUrl(String iconUrl) {
         this.iconUrl = iconUrl;
@@ -125,5 +144,48 @@ public class SlackEndpoint extends DefaultEndpoint {
     public void setIconEmoji(String iconEmoji) {
         this.iconEmoji = iconEmoji;
     }
-}
 
+    public String getToken() {
+        return token;
+    }
+
+    /**
+     * The Slack API token sent with every consumer request (required by the consumer)
+     */
+    public void setToken(String token) {
+        this.token = token;
+    }
+
+    public String getMaxResults() {
+        return maxResults;
+    }
+
+    /**
+     * The maximum number of messages to fetch per poll
+     */
+    public void setMaxResults(String maxResult) {
+        this.maxResults = maxResult;
+    }
+
+    /** Creates an exchange for the given Slack message using the endpoint's exchange pattern. */
+    public Exchange createExchange(JSONObject object) {
+        return createExchange(getExchangePattern(), object);
+    }
+
+    /**
+     * Creates an exchange whose IN body is a SlackMessage populated from the JSON fields
+     * "text", "username" and, when present and non-empty, "icons"/"emoji".
+     */
+    public Exchange createExchange(ExchangePattern pattern, JSONObject object) {
+        SlackMessage body = new SlackMessage();
+        body.setText((String)object.get("text"));
+        body.setUsername((String)object.get("username"));
+        JSONObject icons = (JSONObject)object.get("icons");
+        if (ObjectHelper.isNotEmpty(icons) && ObjectHelper.isNotEmpty((String)icons.get("emoji"))) {
+            body.setIconEmoji((String)icons.get("emoji"));
+        }
+        Exchange exchange = super.createExchange(pattern);
+        exchange.getIn().setBody(body);
+        return exchange;
+    }
+}
diff --git a/components/camel-slack/src/test/java/org/apache/camel/component/slack/SlackConsumerTest.java b/components/camel-slack/src/test/java/org/apache/camel/component/slack/SlackConsumerTest.java
new file mode 100644
index 0000000..2a439cd
--- /dev/null
+++ b/components/camel-slack/src/test/java/org/apache/camel/component/slack/SlackConsumerTest.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.slack;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore // NOTE(review): presumably disabled because the route needs a real Slack token - confirm
+public class SlackConsumerTest extends CamelTestSupport {
+
+    @Test
+    public void testConsumePrefixedMessages() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(9); // NOTE(review): route sets maxResults=1; confirm that 9 is the intended count
+        
+        assertMockEndpointsSatisfied();
+    }
+    
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("slack://general?token=RAW(token)&maxResults=1")
+                    .to("mock:result");
+            }
+        };
+    }
+}