You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2016/02/29 17:38:53 UTC

[2/3] camel git commit: CAMEL-8258 - Support Streaming from User Endpoint including Direct Messages

CAMEL-8258 - Support Streaming from User Endpoint including Direct Messages


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/33b3bd5c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/33b3bd5c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/33b3bd5c

Branch: refs/heads/master
Commit: 33b3bd5c9a011788787c094e83848ca666a52724
Parents: b09eca2
Author: lburgazzoli <lb...@gmail.com>
Authored: Fri Feb 26 12:36:21 2016 +0100
Committer: Andrea Cosentino <an...@gmail.com>
Committed: Mon Feb 29 17:35:39 2016 +0100

----------------------------------------------------------------------
 .../component/twitter/Twitter4JFactory.java     | 173 -------------------
 .../component/twitter/TwitterConstants.java     |  25 ++-
 .../twitter/TwitterEndpointDirect.java          |   6 +-
 .../component/twitter/TwitterEndpointEvent.java |   4 +-
 .../twitter/TwitterEndpointPolling.java         |   8 +-
 .../camel/component/twitter/TwitterHelper.java  | 172 ++++++++++++++++++
 .../twitter/consumer/TweeterStatusListener.java |  24 ---
 .../twitter/consumer/Twitter4JConsumer.java     |  72 --------
 .../twitter/consumer/TwitterConsumer.java       |  86 +++++++++
 .../twitter/consumer/TwitterConsumerDirect.java |  16 +-
 .../twitter/consumer/TwitterConsumerEvent.java  |  33 ++--
 .../consumer/TwitterConsumerPolling.java        |  38 ++--
 .../twitter/consumer/TwitterEventListener.java  |  26 +++
 .../twitter/consumer/TwitterEventType.java      |  77 +++++++++
 .../directmessage/DirectMessageConsumer.java    |  28 +--
 .../twitter/consumer/search/SearchConsumer.java |  52 +++---
 .../streaming/AbstractStreamingConsumer.java    | 128 ++++++++++++++
 .../consumer/streaming/FilterConsumer.java      |  77 ---------
 .../streaming/FilterStreamingConsumer.java      |  77 +++++++++
 .../consumer/streaming/SampleConsumer.java      |  40 -----
 .../streaming/SampleStreamingConsumer.java      |  40 +++++
 .../consumer/streaming/StreamingConsumer.java   | 108 ------------
 .../streaming/UserStreamingConsumer.java        | 156 ++++++++++++-----
 .../timeline/AbstractStatusConsumer.java        |  55 ++++++
 .../twitter/consumer/timeline/HomeConsumer.java |  23 +--
 .../consumer/timeline/MentionsConsumer.java     |  23 +--
 .../consumer/timeline/RetweetsConsumer.java     |  23 +--
 .../twitter/consumer/timeline/UserConsumer.java |  23 +--
 .../component/twitter/data/ConsumerType.java    |  12 +-
 .../component/twitter/data/EndpointType.java    |  12 +-
 .../component/twitter/data/StreamingType.java   |  11 +-
 .../component/twitter/data/TimelineType.java    |  11 +-
 .../component/twitter/data/TrendsType.java      |  11 +-
 .../twitter/producer/DirectMessageProducer.java |  10 +-
 .../twitter/producer/SearchProducer.java        |  24 +--
 .../twitter/producer/Twitter4JProducer.java     |  36 ----
 .../twitter/producer/TwitterProducer.java       |  37 ++++
 .../twitter/producer/UserProducer.java          |  14 +-
 .../twitter/util/TwitterConverter.java          |  28 ++-
 .../twitter/CamelTwitterTestSupport.java        |  65 ++++---
 .../component/twitter/UserStreamingTest.java    |  49 ++++++
 .../src/test/resources/log4j.properties         |   1 +
 42 files changed, 1112 insertions(+), 822 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
deleted file mode 100644
index 9025164..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/Twitter4JFactory.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter;
-
-import java.util.regex.Pattern;
-
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer;
-import org.apache.camel.component.twitter.consumer.search.SearchConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.FilterConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.SampleConsumer;
-import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer;
-import org.apache.camel.component.twitter.consumer.timeline.UserConsumer;
-import org.apache.camel.component.twitter.data.ConsumerType;
-import org.apache.camel.component.twitter.data.StreamingType;
-import org.apache.camel.component.twitter.data.TimelineType;
-import org.apache.camel.component.twitter.producer.DirectMessageProducer;
-import org.apache.camel.component.twitter.producer.SearchProducer;
-import org.apache.camel.component.twitter.producer.UserProducer;
-import org.apache.camel.impl.DefaultProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Maps the endpoint URI to the respective Twitter4J consumer or producer.
- * <p/>
- * URI STRUCTURE:
- * <p/>
- * timeline/
- * public
- * home
- * friends
- * user (ALSO A PRODUCER)
- * mentions
- * retweetsofme
- * user/
- * search users (DIRECT ONLY)
- * user suggestions (DIRECT ONLY)
- * trends/
- * daily
- * weekly
- * userlist
- * directmessage (ALSO A PRODUCER)
- * streaming/
- * filter (POLLING ONLY)
- * sample (POLLING ONLY)
- * user (POLLING ONLY)
- */
-public final class Twitter4JFactory {
-
-    private static final Logger LOG = LoggerFactory.getLogger(Twitter4JFactory.class);
-
-    private Twitter4JFactory() {
-        // helper class
-    }
-
-    public static Twitter4JConsumer getConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
-        String[] uriSplit = splitUri(uri);
-
-        if (uriSplit.length > 0) {
-            switch (ConsumerType.fromUri(uriSplit[0])) {
-            case DIRECTMESSAGE:
-                return new DirectMessageConsumer(te);
-            case SEARCH:
-                boolean hasNoKeywords = te.getProperties().getKeywords() == null
-                        || te.getProperties().getKeywords().trim().isEmpty();
-                if (hasNoKeywords) {
-                    throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided.");
-                } else {
-                    return new SearchConsumer(te);
-                }
-            case STREAMING:
-                switch (StreamingType.fromUri(uriSplit[1])) {
-                case SAMPLE:
-                    return new SampleConsumer(te);
-                case FILTER:
-                    return new FilterConsumer(te);
-                case USER:
-                    return new UserStreamingConsumer(te);
-                default:
-                    break;
-                }
-                break;
-            case TIMELINE:
-                if (uriSplit.length > 1) {
-                    switch (TimelineType.fromUri(uriSplit[1])) {
-                    case HOME:
-                        return new HomeConsumer(te);
-                    case MENTIONS:
-                        return new MentionsConsumer(te);
-                    case RETWEETSOFME:
-                        return new RetweetsConsumer(te);
-                    case USER:
-                        if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
-                            throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set.");
-                        } else {
-                            return new UserConsumer(te);
-                        }
-                    default:
-                        break;
-                    }
-                }
-                break;
-            default:
-                break;
-            }
-        }
-
-        throw new IllegalArgumentException("Cannot create any consumer with uri " + uri
-                + ". A consumer type was not provided (or an incorrect pairing was used).");
-    }
-
-    public static DefaultProducer getProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
-        String[] uriSplit = splitUri(uri);
-
-        if (uriSplit.length > 0) {
-            switch (ConsumerType.fromUri(uriSplit[0])) {
-            case DIRECTMESSAGE:
-                if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
-                    throw new IllegalArgumentException(
-                            "Producer type set to DIRECT MESSAGE but no recipient user was set.");
-                } else {
-                    return new DirectMessageProducer(te);
-                }
-            case TIMELINE:
-                if (uriSplit.length > 1) {
-                    switch (TimelineType.fromUri(uriSplit[1])) {
-                    case USER:
-                        return new UserProducer(te);
-                    default:
-                        break;
-                    }
-                }
-                break;
-            case SEARCH:
-                return new SearchProducer(te);
-            default:
-                break;
-            }
-
-        }
-
-        throw new IllegalArgumentException("Cannot create any producer with uri " + uri
-                + ". A producer type was not provided (or an incorrect pairing was used).");
-    }
-
-    private static String[] splitUri(String uri) {
-        Pattern p1 = Pattern.compile("twitter:(//)*");
-        Pattern p2 = Pattern.compile("\\?.*");
-
-        uri = p1.matcher(uri).replaceAll("");
-        uri = p2.matcher(uri).replaceAll("");
-
-        return uri.split("/");
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
index a6e845b..fd9bf27 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConstants.java
@@ -19,19 +19,14 @@ package org.apache.camel.component.twitter;
 /**
  * Defines common constants
  */
-public final class TwitterConstants {
-
-    public static final String TWITTER_KEYWORDS = "CamelTwitterKeywords";
-
-    public static final String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage";
-    public static final String TWITTER_COUNT = "CamelTwitterCount";
-    public static final String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages";    
-    public static final String TWITTER_SINCEID = "CamelTwitterSinceId";
-    public static final String TWITTER_MAXID = "CamelTwitterMaxId";
-    public static final String TWITTER_USER = "CamelTwitterUser";
-
-    private TwitterConstants() {
-        // utility
-    }
-
+public interface TwitterConstants {
+    String TWITTER_KEYWORDS = "CamelTwitterKeywords";
+    String TWITTER_SEARCH_LANGUAGE = "CamelTwitterSearchLanguage";
+    String TWITTER_COUNT = "CamelTwitterCount";
+    String TWITTER_NUMBER_OF_PAGES = "CamelTwitterNumberOfPages";    
+    String TWITTER_SINCEID = "CamelTwitterSinceId";
+    String TWITTER_MAXID = "CamelTwitterMaxId";
+    String TWITTER_USER = "CamelTwitterUser";
+    String TWITTER_USER_ROLE = "CamelTwitterUserRole";
+    String TWITTER_EVENT_TYPE = "CamelTwitterEventType";
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
index bc5b350..a3f2254 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
@@ -23,7 +23,7 @@ import org.apache.camel.Producer;
 import org.apache.camel.ServiceStatus;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.component.direct.DirectEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
 import org.apache.camel.component.twitter.consumer.TwitterConsumerDirect;
 import org.apache.camel.component.twitter.data.EndpointType;
 
@@ -43,7 +43,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+        TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
         TwitterConsumerDirect answer = new TwitterConsumerDirect(this, processor, twitter4jConsumer);
         configureConsumer(answer);
         return answer;
@@ -51,7 +51,7 @@ public class TwitterEndpointDirect extends DirectEndpoint implements TwitterEndp
 
     @Override
     public Producer createProducer() throws Exception {
-        return Twitter4JFactory.getProducer(this, getEndpointUri());
+        return TwitterHelper.createProducer(this, getEndpointUri());
     }
 
     @ManagedAttribute

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
index 0690664..0c49e17 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.twitter;
 import org.apache.camel.Consumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
 import org.apache.camel.component.twitter.consumer.TwitterConsumerEvent;
 import org.apache.camel.component.twitter.data.EndpointType;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -36,7 +36,7 @@ public class TwitterEndpointEvent extends DefaultEndpoint implements TwitterEndp
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+        TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
         return new TwitterConsumerEvent(this, processor, twitter4jConsumer);
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
index 250c33a..8c17d1e 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
@@ -21,7 +21,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.api.management.ManagedAttribute;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
 import org.apache.camel.component.twitter.consumer.TwitterConsumerPolling;
 import org.apache.camel.component.twitter.data.EndpointType;
 import org.apache.camel.impl.DefaultPollingEndpoint;
@@ -32,7 +32,7 @@ import org.apache.camel.spi.UriParam;
  * This component integrates with Twitter to send tweets or search for tweets and more.
  */
 @ManagedResource(description = "Managed Twitter Endpoint")
-@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = Twitter4JConsumer.class, label = "api,social")
+@UriEndpoint(scheme = "twitter", title = "Twitter", syntax = "twitter:kind", consumerClass = TwitterConsumer.class, label = "api,social")
 public class TwitterEndpointPolling extends DefaultPollingEndpoint implements TwitterEndpoint {
 
     @UriParam(optionalPrefix = "consumer.", defaultValue = "" + TwitterConsumerPolling.DEFAULT_CONSUMER_DELAY, label = "consumer,scheduler",
@@ -49,7 +49,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        Twitter4JConsumer twitter4jConsumer = Twitter4JFactory.getConsumer(this, getEndpointUri());
+        TwitterConsumer twitter4jConsumer = TwitterHelper.createConsumer(this, getEndpointUri());
         // update the pulling lastID with sinceId
         twitter4jConsumer.setLastId(properties.getSinceId());
         TwitterConsumerPolling tc = new TwitterConsumerPolling(this, processor, twitter4jConsumer);
@@ -59,7 +59,7 @@ public class TwitterEndpointPolling extends DefaultPollingEndpoint implements Tw
 
     @Override
     public Producer createProducer() throws Exception {
-        return Twitter4JFactory.getProducer(this, getEndpointUri());
+        return TwitterHelper.createProducer(this, getEndpointUri());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
new file mode 100644
index 0000000..de503a5
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterHelper.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter;
+
+import java.util.regex.Pattern;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.directmessage.DirectMessageConsumer;
+import org.apache.camel.component.twitter.consumer.search.SearchConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.FilterStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.SampleStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.UserStreamingConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.HomeConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.MentionsConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.RetweetsConsumer;
+import org.apache.camel.component.twitter.consumer.timeline.UserConsumer;
+import org.apache.camel.component.twitter.data.ConsumerType;
+import org.apache.camel.component.twitter.data.StreamingType;
+import org.apache.camel.component.twitter.data.TimelineType;
+import org.apache.camel.component.twitter.producer.DirectMessageProducer;
+import org.apache.camel.component.twitter.producer.SearchProducer;
+import org.apache.camel.component.twitter.producer.TwitterProducer;
+import org.apache.camel.component.twitter.producer.UserProducer;
+import twitter4j.User;
+
+public final class TwitterHelper {
+    private TwitterHelper() {
+    }
+
+    public static void setUserHeader(Exchange exchange, User user) {
+        setUserHeader(exchange.getIn(), user);
+    }
+
+    public static void setUserHeader(Message message, User user) {
+        message.setHeader(TwitterConstants.TWITTER_USER, user);
+    }
+
+    public static void setUserHeader(Exchange exchange, int index, User user, String role) {
+        setUserHeader(exchange.getIn(), index, user, role);
+    }
+
+    public static void setUserHeader(Message message, int index, User user, String role) {
+        message.setHeader(TwitterConstants.TWITTER_USER + index, user);
+        message.setHeader(TwitterConstants.TWITTER_USER_ROLE + index, role);
+    }
+
+    public static TwitterConsumer createConsumer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
+        String[] uriSplit = splitUri(uri);
+
+        if (uriSplit.length > 0) {
+            switch (ConsumerType.fromUri(uriSplit[0])) {
+            case DIRECTMESSAGE:
+                return new DirectMessageConsumer(te);
+            case SEARCH:
+                boolean hasNoKeywords = te.getProperties().getKeywords() == null
+                    || te.getProperties().getKeywords().trim().isEmpty();
+                if (hasNoKeywords) {
+                    throw new IllegalArgumentException("Type set to SEARCH but no keywords were provided.");
+                } else {
+                    return new SearchConsumer(te);
+                }
+            case STREAMING:
+                switch (StreamingType.fromUri(uriSplit[1])) {
+                case SAMPLE:
+                    return new SampleStreamingConsumer(te);
+                case FILTER:
+                    return new FilterStreamingConsumer(te);
+                case USER:
+                    return new UserStreamingConsumer(te);
+                default:
+                    break;
+                }
+                break;
+            case TIMELINE:
+                if (uriSplit.length > 1) {
+                    switch (TimelineType.fromUri(uriSplit[1])) {
+                    case HOME:
+                        return new HomeConsumer(te);
+                    case MENTIONS:
+                        return new MentionsConsumer(te);
+                    case RETWEETSOFME:
+                        return new RetweetsConsumer(te);
+                    case USER:
+                        if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
+                            throw new IllegalArgumentException("Fetch type set to USER TIMELINE but no user was set.");
+                        } else {
+                            return new UserConsumer(te);
+                        }
+                    default:
+                        break;
+                    }
+                }
+                break;
+            default:
+                break;
+            }
+        }
+
+        throw new IllegalArgumentException("Cannot create any consumer with uri " + uri
+            + ". A consumer type was not provided (or an incorrect pairing was used).");
+    }
+
+    public static TwitterProducer createProducer(TwitterEndpoint te, String uri) throws IllegalArgumentException {
+        String[] uriSplit = splitUri(uri);
+
+        if (uriSplit.length > 0) {
+            switch (ConsumerType.fromUri(uriSplit[0])) {
+            case DIRECTMESSAGE:
+                if (te.getProperties().getUser() == null || te.getProperties().getUser().trim().isEmpty()) {
+                    throw new IllegalArgumentException(
+                        "Producer type set to DIRECT MESSAGE but no recipient user was set.");
+                } else {
+                    return new DirectMessageProducer(te);
+                }
+            case TIMELINE:
+                if (uriSplit.length > 1) {
+                    switch (TimelineType.fromUri(uriSplit[1])) {
+                    case USER:
+                        return new UserProducer(te);
+                    default:
+                        break;
+                    }
+                }
+                break;
+            case SEARCH:
+                return new SearchProducer(te);
+            default:
+                break;
+            }
+
+        }
+
+        throw new IllegalArgumentException("Cannot create any producer with uri " + uri
+            + ". A producer type was not provided (or an incorrect pairing was used).");
+    }
+
+    private static String[] splitUri(String uri) {
+        Pattern p1 = Pattern.compile("twitter:(//)*");
+        Pattern p2 = Pattern.compile("\\?.*");
+
+        uri = p1.matcher(uri).replaceAll("");
+        uri = p2.matcher(uri).replaceAll("");
+
+        return uri.split("/");
+    }
+
+    public static <T extends Enum<T>> T enumFromString(T[] values, String uri, T defaultValue) {
+        for (int i = values.length - 1; i >= 0; i--) {
+            if (values[i].name().equalsIgnoreCase(uri)) {
+                return values[i];
+            }
+        }
+
+        return defaultValue;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
deleted file mode 100644
index 98830f8..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TweeterStatusListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer;
-
-import twitter4j.Status;
-
-public interface TweeterStatusListener {
-
-    void onStatus(Status status);
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
deleted file mode 100644
index 14ecae3..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/Twitter4JConsumer.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer;
-
-import java.io.Serializable;
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-
-import twitter4j.TwitterException;
-
-
-public abstract class Twitter4JConsumer {
-
-    /**
-     * Instance of TwitterEndpoint.
-     */
-    protected TwitterEndpoint te;
-
-    /**
-     * The last tweet ID received.
-     */
-    protected long lastId = 1;
-
-    protected Twitter4JConsumer(TwitterEndpoint te) {
-        this.te = te;
-    }
-
-    /**
-     * Can't assume that the end of the list will be the most recent ID.
-     * The Twitter API sometimes returns them slightly out of order.
-     */
-    protected void checkLastId(long newId) {
-        if (newId > lastId) {
-            lastId = newId;
-        }
-    }
-
-    /**
-     * Called by polling consumers during each poll.  It needs to be separate
-     * from directConsume() since, as an example, streaming API polling allows
-     * tweets to build up between polls.
-     */
-    public abstract List<? extends Serializable> pollConsume() throws TwitterException;
-
-    /**
-     * Called by direct consumers.
-     */
-    public abstract List<? extends Serializable> directConsume() throws TwitterException;
-    
-    /**
-     * Support to update the Consumer's lastId when starting the consumer
-     * @param sinceId
-     */
-    public void setLastId(long sinceId) {
-        lastId = sinceId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
new file mode 100644
index 0000000..b1d9b37
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumer.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.Paging;
+import twitter4j.Twitter;
+import twitter4j.TwitterException;
+
+
+public abstract class TwitterConsumer {
+
+    /**
+     * Instance of TwitterEndpoint.
+     */
+    protected final TwitterEndpoint endpoint;
+
+    /**
+     * The last tweet ID received.
+     */
+    private long lastId;
+
+    protected TwitterConsumer(TwitterEndpoint endpoint) {
+        this.endpoint = endpoint;
+        this.lastId = -1;
+    }
+
+    /**
+     * Called by polling consumers during each poll.  It needs to be separate
+     * from directConsume() since, as an example, streaming API polling allows
+     * tweets to build up between polls.
+     */
+    public abstract List<Exchange> pollConsume() throws TwitterException;
+
+    /**
+     * Called by direct consumers.
+     */
+    public abstract List<Exchange> directConsume() throws TwitterException;
+
+    /**
+     * Can't assume that the end of the list will be the most recent ID.
+     * The Twitter API sometimes returns them slightly out of order.
+     */
+    protected void setLastIdIfGreater(long newId) {
+        if (newId > lastId) {
+            lastId = newId;
+        }
+    }
+
+    /**
+     * Support to update the Consumer's lastId when starting the consumer
+     * @param sinceId
+     */
+    public void setLastId(long sinceId) {
+        lastId = sinceId;
+    }
+
+    protected Twitter getTwitter() {
+        return endpoint.getProperties().getTwitter();
+    }
+
+    protected long getLastId() {
+        return lastId;
+    }
+
+    protected Paging getLastIdPaging() {
+        return new Paging(lastId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
index ced7e3b..9c47259 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerDirect.java
@@ -16,8 +16,7 @@
  */
 package org.apache.camel.component.twitter.consumer;
 
-import java.io.Serializable;
-import java.util.Iterator;
+import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -29,10 +28,9 @@ import org.apache.camel.impl.DefaultConsumer;
  */
 public class TwitterConsumerDirect extends DefaultConsumer {
 
-    private Twitter4JConsumer twitter4jConsumer;
+    private final TwitterConsumer twitter4jConsumer;
 
-    public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor,
-                                 Twitter4JConsumer twitter4jConsumer) {
+    public TwitterConsumerDirect(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
         super(endpoint, processor);
 
         this.twitter4jConsumer = twitter4jConsumer;
@@ -42,11 +40,9 @@ public class TwitterConsumerDirect extends DefaultConsumer {
     protected void doStart() throws Exception {
         super.doStart();
 
-        Iterator<? extends Serializable> i = twitter4jConsumer.directConsume().iterator();
-        while (i.hasNext()) {
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(i.next());
-            getProcessor().process(e);
+        List<Exchange> exchanges = twitter4jConsumer.directConsume();
+        for (int i = 0; i < exchanges.size(); i++) {
+            getProcessor().process(exchanges.get(i));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
index 4feaf8a..f5b94a0 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
@@ -19,15 +19,13 @@ package org.apache.camel.component.twitter.consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
 import org.apache.camel.impl.DefaultConsumer;
-import twitter4j.Status;
 
-public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStatusListener {
-    private Twitter4JConsumer twitter4jConsumer;
+public class TwitterConsumerEvent extends DefaultConsumer implements TwitterEventListener {
+    private final TwitterConsumer twitter4jConsumer;
 
-    public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor,
-                                Twitter4JConsumer twitter4jConsumer) {
+    public TwitterConsumerEvent(TwitterEndpoint endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
         super(endpoint, processor);
         this.twitter4jConsumer = twitter4jConsumer;
     }
@@ -35,25 +33,28 @@ public class TwitterConsumerEvent extends DefaultConsumer implements TweeterStat
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (twitter4jConsumer instanceof StreamingConsumer) {
-            ((StreamingConsumer) twitter4jConsumer).registerTweetListener(this);
-            ((StreamingConsumer) twitter4jConsumer).doStart();
+
+        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+            ((AbstractStreamingConsumer) twitter4jConsumer).setEventListener(this);
+            ((AbstractStreamingConsumer) twitter4jConsumer).start();
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (twitter4jConsumer instanceof StreamingConsumer) {
-            ((StreamingConsumer) twitter4jConsumer).unregisterTweetListener(this);
-            ((StreamingConsumer) twitter4jConsumer).doStop();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+            ((AbstractStreamingConsumer) twitter4jConsumer).removeEventListener(this);
+            ((AbstractStreamingConsumer) twitter4jConsumer).stop();
         }
+
+        super.doStop();
     }
 
     @Override
-    public void onStatus(Status status) {
-        Exchange exchange = getEndpoint().createExchange();
-        exchange.getIn().setBody(status);
+    public void onEvent(Exchange exchange) {
+        if (!isRunAllowed()) {
+            return;
+        }
 
         try {
             getProcessor().process(exchange);

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
index 6236116..364e55a 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerPolling.java
@@ -16,15 +16,12 @@
  */
 package org.apache.camel.component.twitter.consumer;
 
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
+import java.util.List;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.component.twitter.TwitterEndpoint;
 import org.apache.camel.component.twitter.TwitterEndpointPolling;
-import org.apache.camel.component.twitter.consumer.streaming.StreamingConsumer;
+import org.apache.camel.component.twitter.consumer.streaming.AbstractStreamingConsumer;
 import org.apache.camel.impl.ScheduledPollConsumer;
 
 /**
@@ -33,10 +30,9 @@ import org.apache.camel.impl.ScheduledPollConsumer;
 public class TwitterConsumerPolling extends ScheduledPollConsumer {
 
     public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
-    private Twitter4JConsumer twitter4jConsumer;
+    private final TwitterConsumer twitter4jConsumer;
 
-    public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor,
-                                  Twitter4JConsumer twitter4jConsumer) {
+    public TwitterConsumerPolling(TwitterEndpointPolling endpoint, Processor processor, TwitterConsumer twitter4jConsumer) {
         super(endpoint, processor);
         setDelay(DEFAULT_CONSUMER_DELAY);
         this.twitter4jConsumer = twitter4jConsumer;
@@ -45,31 +41,29 @@ public class TwitterConsumerPolling extends ScheduledPollConsumer {
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        if (twitter4jConsumer instanceof StreamingConsumer) {
-            ((StreamingConsumer) twitter4jConsumer).doStart();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+            ((AbstractStreamingConsumer) twitter4jConsumer).start();
         }
     }
 
     @Override
     protected void doStop() throws Exception {
-        super.doStop();
-        if (twitter4jConsumer instanceof StreamingConsumer) {
-            ((StreamingConsumer) twitter4jConsumer).doStop();
+        if (twitter4jConsumer instanceof AbstractStreamingConsumer) {
+            ((AbstractStreamingConsumer) twitter4jConsumer).stop();
         }
+
+        super.doStop();
     }
 
+    @Override
     protected int poll() throws Exception {
-        Iterator<? extends Serializable> i = twitter4jConsumer.pollConsume().iterator();
-
-        int total = 0;
-        while (i.hasNext()) {
-            Exchange e = getEndpoint().createExchange();
-            e.getIn().setBody(i.next());
-            getProcessor().process(e);
+        List<Exchange> exchanges = twitter4jConsumer.pollConsume();
 
-            total++;
+        int index = 0;
+        for (; index < exchanges.size(); index++) {
+            getProcessor().process(exchanges.get(index));
         }
 
-        return total;
+        return index;
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
new file mode 100644
index 0000000..022dbff
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventListener.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.EventListener;
+
+import org.apache.camel.Exchange;
+
+public interface TwitterEventListener extends EventListener {
+
+    void onEvent(Exchange exchange);
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
new file mode 100644
index 0000000..77600d2
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterEventType.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+
+public enum TwitterEventType {
+    STATUS,
+    DIRECT_MESSAGE,
+    FAVORITE,
+    UNFAVORITE,
+    FOLLOW,
+    UNFOLLOW,
+    USERLIST_MEMBER_ADDITION,
+    USERLIST_MEMBER_DELETION,
+    USERLIST_SUBSCRIPTION,
+    USERLIST_UNSUBSCRIPTION,
+    USERLIST_CREATION,
+    USERLIST_UPDATE,
+    USERLIST_DELETETION,
+    USER_PROFILE_UPDATE,
+    USER_SUSPENSION,
+    USER_DELETION,
+    BLOCK,
+    UNBLOCK,
+    RETWEETED_RETWEET,
+    FAVORITED_RETWEET,
+    QUOTED_TWEET;
+
+    public Exchange createExchange(TwitterEndpoint endpoint) {
+        return createExchange(endpoint, null);
+    }
+
+    public <T> Exchange createExchange(TwitterEndpoint endpoint, T body) {
+        Exchange exchange = endpoint.createExchange();
+        exchange.getIn().setHeader(TwitterConstants.TWITTER_EVENT_TYPE, name());
+
+        if (body != null) {
+            exchange.getIn().setBody(body);
+        }
+
+        return exchange;
+    }
+
+    public <T> List<Exchange> createExchangeList(TwitterEndpoint endpoint, List<T> bodyList) {
+        List<Exchange> exchanges = Collections.emptyList();
+
+        if (bodyList != null && !bodyList.isEmpty()) {
+            exchanges = new ArrayList<>(bodyList.size());
+            for (int i = 0; i < bodyList.size(); i++) {
+                exchanges.add(createExchange(endpoint, bodyList.get(i)));
+            }
+        }
+
+        return exchanges;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
index 500c6e6..d0b89de 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/directmessage/DirectMessageConsumer.java
@@ -18,31 +18,37 @@ package org.apache.camel.component.twitter.consumer.directmessage;
 
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
 import twitter4j.DirectMessage;
-import twitter4j.Paging;
 import twitter4j.TwitterException;
 
 /**
  * Consumes a user's direct messages
  */
-public class DirectMessageConsumer extends Twitter4JConsumer {
+public class DirectMessageConsumer extends TwitterConsumer {
 
     public DirectMessageConsumer(TwitterEndpoint te) {
         super(te);
     }
 
-    public List<DirectMessage> pollConsume() throws TwitterException {
-        List<DirectMessage> list = te.getProperties().getTwitter().getDirectMessages(new Paging(lastId));
-        for (DirectMessage dm : list) {
-            checkLastId(dm.getId());
+    @Override
+    public List<Exchange> pollConsume() throws TwitterException {
+        List<DirectMessage> directMessages =  getTwitter().getDirectMessages(getLastIdPaging());
+        for (int i = 0; i < directMessages.size(); i++) {
+            setLastIdIfGreater(directMessages.get(i).getId());
         }
-        return list;
+
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(endpoint, directMessages);
     }
 
-    public List<DirectMessage> directConsume() throws TwitterException {
-        return te.getProperties().getTwitter().getDirectMessages();
+    @Override
+    public List<Exchange> directConsume() throws TwitterException {
+        return TwitterEventType.DIRECT_MESSAGE.createExchangeList(
+            endpoint,
+            getTwitter().getDirectMessages()
+        );
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
index 4b2cdf1..1c3a087 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/search/SearchConsumer.java
@@ -19,8 +19,10 @@ package org.apache.camel.component.twitter.consumer.search;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +37,7 @@ import twitter4j.TwitterException;
 /**
  * Consumes search requests
  */
-public class SearchConsumer extends Twitter4JConsumer {
+public class SearchConsumer extends TwitterConsumer {
 
     private static final Logger LOG = LoggerFactory.getLogger(SearchConsumer.class);
 
@@ -43,8 +45,8 @@ public class SearchConsumer extends Twitter4JConsumer {
         super(te);
     }
 
-    public List<Status> pollConsume() throws TwitterException {
-        String keywords = te.getProperties().getKeywords();
+    public List<Exchange> pollConsume() throws TwitterException {
+        String keywords = endpoint.getProperties().getKeywords();
 
         Query query;
 
@@ -56,15 +58,15 @@ public class SearchConsumer extends Twitter4JConsumer {
             LOG.debug("Searching twitter without keywords.");
         }
 
-        if (te.getProperties().isFilterOld()) {
-            query.setSinceId(lastId);
+        if (endpoint.getProperties().isFilterOld()) {
+            query.setSinceId(getLastId());
         }
 
         return search(query);
     }
 
-    public List<Status> directConsume() throws TwitterException {
-        String keywords = te.getProperties().getKeywords();
+    public List<Exchange> directConsume() throws TwitterException {
+        String keywords = endpoint.getProperties().getKeywords();
         if (keywords == null || keywords.trim().length() == 0) {
             return Collections.emptyList();
         }
@@ -74,33 +76,33 @@ public class SearchConsumer extends Twitter4JConsumer {
         return search(query);
     }
 
-    private List<Status> search(Query query) throws TwitterException {
+    private List<Exchange> search(Query query) throws TwitterException {
         Integer numberOfPages = 1;
 
-        if (ObjectHelper.isNotEmpty(te.getProperties().getLang())) {
-            query.setLang(te.getProperties().getLang());
+        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLang())) {
+            query.setLang(endpoint.getProperties().getLang());
         }
 
-        if (ObjectHelper.isNotEmpty(te.getProperties().getCount())) {
-            query.setCount(te.getProperties().getCount());
+        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getCount())) {
+            query.setCount(endpoint.getProperties().getCount());
         }
 
-        if (ObjectHelper.isNotEmpty(te.getProperties().getNumberOfPages())) {
-            numberOfPages = te.getProperties().getNumberOfPages();
+        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getNumberOfPages())) {
+            numberOfPages = endpoint.getProperties().getNumberOfPages();
         }
 
-        if (ObjectHelper.isNotEmpty(te.getProperties().getLatitude())
-                && ObjectHelper.isNotEmpty(te.getProperties().getLongitude())
-                && ObjectHelper.isNotEmpty(te.getProperties().getRadius())) {
-            GeoLocation location = new GeoLocation(te.getProperties().getLatitude(), te.getProperties().getLongitude());
-            query.setGeoCode(location, te.getProperties().getRadius(), Unit.valueOf(te.getProperties().getDistanceMetric()));
+        if (ObjectHelper.isNotEmpty(endpoint.getProperties().getLatitude())
+                && ObjectHelper.isNotEmpty(endpoint.getProperties().getLongitude())
+                && ObjectHelper.isNotEmpty(endpoint.getProperties().getRadius())) {
+            GeoLocation location = new GeoLocation(endpoint.getProperties().getLatitude(), endpoint.getProperties().getLongitude());
+            query.setGeoCode(location, endpoint.getProperties().getRadius(), Unit.valueOf(endpoint.getProperties().getDistanceMetric()));
 
             LOG.debug("Searching with additional geolocation parameters.");
         }
 
         LOG.debug("Searching with {} pages.", numberOfPages);
 
-        Twitter twitter = te.getProperties().getTwitter();
+        Twitter twitter = getTwitter();
         QueryResult qr = twitter.search(query);
         List<Status> tweets = qr.getTweets();
 
@@ -113,13 +115,13 @@ public class SearchConsumer extends Twitter4JConsumer {
             tweets.addAll(qr.getTweets());
         }
 
-        if (te.getProperties().isFilterOld()) {
-            for (Status t : tweets) {
-                checkLastId(t.getId());
+        if (endpoint.getProperties().isFilterOld()) {
+            for (int i = 0; i < tweets.size(); i++) {
+                setLastIdIfGreater(tweets.get(i).getId());
             }
         }
 
-        return tweets;
+        return TwitterEventType.STATUS.createExchangeList(endpoint, tweets);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
new file mode 100644
index 0000000..7a7bfcb
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/AbstractStreamingConsumer.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Service;
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.consumer.TwitterConsumer;
+import org.apache.camel.component.twitter.consumer.TwitterEventListener;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
+import twitter4j.TwitterException;
+import twitter4j.TwitterStream;
+
+/**
+ * Super class providing consuming capabilities for the streaming API.
+ */
+public abstract class AbstractStreamingConsumer extends TwitterConsumer implements StatusListener, Service {
+    private final TwitterStream twitterStream;
+    private final List<Exchange> receivedStatuses;
+    private final AtomicReference<TwitterEventListener> twitterEventListener;
+    private boolean clear;
+
+    public AbstractStreamingConsumer(TwitterEndpoint te) {
+        super(te);
+        this.receivedStatuses = new ArrayList<>();
+        this.twitterStream = te.getProperties().createTwitterStream();
+        this.twitterStream.addListener(this);
+        this.twitterEventListener = new AtomicReference<>();
+        this.clear = true;
+    }
+
+    @Override
+    public List<Exchange> pollConsume() throws TwitterException {
+        List<Exchange> result;
+
+        synchronized (receivedStatuses) {
+            clear = true;
+            result = Collections.unmodifiableList(new ArrayList<>(receivedStatuses));
+        }
+
+        return result;
+    }
+
+    @Override
+    public List<Exchange> directConsume() throws TwitterException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public void onException(Exception ex) {
+    }
+
+    @Override
+    public void onStatus(Status status) {
+        onEvent(TwitterEventType.STATUS.createExchange(endpoint, status));
+    }
+
+    @Override
+    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+        // noop
+    }
+
+    @Override
+    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
+        // noop
+    }
+
+    @Override
+    public void onScrubGeo(long userId, long upToStatusId) {
+        // noop
+    }
+
+    public void setEventListener(TwitterEventListener tweeterStatusListener) {
+        twitterEventListener.set(tweeterStatusListener);
+    }
+
+    public void removeEventListener(TwitterEventListener tweeterStatusListener) {
+        twitterEventListener.compareAndSet(tweeterStatusListener, null);
+    }
+
+    @Override
+    public void stop() {
+        twitterStream.removeListener(this);
+        twitterStream.shutdown();
+        twitterStream.cleanUp();
+    }
+
+    protected TwitterStream getTwitterStream() {
+        return twitterStream;
+    }
+
+    protected void onEvent(Exchange exchange) {
+        TwitterEventListener listener = twitterEventListener.get();
+        if (listener != null) {
+            listener.onEvent(exchange);
+        } else {
+            synchronized (receivedStatuses) {
+                if (clear) {
+                    receivedStatuses.clear();
+                    clear = false;
+                }
+                receivedStatuses.add(exchange);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
deleted file mode 100644
index 588a5e1..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.FilterQuery;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the filter stream
- */
-public class FilterConsumer extends StreamingConsumer {
-
-    public FilterConsumer(TwitterEndpoint te) {
-        super(te);
-    }
-
-    @Override
-    protected void startStreaming() {
-        twitterStream.filter(createFilter(te));
-    }
-
-    @Override
-    public void onStallWarning(StallWarning stallWarning) {
-        // noop
-    }
-
-    private FilterQuery createFilter(TwitterEndpoint te) {
-        FilterQuery filterQuery = new FilterQuery();
-        String allLocationsString = te.getProperties().getLocations();
-        if (allLocationsString != null) {
-            String[] locationStrings = allLocationsString.split(";");
-            double[][] locations = new double[locationStrings.length][2];
-            for (int i = 0; i < locationStrings.length; i++) {
-                String[] coords = locationStrings[i].split(",");
-                locations[i][0] = Double.valueOf(coords[0]);
-                locations[i][1] = Double.valueOf(coords[1]);
-            }
-            filterQuery.locations(locations);
-        }
-
-        String keywords = te.getProperties().getKeywords();
-        if (keywords != null && keywords.length() > 0) {
-            filterQuery.track(keywords.split(","));
-        }
-
-        String userIds = te.getProperties().getUserIds();
-        if (userIds != null) {
-            String[] stringUserIds = userIds.split(",");
-            long[] longUserIds = new long[stringUserIds.length];
-            for (int i = 0; i < stringUserIds.length; i++) {
-                longUserIds[i] = Long.valueOf(stringUserIds[i]);
-            }
-            filterQuery.follow(longUserIds);
-        }
-
-        if (allLocationsString == null && keywords == null && userIds == null) {
-            throw new IllegalArgumentException("At least one filter parameter is required");
-        }
-
-        return filterQuery;
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
new file mode 100644
index 0000000..62460cc
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterStreamingConsumer.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+
+/**
+ * Consumes the filter stream
+ */
+public class FilterStreamingConsumer extends AbstractStreamingConsumer {
+
+    public FilterStreamingConsumer(TwitterEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void start() {
+        getTwitterStream().filter(createFilter());
+    }
+
+    @Override
+    public void onStallWarning(StallWarning stallWarning) {
+        // noop
+    }
+
+    private FilterQuery createFilter() {
+        FilterQuery filterQuery = new FilterQuery();
+        String allLocationsString = endpoint.getProperties().getLocations();
+        if (allLocationsString != null) {
+            String[] locationStrings = allLocationsString.split(";");
+            double[][] locations = new double[locationStrings.length][2];
+            for (int i = 0; i < locationStrings.length; i++) {
+                String[] coords = locationStrings[i].split(",");
+                locations[i][0] = Double.valueOf(coords[0]);
+                locations[i][1] = Double.valueOf(coords[1]);
+            }
+            filterQuery.locations(locations);
+        }
+
+        String keywords = endpoint.getProperties().getKeywords();
+        if (keywords != null && keywords.length() > 0) {
+            filterQuery.track(keywords.split(","));
+        }
+
+        String userIds = endpoint.getProperties().getUserIds();
+        if (userIds != null) {
+            String[] stringUserIds = userIds.split(",");
+            long[] longUserIds = new long[stringUserIds.length];
+            for (int i = 0; i < stringUserIds.length; i++) {
+                longUserIds[i] = Long.valueOf(stringUserIds[i]);
+            }
+            filterQuery.follow(longUserIds);
+        }
+
+        if (allLocationsString == null && keywords == null && userIds == null) {
+            throw new IllegalArgumentException("At least one filter parameter is required");
+        }
+
+        return filterQuery;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
deleted file mode 100644
index 89cc157..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import twitter4j.StallWarning;
-
-/**
- * Consumes the sample stream
- */
-public class SampleConsumer extends StreamingConsumer {
-
-    public SampleConsumer(TwitterEndpoint te) {
-        super(te);
-    }
-
-    @Override
-    protected void startStreaming() {
-        twitterStream.sample();
-    }
-
-    @Override
-    public void onStallWarning(StallWarning stallWarning) {
-        // noop
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
new file mode 100644
index 0000000..e0a5b27
--- /dev/null
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleStreamingConsumer.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.twitter.consumer.streaming;
+
+import org.apache.camel.component.twitter.TwitterEndpoint;
+import twitter4j.StallWarning;
+
+/**
+ * Consumes the sample stream
+ */
+public class SampleStreamingConsumer extends AbstractStreamingConsumer {
+
+    public SampleStreamingConsumer(TwitterEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void start() {
+        getTwitterStream().sample();
+    }
+
+    @Override
+    public void onStallWarning(StallWarning stallWarning) {
+        // noop
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
deleted file mode 100644
index abb59cd..0000000
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.twitter.consumer.streaming;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.camel.component.twitter.TwitterEndpoint;
-import org.apache.camel.component.twitter.consumer.TweeterStatusListener;
-import org.apache.camel.component.twitter.consumer.Twitter4JConsumer;
-
-import twitter4j.Status;
-import twitter4j.StatusDeletionNotice;
-import twitter4j.StatusListener;
-import twitter4j.TwitterException;
-import twitter4j.TwitterStream;
-
-/**
- * Super class providing consuming capabilities for the streaming API.
- */
-public abstract class StreamingConsumer extends Twitter4JConsumer implements StatusListener {
-    protected final TwitterStream twitterStream;
-    private final List<Status> receivedStatuses = new ArrayList<Status>();
-    private volatile boolean clear;
-    private TweeterStatusListener tweeterStatusListener;
-
-    public StreamingConsumer(TwitterEndpoint te) {
-        super(te);
-        twitterStream = te.getProperties().createTwitterStream();
-        twitterStream.addListener(this);
-    }
-
-    public List<Status> pollConsume() throws TwitterException {
-        clear = true;
-        return Collections.unmodifiableList(new ArrayList<Status>(receivedStatuses));
-    }
-
-    public List<Status> directConsume() throws TwitterException {
-        // not used
-        return null;
-    }
-
-    @Override
-    public void onException(Exception ex) {
-    }
-
-    @Override
-    public void onStatus(Status status) {
-        if (tweeterStatusListener != null) {
-            tweeterStatusListener.onStatus(status);
-        } else {
-            if (clear) {
-                receivedStatuses.clear();
-                clear = false;
-            }
-            receivedStatuses.add(status);
-        }
-    }
-
-    @Override
-    public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
-        // noop
-    }
-
-    @Override
-    public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
-        // noop
-    }
-
-    @Override
-    public void onScrubGeo(long userId, long upToStatusId) {
-        // noop
-    }
-
-    public void registerTweetListener(TweeterStatusListener tweeterStatusListener) {
-        this.tweeterStatusListener = tweeterStatusListener;
-    }
-
-    public void unregisterTweetListener(TweeterStatusListener tweeterStatusListener) {
-        this.tweeterStatusListener = null;
-    }
-
-    public void doStart() {
-        startStreaming();
-    }
-
-    public void doStop() {
-        twitterStream.shutdown();
-    }
-
-    protected abstract void startStreaming();
-
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/33b3bd5c/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
index 751d9e9..e04ba31 100644
--- a/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
+++ b/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/UserStreamingConsumer.java
@@ -16,7 +16,11 @@
  */
 package org.apache.camel.component.twitter.consumer.streaming;
 
+import org.apache.camel.Exchange;
+import org.apache.camel.component.twitter.TwitterConstants;
 import org.apache.camel.component.twitter.TwitterEndpoint;
+import org.apache.camel.component.twitter.TwitterHelper;
+import org.apache.camel.component.twitter.consumer.TwitterEventType;
 import twitter4j.DirectMessage;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -24,110 +28,168 @@ import twitter4j.User;
 import twitter4j.UserList;
 import twitter4j.UserStreamListener;
 
-public class UserStreamingConsumer extends StreamingConsumer implements UserStreamListener {
+public class UserStreamingConsumer extends AbstractStreamingConsumer implements UserStreamListener {
 
-    public UserStreamingConsumer(TwitterEndpoint te) {
-        super(te);
+    public UserStreamingConsumer(TwitterEndpoint endpoint) {
+        super(endpoint);
     }
 
     @Override
-    protected void startStreaming() {
-        twitterStream.user();
+    public void start() {
+        getTwitterStream().user();
     }
 
     @Override
-    public void onDeletionNotice(long l, long l2) {
+    public void onDeletionNotice(long directMessageId, long userId) {
         // noop
     }
 
     @Override
-    public void onFriendList(long[] longs) {
+    public void onFriendList(long[] friendIds) {
         // noop
     }
 
     @Override
-    public void onFavorite(User user, User user2, Status status) {
-        // noop
+    public void onFavorite(User source, User target, Status favoritedStatus) {
+        Exchange exchange = TwitterEventType.FAVORITE.createExchange(endpoint, favoritedStatus);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, target, "target");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUnfavorite(User user, User user2, Status status) {
-        // noop
+    public void onUnfavorite(User source, User target, Status unfavoritedStatus) {
+        Exchange exchange = TwitterEventType.UNFAVORITE.createExchange(endpoint, unfavoritedStatus);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, target, "target");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onFollow(User user, User user2) {
-        // noop
+    public void onFollow(User source, User followedUser) {
+        Exchange exchange = TwitterEventType.FOLLOW.createExchange(endpoint);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, followedUser, "followed");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUnfollow(User user, User user2) {
-        // noop
+    public void onUnfollow(User source, User unfollowedUser) {
+        Exchange exchange = TwitterEventType.UNFOLLOW.createExchange(endpoint);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, unfollowedUser, "unfollowed");
+
+        onEvent(exchange);
     }
 
     @Override
     public void onDirectMessage(DirectMessage directMessage) {
-        // noop
+        onEvent(TwitterEventType.DIRECT_MESSAGE.createExchange(endpoint, directMessage));
     }
 
     @Override
-    public void onUserListMemberAddition(User user, User user2, UserList userList) {
-        // noop
+    public void onUserListMemberAddition(User addedMember, User listOwner, UserList list) {
+        Exchange exchange = TwitterEventType.USERLIST_MEMBER_ADDITION.createExchange(endpoint, list);
+        TwitterHelper.setUserHeader(exchange, 1, addedMember, "addedMember");
+        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUserListMemberDeletion(User user, User user2, UserList userList) {
-        // noop
+    public void onUserListMemberDeletion(User deletedMember, User listOwner, UserList list) {
+        Exchange exchange = TwitterEventType.USERLIST_MEMBER_DELETION.createExchange(endpoint, list);
+        TwitterHelper.setUserHeader(exchange, 1, deletedMember, "deletedMember");
+        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUserListSubscription(User user, User user2, UserList userList) {
-        // noop
+    public void onUserListSubscription(User subscriber, User listOwner, UserList list) {
+        Exchange exchange = TwitterEventType.USERLIST_SUBSCRIPTION.createExchange(endpoint, list);
+        TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
+        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUserListUnsubscription(User user, User user2, UserList userList) {
-        // noop
+    public void onUserListUnsubscription(User subscriber, User listOwner, UserList list) {
+        Exchange exchange = TwitterEventType.USERLIST_UNSUBSCRIPTION.createExchange(endpoint, list);
+        TwitterHelper.setUserHeader(exchange, 1, subscriber, "subscriber");
+        TwitterHelper.setUserHeader(exchange, 2, listOwner, "listOwner");
+
+        onEvent(exchange);
     }
 
     @Override
     public void onUserListCreation(User user, UserList userList) {
-        // noop
+        Exchange exchange = TwitterEventType.USERLIST_CREATION.createExchange(endpoint, userList);
+        TwitterHelper.setUserHeader(exchange, user);
+
+        onEvent(exchange);
     }
 
     @Override
     public void onUserListUpdate(User user, UserList userList) {
-        // noop
+        Exchange exchange = TwitterEventType.USERLIST_UPDATE.createExchange(endpoint, userList);
+        TwitterHelper.setUserHeader(exchange, user);
+
+        onEvent(exchange);
     }
 
     @Override
     public void onUserListDeletion(User user, UserList userList) {
-        // noop
+        Exchange exchange = TwitterEventType.USERLIST_DELETETION.createExchange(endpoint, userList);
+        TwitterHelper.setUserHeader(exchange, user);
+
+        onEvent(exchange);
     }
 
     @Override
     public void onUserProfileUpdate(User user) {
-        // noop
+        Exchange exchange = TwitterEventType.USER_PROFILE_UPDATE.createExchange(endpoint);
+        TwitterHelper.setUserHeader(exchange, user);
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUserSuspension(long l) {
-        // noop
+    public void onUserSuspension(long suspendedUser) {
+        Exchange exchange = TwitterEventType.USER_SUSPENSION.createExchange(endpoint);
+        exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, suspendedUser);
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUserDeletion(long l) {
-        // noop
+    public void onUserDeletion(long deletedUser) {
+        Exchange exchange = TwitterEventType.USER_DELETION.createExchange(endpoint);
+        exchange.getIn().setHeader(TwitterConstants.TWITTER_USER, deletedUser);
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onBlock(User user, User user2) {
-        // noop
+    public void onBlock(User source, User blockedUser) {
+        Exchange exchange = TwitterEventType.BLOCK.createExchange(endpoint);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, blockedUser, "blocked");
+
+        onEvent(exchange);
     }
 
     @Override
-    public void onUnblock(User user, User user2) {
-        // noop
+    public void onUnblock(User source, User unblockedUser) {
+        Exchange exchange = TwitterEventType.UNBLOCK.createExchange(endpoint);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, unblockedUser, "unblocked");
+
+        onEvent(exchange);
     }
 
     @Override
@@ -137,20 +199,28 @@ public class UserStreamingConsumer extends StreamingConsumer implements UserStre
 
     @Override
     public void onRetweetedRetweet(User source, User target, Status retweetedStatus) {
-        // TODO Auto-generated method stub
-        
+        Exchange exchange = TwitterEventType.RETWEETED_RETWEET.createExchange(endpoint, retweetedStatus);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, target, "target");
+
+        onEvent(exchange);
     }
 
     @Override
     public void onFavoritedRetweet(User source, User target, Status favoritedRetweeet) {
-        // TODO Auto-generated method stub
-        
+        Exchange exchange = TwitterEventType.FAVORITED_RETWEET.createExchange(endpoint, favoritedRetweeet);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, target, "target");
+
+        onEvent(exchange);
     }
 
     @Override
     public void onQuotedTweet(User source, User target, Status quotingTweet) {
-        // TODO Auto-generated method stub
-        
-    }
+        Exchange exchange = TwitterEventType.QUOTED_TWEET.createExchange(endpoint, quotingTweet);
+        TwitterHelper.setUserHeader(exchange, 1, source, "source");
+        TwitterHelper.setUserHeader(exchange, 2, target, "target");
 
+        onEvent(exchange);
+    }
 }