You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/05/21 08:41:07 UTC

[4/5] camel git commit: CAMEL-9969: Fixed some issues after review

CAMEL-9969: Fixed some issues after review


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/93f04b94
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/93f04b94
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/93f04b94

Branch: refs/heads/master
Commit: 93f04b94d17805fd8bcdc23ed6c5274987373023
Parents: 8dc984e
Author: Nicola Ferraro <ni...@gmail.com>
Authored: Fri May 20 16:21:57 2016 +0200
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Sat May 21 10:36:05 2016 +0200

----------------------------------------------------------------------
 .../camel-telegram/src/main/docs/telegram.adoc  | 20 +++++++++++++--
 .../telegram/TelegramConfiguration.java         | 21 ++++++----------
 .../component/telegram/TelegramConsumer.java    | 18 ++++----------
 .../component/telegram/TelegramEndpoint.java    | 26 ++++++++++++++++----
 .../component/telegram/TelegramProducer.java    |  9 +++----
 .../telegram/TelegramConfigurationTest.java     |  4 +--
 6 files changed, 57 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/main/docs/telegram.adoc
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/main/docs/telegram.adoc b/components/camel-telegram/src/main/docs/telegram.adoc
index 5b33652..e58c9ef 100644
--- a/components/camel-telegram/src/main/docs/telegram.adoc
+++ b/components/camel-telegram/src/main/docs/telegram.adoc
@@ -53,8 +53,9 @@ The Telegram component has no options.
 // component options: END
 
 
+
 // endpoint options: START
-The Telegram component supports 10 endpoint options which are listed below:
+The Telegram component supports 24 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -63,19 +64,34 @@ The Telegram component supports 10 endpoint options which are listed below:
 | type | common |  | String | *Required* The endpoint type. Currently only the 'bots' type is supported.
 | authorizationToken | common |  | String | *Required* The authorization token for using the bot (ask the BotFather) eg. 654321531:HGF_dTra456323dHuOedsE343211fqr3t-H.
 | bridgeErrorHandler | consumer | false | boolean | Allows for bridging the consumer to the Camel routing Error Handler which mean any exceptions occurred while the consumer is trying to pickup incoming messages or the likes will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions that will be logged at WARN/ERROR level and ignored.
-| delay | consumer | 1000 | Long | Delay in milliseconds between two consecutive polls to the 'getUpdates' service.
 | limit | consumer | 100 | Integer | Limit on the number of updates that can be received in a single polling request.
+| sendEmptyMessageWhenIdle | consumer | false | boolean | If the polling consumer did not poll any messages you can enable this option to send an empty message (no body) instead.
 | timeout | consumer | 30 | Integer | Timeout in seconds for long polling. Put 0 for short polling or a bigger number for long polling. Long polling produces shorter response time.
 | exceptionHandler | consumer (advanced) |  | ExceptionHandler | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this options is not in use. By default the consumer will deal with exceptions that will be logged at WARN/ERROR level and ignored.
+| pollStrategy | consumer (advanced) |  | PollingConsumerPollStrategy | A pluggable org.apache.camel.spi.PollingConsumerPollStrategy allowing you to provide your custom implementation to control error handling for errors that usually occur during the poll operation, before an Exchange has been created and routed in Camel.
 | chatId | producer |  | String | The identifier of the chat that will receive the produced messages. Chat ids can be first obtained from incoming messages (eg. when a telegram user starts a conversation with a bot its client sends automatically a '/start' message containing the chat id). It is an optional parameter as the chat id can be set dynamically for each outgoing message (using body or headers).
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
+| backoffErrorThreshold | scheduler |  | int | The number of subsequent error polls (failed due to some error) that should happen before the backoffMultiplier should kick-in.
+| backoffIdleThreshold | scheduler |  | int | The number of subsequent idle polls that should happen before the backoffMultiplier should kick-in.
+| backoffMultiplier | scheduler |  | int | To let the scheduled polling consumer backoff if there has been a number of subsequent idles/errors in a row. The multiplier is then the number of polls that will be skipped before the next actual attempt is happening again. When this option is in use then backoffIdleThreshold and/or backoffErrorThreshold must also be configured.
+| delay | scheduler | 500 | long | Milliseconds before the next poll. You can also specify time values using units such as 60s (60 seconds) 5m30s (5 minutes and 30 seconds) and 1h (1 hour).
+| greedy | scheduler | false | boolean | If greedy is enabled then the ScheduledPollConsumer will run immediately again if the previous run polled 1 or more messages.
+| initialDelay | scheduler | 1000 | long | Milliseconds before the first poll starts. You can also specify time values using units such as 60s (60 seconds) 5m30s (5 minutes and 30 seconds) and 1h (1 hour).
+| runLoggingLevel | scheduler | TRACE | LoggingLevel | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that.
+| scheduledExecutorService | scheduler |  | ScheduledExecutorService | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool.
+| scheduler | scheduler | none | ScheduledPollConsumerScheduler | To use a cron scheduler from either camel-spring or camel-quartz2 component
+| schedulerProperties | scheduler |  | Map | To configure additional properties when using a custom scheduler or any of the Quartz2 or Spring-based schedulers.
+| startScheduler | scheduler | true | boolean | Whether the scheduler should be auto started.
+| timeUnit | scheduler | MILLISECONDS | TimeUnit | Time unit for initialDelay and delay options.
+| useFixedDelay | scheduler | true | boolean | Controls if fixed delay or fixed rate is used. See ScheduledExecutorService in JDK for details.
 |=======================================================================
 {% endraw %}
 // endpoint options: END
 
 
 
+
 [[Telegram-MessageHeaders]]
 Message Headers
 ^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConfiguration.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConfiguration.java
index 61e81c9..cd5f14c 100644
--- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConfiguration.java
+++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConfiguration.java
@@ -42,9 +42,6 @@ public class TelegramConfiguration {
             + "It is an optional parameter, as the chat id can be set dynamically for each outgoing message (using body or headers).", label = "producer")
     private String chatId;
 
-    @UriParam(description = "Delay in milliseconds between two consecutive polls to the 'getUpdates' service.", optionalPrefix = "consumer.", defaultValue = "1000", label = "consumer")
-    private Long delay = 1000L;
-
     @UriParam(description = "Timeout in seconds for long polling. Put 0 for short polling or a bigger number for long polling. Long polling produces shorter response time.", optionalPrefix =
             "consumer.", defaultValue = "30", label = "consumer")
     private Integer timeout = 30;
@@ -104,14 +101,6 @@ public class TelegramConfiguration {
         this.chatId = chatId;
     }
 
-    public Long getDelay() {
-        return delay;
-    }
-
-    public void setDelay(Long delay) {
-        this.delay = delay;
-    }
-
     public Integer getTimeout() {
         return timeout;
     }
@@ -130,7 +119,13 @@ public class TelegramConfiguration {
 
     @Override
     public String toString() {
-        return "TelegramConfiguration{" + "type='" + type + '\'' + ", authorizationToken='" + authorizationToken + '\'' + ", chatId='" + chatId + '\'' + ", delay=" + delay
-                + ", timeout=" + timeout + ", limit=" + limit + '}';
+        final StringBuilder sb = new StringBuilder("TelegramConfiguration{");
+        sb.append("type='").append(type).append('\'');
+        sb.append(", authorizationToken='").append(authorizationToken).append('\'');
+        sb.append(", chatId='").append(chatId).append('\'');
+        sb.append(", timeout=").append(timeout);
+        sb.append(", limit=").append(limit);
+        sb.append('}');
+        return sb.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConsumer.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConsumer.java
index 5935d9f..1799439 100644
--- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConsumer.java
+++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramConsumer.java
@@ -40,7 +40,6 @@ public class TelegramConsumer extends ScheduledPollConsumer {
 
     public TelegramConsumer(TelegramEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
-        setDelay(endpoint.getConfiguration().getDelay());
         this.endpoint = endpoint;
     }
 
@@ -68,7 +67,7 @@ public class TelegramConsumer extends ScheduledPollConsumer {
         List<Update> updates = updateResult.getUpdates();
 
         if (updates.size() > 0) {
-            log.info("Received " + updates.size() + " updates from Telegram service");
+            log.debug("Received {} updates from Telegram service", updates.size());
         } else {
             log.debug("No updates received from Telegram service");
         }
@@ -84,26 +83,19 @@ public class TelegramConsumer extends ScheduledPollConsumer {
     private void processUpdates(List<Update> updates) throws Exception {
         for (Update update : updates) {
 
-            log.debug("Received update from Telegram service: " + update);
+            log.debug("Received update from Telegram service: {}", update);
 
-            Exchange exchange = endpoint.createExchange();
-
-            if (update.getMessage() != null) {
-                exchange.getIn().setBody(update.getMessage());
-
-                if (update.getMessage().getChat() != null) {
-                    exchange.getIn().setHeader(TelegramConstants.TELEGRAM_CHAT_ID, update.getMessage().getChat().getId());
-                }
-            }
+            Exchange exchange = endpoint.createExchange(update);
             getProcessor().process(exchange);
         }
     }
 
+
     private void updateOffset(List<Update> updates) {
         OptionalLong ol = updates.stream().mapToLong(Update::getUpdateId).max();
         if (ol.isPresent()) {
             this.offset = ol.getAsLong() + 1;
-            log.debug("Next Telegram offset will be " + this.offset);
+            log.debug("Next Telegram offset will be {}", this.offset);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java
index 8c06d18..e0fa237 100644
--- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java
+++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramEndpoint.java
@@ -18,19 +18,19 @@ package org.apache.camel.component.telegram;
 
 import org.apache.camel.Component;
 import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
-import org.apache.camel.spi.Metadata;
+import org.apache.camel.component.telegram.model.Update;
+import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
-import org.apache.camel.spi.UriPath;
 
 /**
  * The Camel endpoint for a telegram bot.
  */
 @UriEndpoint(scheme = "telegram", title = "Telegram", syntax = "telegram:type/authorizationToken", consumerClass = TelegramConsumer.class, label = "chat")
-public class TelegramEndpoint extends DefaultEndpoint {
+public class TelegramEndpoint extends ScheduledPollEndpoint {
 
     @UriParam
     private TelegramConfiguration configuration;
@@ -47,7 +47,23 @@ public class TelegramEndpoint extends DefaultEndpoint {
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new TelegramConsumer(this, processor);
+        TelegramConsumer consumer = new TelegramConsumer(this, processor);
+        configureConsumer(consumer);
+        return consumer;
+    }
+
+    public Exchange createExchange(Update update) {
+        Exchange exchange = super.createExchange();
+
+        if (update.getMessage() != null) {
+            exchange.getIn().setBody(update.getMessage());
+
+            if (update.getMessage().getChat() != null) {
+                exchange.getIn().setHeader(TelegramConstants.TELEGRAM_CHAT_ID, update.getMessage().getChat().getId());
+            }
+        }
+
+        return exchange;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramProducer.java b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramProducer.java
index 4f63445..c5df22a 100644
--- a/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramProducer.java
+++ b/components/camel-telegram/src/main/java/org/apache/camel/component/telegram/TelegramProducer.java
@@ -37,7 +37,7 @@ public class TelegramProducer extends DefaultProducer {
 
         if (exchange.getIn().getBody() == null) {
             // fail fast
-            log.info("Received exchange with empty body, skipping");
+            log.debug("Received exchange with empty body, skipping");
             return;
         }
 
@@ -50,15 +50,14 @@ public class TelegramProducer extends DefaultProducer {
         if (message.getChatId() == null) {
             log.debug("Chat id is null on outgoing message, trying resolution");
             String chatId = resolveChatId(config, message, exchange);
-            log.debug("Resolved chat id is " + chatId);
+            log.debug("Resolved chat id is {}", chatId);
             message.setChatId(chatId);
         }
 
         TelegramService service = TelegramServiceProvider.get().getService();
 
-        log.info("Sending text message to Telegram service");
-        log.debug("Message being sent is: " + message);
-        log.debug("Headers of message being sent are: " + exchange.getIn().getHeaders());
+        log.debug("Message being sent is: {}", message);
+        log.debug("Headers of message being sent are: {}", exchange.getIn().getHeaders());
 
         service.sendMessage(config.getAuthorizationToken(), message);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/93f04b94/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConfigurationTest.java b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConfigurationTest.java
index b2ddf5f..d918661 100644
--- a/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConfigurationTest.java
+++ b/components/camel-telegram/src/test/java/org/apache/camel/component/telegram/TelegramConfigurationTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 public class TelegramConfigurationTest extends TelegramTestSupport {
 
 
-
     @Test
     public void testChatBotResult() throws Exception {
         TelegramEndpoint endpoint = (TelegramEndpoint) context().getEndpoints().stream().filter(e -> e instanceof TelegramEndpoint).findAny().get();
@@ -36,13 +35,12 @@ public class TelegramConfigurationTest extends TelegramTestSupport {
         assertEquals("bots", config.getType());
         assertEquals("mock-token", config.getAuthorizationToken());
         assertEquals("12345", config.getChatId());
-        assertEquals(Long.valueOf(2000L), config.getDelay());
+        assertEquals(2000L, endpoint.getDelay());
         assertEquals(Integer.valueOf(10), config.getTimeout());
         assertEquals(Integer.valueOf(60), config.getLimit());
     }
 
 
-
     @Override
     protected RoutesBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {