You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/02/29 17:38:53 UTC
[2/3] camel git commit: CAMEL-8258 - Support Streaming from User
Endpoint including Direct Messages
CAMEL-8258 - Support Streaming from User Endpoint including Direct Messages
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33b3bd5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33b3bd5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33b3bd5c
Branch: refs/heads/master
Commit: 33b3bd5c9a011788787c094e83848ca666a52724
Parents: b09eca2
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Feb 26 12:36:21 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Feb 29 17:35:39 2016 +0100
----------------------------------------------------------------------
.../component/twitter/Twitter4JFactory.java | 173 -------------------
.../component/twitter/TwitterConstants.java | 25 ++-
.../twitter/TwitterEndpointDirect.java | 6 +-
.../component/twitter/TwitterEndpointEvent.java | 4 +-
.../twitter/TwitterEndpointPolling.java | 8 +-
.../camel/component/twitter/TwitterHelper.java | 172 ++++++++++++++++++
.../twitter/consumer/TweeterStatusListener.java | 24 ---
.../twitter/consumer/Twitter4JConsumer.java | 72 --------
.../twitter/consumer/TwitterConsumer.java | 86 +++++++++
.../twitter/consumer/TwitterConsumerDirect.java | 16 +-
.../twitter/consumer/TwitterConsumerEvent.java | 33 ++--
.../consumer/TwitterConsumerPolling.java | 38 ++--
.../twitter/consumer/TwitterEventListener.java | 26 +++
.../twitter/consumer/TwitterEventType.java | 77 +++++++++
.../directmessage/DirectMessageConsumer.java | 28 +--
.../twitter/consumer/search/SearchConsumer.java | 52 +++---
.../streaming/AbstractStreamingConsumer.java | 128 ++++++++++++++
.../consumer/streaming/FilterConsumer.java | 77 ---------
.../streaming/FilterStreamingConsumer.java | 77 +++++++++
.../consumer/streaming/SampleConsumer.java | 40 -----
.../streaming/SampleStreamingConsumer.java | 40 +++++
.../consumer/streaming/StreamingConsumer.java | 108 ------------
.../streaming/UserStreamingConsumer.java | 156 ++++++++++++-----
.../timeline/AbstractStatusConsumer.java | 55 ++++++
.../twitter/consumer/timeline/HomeConsumer.java | 23 +--
.../consumer/timeline/MentionsConsumer.java | 23 +--
.../consumer/timeline/RetweetsConsumer.java | 23 +--
.../twitter/consumer/timeline/UserConsumer.java | 23 +--
.../component/twitter/data/ConsumerType.java | 12 +-
.../component/twitter/data/EndpointType.java | 12 +-
.../component/twitter/data/StreamingType.java | 11 +-
.../component/twitter/data/TimelineType.java | 11 +-
.../component/twitter/data/TrendsType.java | 11 +-
.../twitter/producer/DirectMessageProducer.java | 10 +-
.../twitter/producer/SearchProducer.java | 24 +--
.../twitter/producer/Twitter4JProducer.java | 36 ----
.../twitter/producer/TwitterProducer.java | 37 ++++
.../twitter/producer/UserProducer.java | 14 +-
.../twitter/util/TwitterConverter.java | 28 ++-
.../twitter/CamelTwitterTestSupport.java | 65 ++++---
.../component/twitter/UserStreamingTest.java | 49 ++++++
.../src/test/resources/log4j.properties | 1 +
42 files changed, 1112 insertions(+), 822 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
deleted file mode 100644
index 9025164..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter;
-
-import java.util.regex.Pattern;
-
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer;
-import org.apache.camel.component.twitter.consumer.search.SearchConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.FilterConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.SampleConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.UserConsumer;
-import org.apache.camel.component.twitter.data.ConsumerType;
-import org.apache.camel.component.twitter.data.StreamingType;
-import org.apache.camel.component.twitter.data.TimelineType;
-import org.apache.camel.component.twitter.producer.DirectMessageProducer;
-import org.apache.camel.component.twitter.producer.SearchProducer;
-import org.apache.camel.component.twitter.producer.UserProducer;
-import org.apache.camel.impl.DefaultProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Maps the endpoint URI to the respective Twitter4J consumer or producer.
- * <p/>
- * URI STRUCTURE:
- * <p/>
- * timeline/
- * public
- * home
- * friends
- * user (ALSO A PRODUCER)
- * mentions
- * retweetsofme
- * user/
- * search users (DIRECT ONLY)
- * user suggestions (DIRECT ONLY)
- * trends/
- * daily
- * weekly
- * userlist
- * directmessage (ALSO A PRODUCER)
- * streaming/
- * filter (POLLING ONLY)
- * sample (POLLING ONLY)
- * user (POLLING ONLY)
- */
-public final class Twitter4JFactory {
-
- private static final Logger LOG = LoggerFactory.getLogger(Twitter4JFactory.class);
-
- private Twitter4JFactory() {
- // helper class
- }
-
- public static Twitter4JConsumer getConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
- String[] uriSplit = splitUri(uri);
-
- if (uriSplit.length > 0) {
- switch (ConsumerType.fromUri(uriSplit[0])) {
- case DIRECTMESSAGE:
- return new DirectMessageConsumer(te);
- case SEARCH:
- boolean hasNoKeywords = te.getProperties().getKeywords() == null
- || te.getProperties().getKeywords().trim().isEmpty();
- if (hasNoKeywords) {
- throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided.");
- } else {
- return new SearchConsumer(te);
- }
- case STREAMING:
- switch (StreamingType.fromUri(uriSplit[1])) {
- case SAMPLE:
- return new SampleConsumer(te);
- case FILTER:
- return new FilterConsumer(te);
- case USER:
- return new UserStreamingConsumer(te);
- default:
- break;
- }
- break;
- case TIMELINE:
- if (uriSplit.length > 1) {
- switch (TimelineType.fromUri(uriSplit[1])) {
- case HOME:
- return new HomeConsumer(te);
- case MENTIONS:
- return new MentionsConsumer(te);
- case RETWEETSOFME:
- return new RetweetsConsumer(te);
- case USER:
- if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
- throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set.");
- } else {
- return new UserConsumer(te);
- }
- default:
- break;
- }
- }
- break;
- default:
- break;
- }
- }
-
- throw new IllegalArgumentException("Cannot create any consumer with uri " + uri
- + ". A consumer type was not provided (or an incorrect pairing was used).");
- }
-
- public static DefaultProducer getProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
- String[] uriSplit = splitUri(uri);
-
- if (uriSplit.length > 0) {
- switch (ConsumerType.fromUri(uriSplit[0])) {
- case DIRECTMESSAGE:
- if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
- throw new IllegalArgumentException(
- "Producer type set to DIRECT MESSAGE but no recipient user was set.");
- } else {
- return new DirectMessageProducer(te);
- }
- case TIMELINE:
- if (uriSplit.length > 1) {
- switch (TimelineType.fromUri(uriSplit[1])) {
- case USER:
- return new UserProducer(te);
- default:
- break;
- }
- }
- break;
- case SEARCH:
- return new SearchProducer(te);
- default:
- break;
- }
-
- }
-
- throw new IllegalArgumentException("Cannot create any producer with uri " + uri
- + ". A producer type was not provided (or an incorrect pairing was used).");
- }
-
- private static String[] splitUri(String uri) {
- Pattern p1 = Pattern.compile("twitter:(//)*");
- Pattern p2 = Pattern.compile("\\?.*");
-
- uri = p1.matcher(uri).replaceAll("");
- uri = p2.matcher(uri).replaceAll("");
-
- return uri.split("/");
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
index a6e845b..fd9bf27 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
@@ -19,19 +19,14 @@ package org.apache.camel.component.twitter;
/**
* Defines common constants
*/
-public final class TwitterConstants {
-
- public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords";
-
- public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage";
- public static final String TWITTER_COUNT = "CamelTwitterCount";
- public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages";
- public static final String TWITTER_SINCEID = "CamelTwitterSinceId";
- public static final String TWITTER_MAXID = "CamelTwitterMaxId";
- public static final String TWITTER_USER = "CamelTwitterUser";
-
- private TwitterConstants() {
- // utility
- }
-
+public interface TwitterConstants {
+ String TWITTER_KEYWORDS = "CamelTwitterKeywords";
+ String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage";
+ String TWITTER_COUNT = "CamelTwitterCount";
+ String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages";
+ String TWITTER_SINCEID = "CamelTwitterSinceId";
+ String TWITTER_MAXID = "CamelTwitterMaxId";
+ String TWITTER_USER = "CamelTwitterUser";
+ String TWITTER_USER_ROLE = "CamelTwitterUserRole";
+ String TWITTER_EVENT_TYPE = "CamelTwitterEventType";
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
index bc5b350..a3f2254 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
@@ -23,7 +23,7 @@ import org.apache.camel.Producer;
import org.apache.camel.ServiceStatus;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.component.direct.DirectEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
import org.apache.camel.component.twitter.consumer.TwitterConsumerDirect;
import org.apache.camel.component.twitter.data.EndpointType;
@@ -43,7 +43,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+ TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
TwitterConsumerDirect answer = new TwitterConsumerDirect(this, processor, twitter4jConsumer);
configureConsumer(answer);
return answer;
@@ -51,7 +51,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp
@Override
public Producer createProducer() throws Exception {
- return Twitter4JFactory.getProducer(this, getEndpointUri());
+ return TwitterHelper.createProducer(this, getEndpointUri());
}
@ManagedAttribute
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
index 0690664..0c49e17 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.twitter;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent;
import org.apache.camel.component.twitter.data.EndpointType;
import org.apache.camel.impl.DefaultEndpoint;
@@ -36,7 +36,7 @@ public class TwitterEndpointEvent extends DefaultEndpoint implements TwitterEndp
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+ TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
return new TwitterConsumerEvent(this, processor, twitter4jConsumer);
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
index 250c33a..8c17d1e 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
@@ -21,7 +21,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.api.management.ManagedAttribute;
import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling;
import org.apache.camel.component.twitter.data.EndpointType;
import org.apache.camel.impl.DefaultPollingEndpoint;
@@ -32,7 +32,7 @@ import org.apache.camel.spi.UriParam;
* This component integrates with Twitter to send tweets or search for tweets and more.
*/
@ManagedResource(description = "Managed Twitter Endpoint")
-@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = Twitter4JConsumer.class, label = "api,social")
+@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = TwitterConsumer.class, label = "api,social")
public class TwitterEndpointPolling extends DefaultPollingEndpoint implements TwitterEndpoint {
@UriParam(optionalPrefix = "consumer.", defaultValue = "" + TwitterConsumerPolling.DEFAULT_CONSUMER_DELAY, label = "consumer,scheduler",
@@ -49,7 +49,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+ TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
// update the pulling lastID with sinceId
twitter4jConsumer.setLastId(properties.getSinceId());
TwitterConsumerPolling tc = new TwitterConsumerPolling(this, processor, twitter4jConsumer);
@@ -59,7 +59,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw
@Override
public Producer createProducer() throws Exception {
- return Twitter4JFactory.getProducer(this, getEndpointUri());
+ return TwitterHelper.createProducer(this, getEndpointUri());
}
@Override
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
new file mode 100644
index 0000000..de503a5
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter;
+
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer;
+import org.apache.camel.component.twitter.consumer.search.SearchConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.FilterStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.SampleStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.UserConsumer;
+import org.apache.camel.component.twitter.data.ConsumerType;
+import org.apache.camel.component.twitter.data.StreamingType;
+import org.apache.camel.component.twitter.data.TimelineType;
+import org.apache.camel.component.twitter.producer.DirectMessageProducer;
+import org.apache.camel.component.twitter.producer.SearchProducer;
+import org.apache.camel.component.twitter.producer.TwitterProducer;
+import org.apache.camel.component.twitter.producer.UserProducer;
+import twitter4j.User;
+
+public final class TwitterHelper {
+ private TwitterHelper() {
+ }
+
+ public static void setUserHeader(Exchange exchange, User user) {
+ setUserHeader(exchange.getIn(), user);
+ }
+
+ public static void setUserHeader(Message message, User user) {
+ message.setHeader(TwitterConstants.TWITTER_USER, user);
+ }
+
+ public static void setUserHeader(Exchange exchange, int index, User user, String role) {
+ setUserHeader(exchange.getIn(), index, user, role);
+ }
+
+ public static void setUserHeader(Message message, int index, User user, String role) {
+ message.setHeader(TwitterConstants.TWITTER_USER + index, user);
+ message.setHeader(TwitterConstants.TWITTER_USER_ROLE + index, role);
+ }
+
+ public static TwitterConsumer createConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
+ String[] uriSplit = splitUri(uri);
+
+ if (uriSplit.length > 0) {
+ switch (ConsumerType.fromUri(uriSplit[0])) {
+ case DIRECTMESSAGE:
+ return new DirectMessageConsumer(te);
+ case SEARCH:
+ boolean hasNoKeywords = te.getProperties().getKeywords() == null
+ || te.getProperties().getKeywords().trim().isEmpty();
+ if (hasNoKeywords) {
+ throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided.");
+ } else {
+ return new SearchConsumer(te);
+ }
+ case STREAMING:
+ switch (StreamingType.fromUri(uriSplit[1])) {
+ case SAMPLE:
+ return new SampleStreamingConsumer(te);
+ case FILTER:
+ return new FilterStreamingConsumer(te);
+ case USER:
+ return new UserStreamingConsumer(te);
+ default:
+ break;
+ }
+ break;
+ case TIMELINE:
+ if (uriSplit.length > 1) {
+ switch (TimelineType.fromUri(uriSplit[1])) {
+ case HOME:
+ return new HomeConsumer(te);
+ case MENTIONS:
+ return new MentionsConsumer(te);
+ case RETWEETSOFME:
+ return new RetweetsConsumer(te);
+ case USER:
+ if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
+ throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set.");
+ } else {
+ return new UserConsumer(te);
+ }
+ default:
+ break;
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ }
+
+ throw new IllegalArgumentException("Cannot create any consumer with uri " + uri
+ + ". A consumer type was not provided (or an incorrect pairing was used).");
+ }
+
+ public static TwitterProducer createProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
+ String[] uriSplit = splitUri(uri);
+
+ if (uriSplit.length > 0) {
+ switch (ConsumerType.fromUri(uriSplit[0])) {
+ case DIRECTMESSAGE:
+ if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "Producer type set to DIRECT MESSAGE but no recipient user was set.");
+ } else {
+ return new DirectMessageProducer(te);
+ }
+ case TIMELINE:
+ if (uriSplit.length > 1) {
+ switch (TimelineType.fromUri(uriSplit[1])) {
+ case USER:
+ return new UserProducer(te);
+ default:
+ break;
+ }
+ }
+ break;
+ case SEARCH:
+ return new SearchProducer(te);
+ default:
+ break;
+ }
+
+ }
+
+ throw new IllegalArgumentException("Cannot create any producer with uri " + uri
+ + ". A producer type was not provided (or an incorrect pairing was used).");
+ }
+
+ private static String[] splitUri(String uri) {
+ Pattern p1 = Pattern.compile("twitter:(//)*");
+ Pattern p2 = Pattern.compile("\\?.*");
+
+ uri = p1.matcher(uri).replaceAll("");
+ uri = p2.matcher(uri).replaceAll("");
+
+ return uri.split("/");
+ }
+
+ public static <T extends Enum<T>> T enumFromString(T[] values, String uri, T defaultValue) {
+ for (int i = values.length - 1; i >= 0; i--) {
+ if (values[i].name().equalsIgnoreCase(uri)) {
+ return values[i];
+ }
+ }
+
+ return defaultValue;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
deleted file mode 100644
index 98830f8..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer;
-
-import twitter4j.Status;
-
-public interface TweeterStatusListener {
-
- void onStatus(Status status);
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
deleted file mode 100644
index 14ecae3..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-
-import twitter4j.TwitterException;
-
-
-public abstract class Twitter4JConsumer {
-
- /**
- * Instance of TwitterEndpoint.
- */
- protected TwitterEndpoint te;
-
- /**
- * The last tweet ID received.
- */
- protected long lastId = 1;
-
- protected Twitter4JConsumer(TwitterEndpoint te) {
- this.te = te;
- }
-
- /**
- * Can't assume that the end of the list will be the most recent ID.
- * The Twitter API sometimes returns them slightly out of order.
- */
- protected void checkLastId(long newId) {
- if (newId > lastId) {
- lastId = newId;
- }
- }
-
- /**
- * Called by polling consumers during each poll. It needs to be separate
- * from directConsume() since, as an example, streaming API polling allows
- * tweets to build up between polls.
- */
- public abstract List<? extends Serializable> pollConsume() throws TwitterException;
-
- /**
- * Called by direct consumers.
- */
- public abstract List<? extends Serializable> directConsume() throws TwitterException;
-
- /**
- * Support to update the Consumer's lastId when starting the consumer
- * @param sinceId
- */
- public void setLastId(long sinceId) {
- lastId = sinceId;
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
new file mode 100644
index 0000000..b1d9b37
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.Paging;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+
+public abstract class TwitterConsumer {
+
+ /**
+ * Instance of TwitterEndpoint.
+ */
+ protected final TwitterEndpoint endpoint;
+
+ /**
+ * The last tweet ID received.
+ */
+ private long lastId;
+
+ protected TwitterConsumer(TwitterEndpoint endpoint) {
+ this.endpoint = endpoint;
+ this.lastId = -1;
+ }
+
+ /**
+ * Called by polling consumers during each poll. It needs to be separate
+ * from directConsume() since, as an example, streaming API polling allows
+ * tweets to build up between polls.
+ */
+ public abstract List<Exchange> pollConsume() throws TwitterException;
+
+ /**
+ * Called by direct consumers.
+ */
+ public abstract List<Exchange> directConsume() throws TwitterException;
+
+ /**
+ * Advances lastId only when newId is greater. We can't assume that the end of the list
+ * holds the most recent ID: the Twitter API sometimes returns tweets slightly out of order.
+ */
+ protected void setLastIdIfGreater(long newId) {
+ if (newId > lastId) {
+ lastId = newId;
+ }
+ }
+
+ /**
+ * Overwrites the consumer's lastId unconditionally, e.g. to seed it when starting the consumer.
+ * @param sinceId the ID to store as the last tweet ID received
+ */
+ public void setLastId(long sinceId) {
+ lastId = sinceId;
+ }
+
+ protected Twitter getTwitter() {
+ return endpoint.getProperties().getTwitter();
+ }
+
+ protected long getLastId() {
+ return lastId;
+ }
+
+ protected Paging getLastIdPaging() {
+ return new Paging(lastId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
index ced7e3b..9c47259 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
@@ -16,8 +16,7 @@
*/
package org.apache.camel.component.twitter.consumer;
-import java.io.Serializable;
-import java.util.Iterator;
+import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -29,10 +28,9 @@ import org.apache.camel.impl.DefaultConsumer;
*/
public class TwitterConsumerDirect extends DefaultConsumer {
- private Twitter4JConsumer twitter4jConsumer;
+ private final TwitterConsumer twitter4jConsumer;
- public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor,
- Twitter4JConsumer twitter4jConsumer) {
+ public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
super(endpoint, processor);
this.twitter4jConsumer = twitter4jConsumer;
@@ -42,11 +40,9 @@ public class TwitterConsumerDirect extends DefaultConsumer {
protected void doStart() throws Exception {
super.doStart();
- Iterator<? extends Serializable> i = twitter4jConsumer.directConsume().iterator();
- while (i.hasNext()) {
- Exchange e = getEndpoint().createExchange();
- e.getIn().setBody(i.next());
- getProcessor().process(e);
+ List<Exchange> exchanges = twitter4jConsumer.directConsume();
+ for (int i = 0; i < exchanges.size(); i++) {
+ getProcessor().process(exchanges.get(i));
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
index 4feaf8a..f5b94a0 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
@@ -19,15 +19,13 @@ package org.apache.camel.component.twitter.consumer;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
import org.apache.camel.impl.DefaultConsumer;
-import twitter4j.Status;
-public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStatusListener {
- private Twitter4JConsumer twitter4jConsumer;
+public class TwitterConsumerEvent extends DefaultConsumer implements TwitterEventListener {
+ private final TwitterConsumer twitter4jConsumer;
- public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor,
- Twitter4JConsumer twitter4jConsumer) {
+ public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
super(endpoint, processor);
this.twitter4jConsumer = twitter4jConsumer;
}
@@ -35,25 +33,28 @@ public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStat
@Override
protected void doStart() throws Exception {
super.doStart();
- if (twitter4jConsumer instanceof StreamingConsumer) {
- ((StreamingConsumer) twitter4jConsumer).registerTweetListener(this);
- ((StreamingConsumer) twitter4jConsumer).doStart();
+
+ if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+ ((AbstractStreamingConsumer) twitter4jConsumer).setEventListener(this);
+ ((AbstractStreamingConsumer) twitter4jConsumer).start();
}
}
@Override
protected void doStop() throws Exception {
- super.doStop();
- if (twitter4jConsumer instanceof StreamingConsumer) {
- ((StreamingConsumer) twitter4jConsumer).unregisterTweetListener(this);
- ((StreamingConsumer) twitter4jConsumer).doStop();
+ if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+ ((AbstractStreamingConsumer) twitter4jConsumer).removeEventListener(this);
+ ((AbstractStreamingConsumer) twitter4jConsumer).stop();
}
+
+ super.doStop();
}
@Override
- public void onStatus(Status status) {
- Exchange exchange = getEndpoint().createExchange();
- exchange.getIn().setBody(status);
+ public void onEvent(Exchange exchange) {
+ if (!isRunAllowed()) {
+ return;
+ }
try {
getProcessor().process(exchange);
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
index 6236116..364e55a 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
@@ -16,15 +16,12 @@
*/
package org.apache.camel.component.twitter.consumer;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.twitter.TwitterEndpoint;
import org.apache.camel.component.twitter.TwitterEndpointPolling;
-import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
import org.apache.camel.impl.ScheduledPollConsumer;
/**
@@ -33,10 +30,9 @@ import org.apache.camel.impl.ScheduledPollConsumer;
public class TwitterConsumerPolling extends ScheduledPollConsumer {
public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
- private Twitter4JConsumer twitter4jConsumer;
+ private final TwitterConsumer twitter4jConsumer;
- public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor,
- Twitter4JConsumer twitter4jConsumer) {
+ public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
super(endpoint, processor);
setDelay(DEFAULT_CONSUMER_DELAY);
this.twitter4jConsumer = twitter4jConsumer;
@@ -45,31 +41,29 @@ public class TwitterConsumerPolling extends ScheduledPollConsumer {
@Override
protected void doStart() throws Exception {
super.doStart();
- if (twitter4jConsumer instanceof StreamingConsumer) {
- ((StreamingConsumer) twitter4jConsumer).doStart();
+ if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+ ((AbstractStreamingConsumer) twitter4jConsumer).start();
}
}
@Override
protected void doStop() throws Exception {
- super.doStop();
- if (twitter4jConsumer instanceof StreamingConsumer) {
- ((StreamingConsumer) twitter4jConsumer).doStop();
+ if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+ ((AbstractStreamingConsumer) twitter4jConsumer).stop();
}
+
+ super.doStop();
}
+ @Override
protected int poll() throws Exception {
- Iterator<? extends Serializable> i = twitter4jConsumer.pollConsume().iterator();
-
- int total = 0;
- while (i.hasNext()) {
- Exchange e = getEndpoint().createExchange();
- e.getIn().setBody(i.next());
- getProcessor().process(e);
+ List<Exchange> exchanges = twitter4jConsumer.pollConsume();
- total++;
+ int index = 0;
+ for (; index < exchanges.size(); index++) {
+ getProcessor().process(exchanges.get(index));
}
- return total;
+ return index;
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
new file mode 100644
index 0000000..022dbff
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.EventListener;
+
+import org.apache.camel.Exchange;
+
+/**
+ * Callback notified with an exchange for each Twitter event that is
+ * delivered to it.
+ */
+public interface TwitterEventListener extends EventListener {
+
+ /**
+ * Invoked with the exchange created for an event.
+ *
+ * @param exchange the exchange carrying the event
+ */
+ void onEvent(Exchange exchange);
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
new file mode 100644
index 0000000..77600d2
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+
+/**
+ * Kinds of Twitter events. Each constant can create exchanges that are
+ * tagged with its own name in the {@link TwitterConstants#TWITTER_EVENT_TYPE}
+ * header.
+ */
+public enum TwitterEventType {
+    STATUS,
+    DIRECT_MESSAGE,
+    FAVORITE,
+    UNFAVORITE,
+    FOLLOW,
+    UNFOLLOW,
+    USERLIST_MEMBER_ADDITION,
+    USERLIST_MEMBER_DELETION,
+    USERLIST_SUBSCRIPTION,
+    USERLIST_UNSUBSCRIPTION,
+    USERLIST_CREATION,
+    USERLIST_UPDATE,
+    // NOTE(review): "DELETETION" looks like a misspelling of "DELETION"; the
+    // name is kept because it is public and is emitted as a header value.
+    USERLIST_DELETETION,
+    USER_PROFILE_UPDATE,
+    USER_SUSPENSION,
+    USER_DELETION,
+    BLOCK,
+    UNBLOCK,
+    RETWEETED_RETWEET,
+    FAVORITED_RETWEET,
+    QUOTED_TWEET;
+
+    /**
+     * Creates an exchange tagged with this event type and no body.
+     *
+     * @param endpoint the endpoint used to create the exchange
+     * @return the new exchange
+     */
+    public Exchange createExchange(TwitterEndpoint endpoint) {
+        return createExchange(endpoint, null);
+    }
+
+    /**
+     * Creates an exchange tagged with this event type.
+     *
+     * @param endpoint the endpoint used to create the exchange
+     * @param body the message body; left unset when {@code null}
+     * @return the new exchange
+     */
+    public <T> Exchange createExchange(TwitterEndpoint endpoint, T body) {
+        Exchange created = endpoint.createExchange();
+        created.getIn().setHeader(TwitterConstants.TWITTER_EVENT_TYPE, name());
+
+        if (body == null) {
+            return created;
+        }
+
+        created.getIn().setBody(body);
+        return created;
+    }
+
+    /**
+     * Creates one exchange per element of {@code bodyList}, in list order.
+     *
+     * @param endpoint the endpoint used to create the exchanges
+     * @param bodyList the bodies to wrap; may be {@code null} or empty
+     * @return the exchanges, or an empty list when there is nothing to wrap
+     */
+    public <T> List<Exchange> createExchangeList(TwitterEndpoint endpoint, List<T> bodyList) {
+        if (bodyList == null || bodyList.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Exchange> created = new ArrayList<>(bodyList.size());
+        for (T body : bodyList) {
+            created.add(createExchange(endpoint, body));
+        }
+        return created;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
index 500c6e6..d0b89de 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
@@ -18,31 +18,37 @@ package org.apache.camel.component.twitter.consumer.directmessage;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
import twitter4j.DirectMessage;
-import twitter4j.Paging;
import twitter4j.TwitterException;
/**
* Consumes a user's direct messages
*/
-public class DirectMessageConsumer extends Twitter4JConsumer {
+public class DirectMessageConsumer extends TwitterConsumer {
public DirectMessageConsumer(TwitterEndpoint te) {
super(te);
}
- public List<DirectMessage> pollConsume() throws TwitterException {
- List<DirectMessage> list = te.getProperties().getTwitter().getDirectMessages(new Paging(lastId));
- for (DirectMessage dm : list) {
- checkLastId(dm.getId());
+ @Override
+ public List<Exchange> pollConsume() throws TwitterException {
+ List<DirectMessage> directMessages = getTwitter().getDirectMessages(getLastIdPaging());
+ for (int i = 0; i < directMessages.size(); i++) {
+ setLastIdIfGreater(directMessages.get(i).getId());
}
- return list;
+
+ return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, directMessages);
}
- public List<DirectMessage> directConsume() throws TwitterException {
- return te.getProperties().getTwitter().getDirectMessages();
+ @Override
+ public List<Exchange> directConsume() throws TwitterException {
+ return TwitterEventType.DIRECT_MESSAGE.createExchangeList(
+ endpoint,
+ getTwitter().getDirectMessages()
+ );
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
index 4b2cdf1..1c3a087 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.twitter.consumer.search;
import java.util.Collections;
import java.util.List;
+import org.apache.camel.Exchange;
import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +37,7 @@ import twitter4j.TwitterException;
/**
* Consumes search requests
*/
-public class SearchConsumer extends Twitter4JConsumer {
+public class SearchConsumer extends TwitterConsumer {
private static final Logger LOG = LoggerFactory.getLogger(SearchConsumer.class);
@@ -43,8 +45,8 @@ public class SearchConsumer extends Twitter4JConsumer {
super(te);
}
- public List<Status> pollConsume() throws TwitterException {
- String keywords = te.getProperties().getKeywords();
+ public List<Exchange> pollConsume() throws TwitterException {
+ String keywords = endpoint.getProperties().getKeywords();
Query query;
@@ -56,15 +58,15 @@ public class SearchConsumer extends Twitter4JConsumer {
LOG.debug("Searching twitter without keywords.");
}
- if (te.getProperties().isFilterOld()) {
- query.setSinceId(lastId);
+ if (endpoint.getProperties().isFilterOld()) {
+ query.setSinceId(getLastId());
}
return search(query);
}
- public List<Status> directConsume() throws TwitterException {
- String keywords = te.getProperties().getKeywords();
+ public List<Exchange> directConsume() throws TwitterException {
+ String keywords = endpoint.getProperties().getKeywords();
if (keywords == null || keywords.trim().length() == 0) {
return Collections.emptyList();
}
@@ -74,33 +76,33 @@ public class SearchConsumer extends Twitter4JConsumer {
return search(query);
}
- private List<Status> search(Query query) throws TwitterException {
+ private List<Exchange> search(Query query) throws TwitterException {
Integer numberOfPages = 1;
- if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) {
- query.setLang(te.getProperties().getLang());
+ if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) {
+ query.setLang(endpoint.getProperties().getLang());
}
- if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) {
- query.setCount(te.getProperties().getCount());
+ if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) {
+ query.setCount(endpoint.getProperties().getCount());
}
- if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) {
- numberOfPages = te.getProperties().getNumberOfPages();
+ if (ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) {
+ numberOfPages = endpoint.getProperties().getNumberOfPages();
}
- if (ObjectHelper.isNotEmpty(te.getProperties().getLatitude())
- && ObjectHelper.isNotEmpty(te.getProperties().getLongitude())
- && ObjectHelper.isNotEmpty(te.getProperties().getRadius())) {
- GeoLocation location = new GeoLocation(te.getProperties().getLatitude(), te.getProperties().getLongitude());
- query.setGeoCode(location, te.getProperties().getRadius(), Unit.valueOf(te.getProperties().getDistanceMetric()));
+ if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude())
+ && ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude())
+ && ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) {
+ GeoLocation location = new GeoLocation(endpoint.getProperties().getLatitude(), endpoint.getProperties().getLongitude());
+ query.setGeoCode(location, endpoint.getProperties().getRadius(), Unit.valueOf(endpoint.getProperties().getDistanceMetric()));
LOG.debug("Searching with additional geolocation parameters.");
}
LOG.debug("Searching with {} pages.", numberOfPages);
- Twitter twitter = te.getProperties().getTwitter();
+ Twitter twitter = getTwitter();
QueryResult qr = twitter.search(query);
List<Status> tweets = qr.getTweets();
@@ -113,13 +115,13 @@ public class SearchConsumer extends Twitter4JConsumer {
tweets.addAll(qr.getTweets());
}
- if (te.getProperties().isFilterOld()) {
- for (Status t : tweets) {
- checkLastId(t.getId());
+ if (endpoint.getProperties().isFilterOld()) {
+ for (int i = 0; i < tweets.size(); i++) {
+ setLastIdIfGreater(tweets.get(i).getId());
}
}
- return tweets;
+ return TwitterEventType.STATUS.createExchangeList(endpoint, tweets);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
new file mode 100644
index 0000000..7a7bfcb
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventListener;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterException;
+import twitter4j.TwitterStream;
+
+/**
+ * Super class providing consuming capabilities for the streaming API.
+ */
+public abstract class AbstractStreamingConsumer extends TwitterConsumer implements StatusListener, Service {
+ private final TwitterStream twitterStream;
+ private final List<Exchange> receivedStatuses;
+ private final AtomicReference<TwitterEventListener> twitterEventListener;
+ private boolean clear;
+
+ public AbstractStreamingConsumer(TwitterEndpoint te) {
+ super(te);
+ this.receivedStatuses = new ArrayList<>();
+ this.twitterStream = te.getProperties().createTwitterStream();
+ this.twitterStream.addListener(this);
+ this.twitterEventListener = new AtomicReference<>();
+ this.clear = true;
+ }
+
+ @Override
+ public List<Exchange> pollConsume() throws TwitterException {
+ List<Exchange> result;
+
+ synchronized (receivedStatuses) {
+ clear = true;
+ result = Collections.unmodifiableList(new ArrayList<>(receivedStatuses));
+ }
+
+ return result;
+ }
+
+ @Override
+ public List<Exchange> directConsume() throws TwitterException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void onException(Exception ex) {
+ }
+
+ @Override
+ public void onStatus(Status status) {
+ onEvent(TwitterEventType.STATUS.createExchange(endpoint, status));
+ }
+
+ @Override
+ public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+ // noop
+ }
+
+ @Override
+ public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+ // noop
+ }
+
+ @Override
+ public void onScrubGeo(long userId, long upToStatusId) {
+ // noop
+ }
+
+ public void setEventListener(TwitterEventListener tweeterStatusListener) {
+ twitterEventListener.set(tweeterStatusListener);
+ }
+
+ public void removeEventListener(TwitterEventListener tweeterStatusListener) {
+ twitterEventListener.compareAndSet(tweeterStatusListener, null);
+ }
+
+ @Override
+ public void stop() {
+ twitterStream.removeListener(this);
+ twitterStream.shutdown();
+ twitterStream.cleanUp();
+ }
+
+ protected TwitterStream getTwitterStream() {
+ return twitterStream;
+ }
+
+ protected void onEvent(Exchange exchange) {
+ TwitterEventListener listener = twitterEventListener.get();
+ if (listener != null) {
+ listener.onEvent(exchange);
+ } else {
+ synchronized (receivedStatuses) {
+ if (clear) {
+ receivedStatuses.clear();
+ clear = false;
+ }
+ receivedStatuses.add(exchange);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
deleted file mode 100644
index 588a5e1..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the filter stream
- */
-public class FilterConsumer extends StreamingConsumer {
-
- public FilterConsumer(TwitterEndpoint te) {
- super(te);
- }
-
- @Override
- protected void startStreaming() {
- twitterStream.filter(createFilter(te));
- }
-
- @Override
- public void onStallWarning(StallWarning stallWarning) {
- // noop
- }
-
- private FilterQuery createFilter(TwitterEndpoint te) {
- FilterQuery filterQuery = new FilterQuery();
- String allLocationsString = te.getProperties().getLocations();
- if (allLocationsString != null) {
- String[] locationStrings = allLocationsString.split(";");
- double[][] locations = new double[locationStrings.length][2];
- for (int i = 0; i < locationStrings.length; i++) {
- String[] coords = locationStrings[i].split(",");
- locations[i][0] = Double.valueOf(coords[0]);
- locations[i][1] = Double.valueOf(coords[1]);
- }
- filterQuery.locations(locations);
- }
-
- String keywords = te.getProperties().getKeywords();
- if (keywords != null && keywords.length() > 0) {
- filterQuery.track(keywords.split(","));
- }
-
- String userIds = te.getProperties().getUserIds();
- if (userIds != null) {
- String[] stringUserIds = userIds.split(",");
- long[] longUserIds = new long[stringUserIds.length];
- for (int i = 0; i < stringUserIds.length; i++) {
- longUserIds[i] = Long.valueOf(stringUserIds[i]);
- }
- filterQuery.follow(longUserIds);
- }
-
- if (allLocationsString == null && keywords == null && userIds == null) {
- throw new IllegalArgumentException("At least one filter parameter is required");
- }
-
- return filterQuery;
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
new file mode 100644
index 0000000..62460cc
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+
+/**
+ * Consumes the filter stream, using the locations, keywords and user IDs
+ * configured on the endpoint as filter criteria.
+ */
+public class FilterStreamingConsumer extends AbstractStreamingConsumer {
+
+    public FilterStreamingConsumer(TwitterEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    /**
+     * Starts streaming with the filter built from the endpoint properties.
+     *
+     * @throws IllegalArgumentException if no filter parameter is configured
+     *         or a location / user ID value is malformed
+     */
+    @Override
+    public void start() {
+        getTwitterStream().filter(createFilter());
+    }
+
+    @Override
+    public void onStallWarning(StallWarning stallWarning) {
+        // noop
+    }
+
+    /**
+     * Builds the filter query from the endpoint's locations, keywords and
+     * user IDs; at least one of them must be configured.
+     */
+    private FilterQuery createFilter() {
+        String allLocationsString = endpoint.getProperties().getLocations();
+        String keywords = endpoint.getProperties().getKeywords();
+        String userIds = endpoint.getProperties().getUserIds();
+
+        if (allLocationsString == null && keywords == null && userIds == null) {
+            throw new IllegalArgumentException("At least one filter parameter is required");
+        }
+
+        FilterQuery filterQuery = new FilterQuery();
+        if (allLocationsString != null) {
+            filterQuery.locations(parseLocations(allLocationsString));
+        }
+        if (keywords != null && keywords.length() > 0) {
+            filterQuery.track(keywords.split(","));
+        }
+        if (userIds != null) {
+            filterQuery.follow(parseUserIds(userIds));
+        }
+
+        return filterQuery;
+    }
+
+    /**
+     * Parses a ';' separated list of ',' separated coordinate pairs.
+     */
+    private static double[][] parseLocations(String allLocationsString) {
+        String[] locationStrings = allLocationsString.split(";");
+        double[][] locations = new double[locationStrings.length][2];
+        for (int i = 0; i < locationStrings.length; i++) {
+            String[] coords = locationStrings[i].split(",");
+            if (coords.length < 2) {
+                // Fail with context instead of an ArrayIndexOutOfBoundsException.
+                throw new IllegalArgumentException(
+                    "Invalid location '" + locationStrings[i] + "': expected two comma separated coordinates");
+            }
+            locations[i][0] = Double.parseDouble(coords[0]);
+            locations[i][1] = Double.parseDouble(coords[1]);
+        }
+        return locations;
+    }
+
+    /**
+     * Parses a ',' separated list of numeric user IDs, tolerating whitespace
+     * around each ID.
+     */
+    private static long[] parseUserIds(String userIds) {
+        String[] stringUserIds = userIds.split(",");
+        long[] longUserIds = new long[stringUserIds.length];
+        for (int i = 0; i < stringUserIds.length; i++) {
+            // Long.parseLong does not ignore surrounding whitespace, so trim first.
+            longUserIds[i] = Long.parseLong(stringUserIds[i].trim());
+        }
+        return longUserIds;
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
deleted file mode 100644
index 89cc157..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the sample stream
- */
-public class SampleConsumer extends StreamingConsumer {
-
- public SampleConsumer(TwitterEndpoint te) {
- super(te);
- }
-
- @Override
- protected void startStreaming() {
- twitterStream.sample();
- }
-
- @Override
- public void onStallWarning(StallWarning stallWarning) {
- // noop
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
new file mode 100644
index 0000000..e0a5b27
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.StallWarning;
+
+/**
+ * Consumes the sample stream by calling {@code sample()} on the stream from {@code getTwitterStream()}
+ */
+public class SampleStreamingConsumer extends AbstractStreamingConsumer {
+ /** Passes the endpoint straight to the {@code AbstractStreamingConsumer} constructor. */
+ public SampleStreamingConsumer(TwitterEndpoint endpoint) {
+ super(endpoint);
+ }
+ /** Calls {@code sample()} on the stream returned by {@code getTwitterStream()}. */
+ @Override
+ public void start() {
+ getTwitterStream().sample();
+ }
+ /** Stall warnings are ignored by this consumer. */
+ @Override
+ public void onStallWarning(StallWarning stallWarning) {
+ // noop: nothing is done with the warning
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
deleted file mode 100644
index abb59cd..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TweeterStatusListener;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterException;
-import twitter4j.TwitterStream;
-
-/**
- * Super class providing consuming capabilities for the streaming API.
- */
-public abstract class StreamingConsumer extends Twitter4JConsumer implements StatusListener {
- protected final TwitterStream twitterStream;
- private final List<Status> receivedStatuses = new ArrayList<Status>();
- private volatile boolean clear;
- private TweeterStatusListener tweeterStatusListener;
-
- public StreamingConsumer(TwitterEndpoint te) {
- super(te);
- twitterStream = te.getProperties().createTwitterStream();
- twitterStream.addListener(this);
- }
-
- public List<Status> pollConsume() throws TwitterException {
- clear = true;
- return Collections.unmodifiableList(new ArrayList<Status>(receivedStatuses));
- }
-
- public List<Status> directConsume() throws TwitterException {
- // not used
- return null;
- }
-
- @Override
- public void onException(Exception ex) {
- }
-
- @Override
- public void onStatus(Status status) {
- if (tweeterStatusListener != null) {
- tweeterStatusListener.onStatus(status);
- } else {
- if (clear) {
- receivedStatuses.clear();
- clear = false;
- }
- receivedStatuses.add(status);
- }
- }
-
- @Override
- public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
- // noop
- }
-
- @Override
- public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
- // noop
- }
-
- @Override
- public void onScrubGeo(long userId, long upToStatusId) {
- // noop
- }
-
- public void registerTweetListener(TweeterStatusListener tweeterStatusListener) {
- this.tweeterStatusListener = tweeterStatusListener;
- }
-
- public void unregisterTweetListener(TweeterStatusListener tweeterStatusListener) {
- this.tweeterStatusListener = null;
- }
-
- public void doStart() {
- startStreaming();
- }
-
- public void doStop() {
- twitterStream.shutdown();
- }
-
- protected abstract void startStreaming();
-
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
index 751d9e9..e04ba31 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
@@ -16,7 +16,11 @@
*/
package org.apache.camel.component.twitter.consumer.streaming;
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.TwitterHelper;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
import twitter4j.DirectMessage;
import twitter4j.StallWarning;
import twitter4j.Status;
@@ -24,110 +28,168 @@ import twitter4j.User;
import twitter4j.UserList;
import twitter4j.UserStreamListener;
-public class UserStreamingConsumer extends StreamingConsumer implements UserStreamListener {
+public class UserStreamingConsumer extends AbstractStreamingConsumer implements UserStreamListener {
- public UserStreamingConsumer(TwitterEndpoint te) {
- super(te);
+ public UserStreamingConsumer(TwitterEndpoint endpoint) {
+ super(endpoint); // nothing else to initialise here; the endpoint goes straight to AbstractStreamingConsumer
}
@Override
- protected void startStreaming() {
- twitterStream.user();
+ public void start() {
+ getTwitterStream().user(); // calls user() on the stream from getTwitterStream(); presumably starts the user stream - confirm against twitter4j
}
@Override
- public void onDeletionNotice(long l, long l2) {
+ public void onDeletionNotice(long directMessageId, long userId) { // deliberately empty: no exchange is created for this callback
// noop
}
@Override
- public void onFriendList(long[] longs) {
+ public void onFriendList(long[] friendIds) { // deliberately empty: the friend id list is not turned into an exchange
// noop
}
@Override
- public void onFavorite(User user, User user2, Status status) {
- // noop
+ public void onFavorite(User source, User target, Status favoritedStatus) {
+ Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, favoritedStatus);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, target, "target");
+ // FAVORITE exchange created from the status; both users registered via setUserHeader (1 = source, 2 = target)
+ onEvent(exchange);
}
@Override
- public void onUnfavorite(User user, User user2, Status status) {
- // noop
+ public void onUnfavorite(User source, User target, Status unfavoritedStatus) {
+ Exchange exchange = TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, target, "target");
+ // UNFAVORITE exchange created from the status; both users registered via setUserHeader (1 = source, 2 = target)
+ onEvent(exchange);
}
@Override
- public void onFollow(User user, User user2) {
- // noop
+ public void onFollow(User source, User followedUser) {
+ Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed");
+ // FOLLOW exchange is created without a payload argument; the users are registered as 1 = source, 2 = followed
+ onEvent(exchange);
}
@Override
- public void onUnfollow(User user, User user2) {
- // noop
+ public void onUnfollow(User source, User unfollowedUser) {
+ Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed");
+ // UNFOLLOW exchange is created without a payload argument; the users are registered as 1 = source, 2 = unfollowed
+ onEvent(exchange);
}
@Override
public void onDirectMessage(DirectMessage directMessage) {
- // noop
+ onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, directMessage)); // no user headers are set here, unlike the other event callbacks
}
@Override
- public void onUserListMemberAddition(User user, User user2, UserList userList) {
- // noop
+ public void onUserListMemberAddition(User addedMember, User listOwner, UserList list) {
+ Exchange exchange = TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list);
+ TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember");
+ TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+ // USERLIST_MEMBER_ADDITION exchange created from the list; users registered as 1 = addedMember, 2 = listOwner
+ onEvent(exchange);
}
@Override
- public void onUserListMemberDeletion(User user, User user2, UserList userList) {
- // noop
+ public void onUserListMemberDeletion(User deletedMember, User listOwner, UserList list) {
+ Exchange exchange = TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list);
+ TwitterHelper.setUserHeader(exchange, 1, deletedMember, "deletedMember");
+ TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+ // USERLIST_MEMBER_DELETION exchange created from the list; users registered as 1 = deletedMember, 2 = listOwner
+ onEvent(exchange);
}
@Override
- public void onUserListSubscription(User user, User user2, UserList userList) {
- // noop
+ public void onUserListSubscription(User subscriber, User listOwner, UserList list) {
+ Exchange exchange = TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list);
+ TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
+ TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+ // USERLIST_SUBSCRIPTION exchange created from the list; users registered as 1 = subscriber, 2 = listOwner
+ onEvent(exchange);
}
@Override
- public void onUserListUnsubscription(User user, User user2, UserList userList) {
- // noop
+ public void onUserListUnsubscription(User subscriber, User listOwner, UserList list) {
+ Exchange exchange = TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list);
+ TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
+ TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+ // USERLIST_UNSUBSCRIPTION exchange created from the list; users registered as 1 = subscriber, 2 = listOwner
+ onEvent(exchange);
}
@Override
public void onUserListCreation(User user, UserList userList) {
- // noop
+ Exchange exchange = TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList);
+ TwitterHelper.setUserHeader(exchange, user);
+ // USERLIST_CREATION exchange created from the list; the user is registered through the single-user setUserHeader overload
+ onEvent(exchange);
}
@Override
public void onUserListUpdate(User user, UserList userList) {
- // noop
+ Exchange exchange = TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList);
+ TwitterHelper.setUserHeader(exchange, user);
+ // USERLIST_UPDATE exchange created from the list; the user is registered through the single-user setUserHeader overload
+ onEvent(exchange);
}
@Override
public void onUserListDeletion(User user, UserList userList) {
- // noop
+ Exchange exchange = TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList);
+ TwitterHelper.setUserHeader(exchange, user);
+ // NOTE(review): the constant is spelled USERLIST_DELETETION here - confirm it matches the TwitterEventType declaration
+ onEvent(exchange);
}
@Override
public void onUserProfileUpdate(User user) {
- // noop
+ Exchange exchange = TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint);
+ TwitterHelper.setUserHeader(exchange, user);
+ // USER_PROFILE_UPDATE exchange is created without a payload argument; only the user is registered via setUserHeader
+ onEvent(exchange);
}
@Override
- public void onUserSuspension(long l) {
- // noop
+ public void onUserSuspension(long suspendedUser) {
+ Exchange exchange = TwitterEventType.USER_SUSPENSION.createExchange(endpoint);
+ exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, suspendedUser);
+ // NOTE(review): TWITTER_USER holds the raw long id here - confirm this is compatible with what setUserHeader stores elsewhere
+ onEvent(exchange);
}
@Override
- public void onUserDeletion(long l) {
- // noop
+ public void onUserDeletion(long deletedUser) {
+ Exchange exchange = TwitterEventType.USER_DELETION.createExchange(endpoint);
+ exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser);
+ // NOTE(review): TWITTER_USER holds the raw long id here - confirm this is compatible with what setUserHeader stores elsewhere
+ onEvent(exchange);
}
@Override
- public void onBlock(User user, User user2) {
- // noop
+ public void onBlock(User source, User blockedUser) {
+ Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked");
+ // BLOCK exchange is created without a payload argument; the users are registered as 1 = source, 2 = blocked
+ onEvent(exchange);
}
@Override
- public void onUnblock(User user, User user2) {
- // noop
+ public void onUnblock(User source, User unblockedUser) {
+ Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked");
+ // UNBLOCK exchange is created without a payload argument; the users are registered as 1 = source, 2 = unblocked
+ onEvent(exchange);
}
@Override
@@ -137,20 +199,28 @@ public class UserStreamingConsumer extends StreamingConsumer implements UserStre
@Override
public void onRetweetedRetweet(User source, User target, Status retweetedStatus) {
- // TODO Auto-generated method stub
-
+ Exchange exchange = TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, target, "target");
+ // RETWEETED_RETWEET exchange created from the status; both users registered via setUserHeader (1 = source, 2 = target)
+ onEvent(exchange);
}
@Override
public void onFavoritedRetweet(User source, User target, Status favoritedRetweeet) {
- // TODO Auto-generated method stub
-
+ Exchange exchange = TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, target, "target");
+ // FAVORITED_RETWEET exchange created from the status; both users registered via setUserHeader (1 = source, 2 = target)
+ onEvent(exchange);
}
@Override
public void onQuotedTweet(User source, User target, Status quotingTweet) {
- // TODO Auto-generated method stub
-
- }
+ Exchange exchange = TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet);
+ TwitterHelper.setUserHeader(exchange, 1, source, "source");
+ TwitterHelper.setUserHeader(exchange, 2, target, "target");
+ onEvent(exchange); // QUOTED_TWEET exchange created from the quoting status, users registered as 1 = source, 2 = target
+ }
}