You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ni...@apache.org on 2012/08/22 10:45:03 UTC

svn commit: r1375946 - in /camel/branches/camel-2.10.x: ./ components/camel-twitter/src/main/java/org/apache/camel/component/twitter/ components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/ components/camel-twitter/src/main/...

Author: ningjiang
Date: Wed Aug 22 08:45:02 2012
New Revision: 1375946

URL: http://svn.apache.org/viewvc?rev=1375946&view=rev
Log:
Merged revisions 1375900 via svnmerge from 
https://svn.apache.org/repos/asf/camel/trunk

........
  r1375900 | ningjiang | 2012-08-22 13:16:16 +0800 (Wed, 22 Aug 2012) | 1 line
  
  CAMEL-5529 Shutdown Twitter instance if endpoint is shutdown
........

Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
    camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1375900

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterConfiguration.java Wed Aug 22 08:45:02 2012
@@ -213,9 +213,6 @@ public class TwitterConfiguration {
     }
 
     public TwitterStream getTwitterStream() {
-        if (twitterStream == null) {
-            twitterStream = new TwitterStreamFactory(getConfiguration()).getInstance();
-        }
         return twitterStream;
     }
 
@@ -239,6 +236,14 @@ public class TwitterConfiguration {
             throw new IllegalArgumentException("date must be in yyyy-mm-dd format!");
         }
     }
+
+    public TwitterStream createTwitterStream() {
+        if (twitterStream == null) {
+            twitterStream = new TwitterStreamFactory(getConfiguration()).getInstance();
+        }
+
+        return twitterStream;
+    }
 }
 
 

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointDirect.java Wed Aug 22 08:45:02 2012
@@ -61,4 +61,11 @@ public class TwitterEndpointDirect exten
     public ExchangePattern getExchangePattern() {
         return ExchangePattern.InOptionalOut;
     }
+
+    @Override
+    public void shutdown() throws Exception {
+        super.shutdown();
+        properties.getTwitter().shutdown();
+    }
+    
 }

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointEvent.java Wed Aug 22 08:45:02 2012
@@ -52,4 +52,19 @@ public class TwitterEndpointEvent extend
     public EndpointType getEndpointType() {
         return EndpointType.EVENT;
     }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        if (properties.getTwitterStream() != null) {
+            properties.getTwitterStream().shutdown();
+        }
+    }
+
+    @Override
+    public void shutdown() throws Exception {
+        super.shutdown();
+        properties.getTwitter().shutdown();
+    }
+
 }

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/TwitterEndpointPolling.java Wed Aug 22 08:45:02 2012
@@ -61,4 +61,11 @@ public class TwitterEndpointPolling exte
     public EndpointType getEndpointType() {
         return EndpointType.POLLING;
     }
+
+    @Override
+    public void shutdown() throws Exception {
+        super.shutdown();
+        properties.getTwitter().shutdown();
+    }
+    
 }

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/TwitterConsumerEvent.java Wed Aug 22 08:45:02 2012
@@ -39,6 +39,7 @@ public class TwitterConsumerEvent extend
         super.doStart();
         if (twitter4jConsumer instanceof StreamingConsumer) {
             ((StreamingConsumer) twitter4jConsumer).registerTweetListener(this);
+            ((StreamingConsumer) twitter4jConsumer).doStart();
         }
     }
 
@@ -47,6 +48,7 @@ public class TwitterConsumerEvent extend
         super.doStop();
         if (twitter4jConsumer instanceof StreamingConsumer) {
             ((StreamingConsumer) twitter4jConsumer).unregisterTweetListener(this);
+            ((StreamingConsumer) twitter4jConsumer).doStop();
         }
     }
 

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/FilterConsumer.java Wed Aug 22 08:45:02 2012
@@ -19,6 +19,7 @@ package org.apache.camel.component.twitt
 import org.apache.camel.component.twitter.TwitterEndpoint;
 
 import twitter4j.FilterQuery;
+import twitter4j.TwitterException;
 
 /**
  * Consumes the filter stream
@@ -27,6 +28,9 @@ public class FilterConsumer extends Stre
 
     public FilterConsumer(TwitterEndpoint te) {
         super(te);
+    }
+
+    protected void startStreaming() {
         twitterStream.filter(createFilter(te));
     }
 

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/SampleConsumer.java Wed Aug 22 08:45:02 2012
@@ -25,6 +25,9 @@ public class SampleConsumer extends Stre
 
     public SampleConsumer(TwitterEndpoint te) {
         super(te);
+    }
+
+    protected void startStreaming() {
         twitterStream.sample();
     }
 }

Modified: camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java?rev=1375946&r1=1375945&r2=1375946&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java (original)
+++ camel/branches/camel-2.10.x/components/camel-twitter/src/main/java/org/apache/camel/component/twitter/consumer/streaming/StreamingConsumer.java Wed Aug 22 08:45:02 2012
@@ -33,7 +33,7 @@ import twitter4j.TwitterStream;
 /**
  * Super class providing consuming capabilities for the streaming API.
  */
-public class StreamingConsumer extends Twitter4JConsumer implements StatusListener {
+public abstract class StreamingConsumer extends Twitter4JConsumer implements StatusListener {
     protected final TwitterStream twitterStream;
     private final List<Status> receivedStatuses = new ArrayList<Status>();
     private volatile boolean clear;
@@ -41,7 +41,7 @@ public class StreamingConsumer extends T
 
     public StreamingConsumer(TwitterEndpoint te) {
         super(te);
-        twitterStream = te.getProperties().getTwitterStream();
+        twitterStream = te.getProperties().createTwitterStream();
         twitterStream.addListener(this);
     }
 
@@ -95,4 +95,14 @@ public class StreamingConsumer extends T
         this.tweeterStatusListener = null;
     }
 
+    public void doStart() {
+        startStreaming();
+    }
+
+    public void doStop() {
+        twitterStream.shutdown();
+    }
+
+    protected abstract void startStreaming();
+
 }