You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@asterixdb.apache.org by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org> on 2016/10/12 21:56:03 UTC

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Xikui Wang has uploaded a new change for review.

  https://asterix-gerrit.ics.uci.edu/1272

Change subject: Add user-stream for Twitter Adaptor
......................................................................

Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, use the following DDL:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // rest is same as push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 278 insertions(+), 93 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1272/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..c296bc6 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -33,27 +35,43 @@
 import twitter4j.StatusListener;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 
 public class TwitterPushRecordReader implements IRecordReader<String> {
     private LinkedBlockingQueue<String> inputQ;
     private TwitterStream twitterStream;
     private GenericRecord<String> record;
+    private StatusListener tweetListener;
     private boolean closed = false;
 
-    public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
-        record = new GenericRecord<>();
-        inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
-        this.twitterStream.addListener(new TweetListener(inputQ));
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener,
+            FilterQuery query) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
         this.twitterStream.filter(query);
     }
 
-    public TwitterPushRecordReader(TwitterStream twitterStream) {
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.sample();
+    }
+
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.user();
+    }
+
+    private void init(TwitterStream twitterStream) {
         record = new GenericRecord<>();
         inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//
-        this.twitterStream.addListener(new TweetListener(inputQ));
-        twitterStream.sample();
+        this.twitterStream = twitterStream;
     }
 
     @Override
@@ -89,46 +107,6 @@
             return false;
         }
         return true;
-    }
-
-    private class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<String> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<String> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
-            inputQ.add(jsonTweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-            // do nothing
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-            // do nothing
-        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..c2af93f 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -73,39 +73,36 @@
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
             throw new AsterixException(builder.toString());
         }
-        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
-            pull = true;
-            if (configuration.get(SearchAPIConstants.QUERY) == null) {
-                throw new AsterixException(
-                        "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
-            }
-            String interval = configuration.get(SearchAPIConstants.INTERVAL);
-            if (interval != null) {
-                try {
-                    Integer.parseInt(interval);
-                } catch (NumberFormatException nfe) {
-                    throw new IllegalArgumentException(
-                            "parameter " + SearchAPIConstants.INTERVAL + " is defined incorrectly, expecting a number");
-                }
-            } else {
-                configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
-                            + DEFAULT_INTERVAL + ")");
-                }
-            }
-        } else {
-            pull = false;
-        }
-    }
 
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                if (configuration.get(SearchAPIConstants.QUERY) == null) {
+                    throw new AsterixException("parameter " + SearchAPIConstants.QUERY
+                            + " not specified as part of adaptor configuration");
+                }
+                String interval = configuration.get(SearchAPIConstants.INTERVAL);
+                if (interval != null) {
+                    try {
+                        Integer.parseInt(interval);
+                    } catch (NumberFormatException nfe) {
+                        throw new IllegalArgumentException("parameter " + SearchAPIConstants.INTERVAL
+                                + " is defined incorrectly, expecting a number");
+                    }
+                } else {
+                    configuration.put(SearchAPIConstants.INTERVAL, DEFAULT_INTERVAL);
+                    if (LOGGER.isLoggable(Level.WARNING)) {
+                        LOGGER.warning(" Parameter " + SearchAPIConstants.INTERVAL + " not defined, using default ("
+                                + DEFAULT_INTERVAL + ")");
+                    }
+                }
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                // do nothing.
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                // do nothing.
+                break;
         }
-        return false;
     }
 
     @Override
@@ -116,20 +113,34 @@
     @Override
     public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        if (pull) {
-            return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
-                    configuration.get(SearchAPIConstants.QUERY),
-                    Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
-        } else {
-            FilterQuery query;
-            try {
-                query = TwitterUtil.getFilterQuery(configuration);
-                return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
-                        : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
-            } catch (AsterixException e) {
-                throw new HyracksDataException(e);
-            }
+        IRecordReader<? extends String> recordReader;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                        configuration.get(SearchAPIConstants.QUERY),
+                        Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                FilterQuery query;
+                try {
+                    query = TwitterUtil.getFilterQuery(configuration);
+                    recordReader = (query == null)
+                            ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener())
+                            : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener(), query);
+                } catch (AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                        TwitterUtil.getUserTweetsListener());
+                break;
+            default:
+                throw new HyracksDataException("No Record reader found!");
         }
+        return recordReader;
     }
 
     @Override
@@ -147,4 +158,5 @@
         }
         return true;
     }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index ad11171..d3faf84 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -101,6 +101,7 @@
             case ExternalDataConstants.READER_TWITTER_PUSH:
             case ExternalDataConstants.READER_PUSH_TWITTER:
             case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
                 return new TwitterRecordReaderFactory();
             case ExternalDataConstants.ALIAS_TWITTER_FIREHOSE_ADAPTER:
                 return new StreamRecordReaderFactory(new TwitterFirehoseStreamFactory());
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index c5167c1..fa337fa 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -128,6 +128,7 @@
     public static final String READER_PUSH_TWITTER = "push_twitter";
     public static final String READER_TWITTER_PULL = "twitter_pull";
     public static final String READER_PULL_TWITTER = "pull_twitter";
+    public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index 70d31c0..dedba40 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -19,17 +19,27 @@
 package org.apache.asterix.external.util;
 
 import org.apache.asterix.common.exceptions.AsterixException;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.InputStream;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -275,4 +285,187 @@
         public static final String INTERVAL = "interval";
     }
 
+    public static UserTweetsListener getUserTweetsListener() {
+        return new UserTweetsListener();
+    }
+
+    public static TweetListener getTweetListener() {
+        return new TweetListener();
+    }
+
+    public static class UserTweetsListener implements UserStreamListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onDeletionNotice(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onFriendList(long[] longs) {
+            //do nothing
+        }
+
+        @Override
+        public void onFavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onFollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onDirectMessage(DirectMessage directMessage) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberAddition(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberDeletion(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListSubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUnsubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListCreation(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUpdate(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListDeletion(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserProfileUpdate(User user) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserSuspension(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserDeletion(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onBlock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnblock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStatus(Status status) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(status);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+            //do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int i) {
+            //do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning stallWarning) {
+            //do nothing
+        }
+
+        @Override
+        public void onException(Exception e) {
+            //do nothing
+        }
+    }
+
+    public static class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+            // do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+            // do nothing
+        }
+    }
+
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 3: Code-Review+2

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

Also, the commit message doesn't show how to pass the user we want to listen to.

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 1:

Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/896/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

(1 comment)

https://asterix-gerrit.ics.uci.edu/#/c/1272/2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
File asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java:

Line 68:     }
Can we extract the common code from these three constructors and move it to init?


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

(1 comment)

The user whose access token is used in the feed creation is the one that we are monitoring.

https://asterix-gerrit.ics.uci.edu/#/c/1272/2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
File asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java:

Line 68:     }
> Can we extract the common code from these three constructors and move it to
Yeah. Basically the only part that could be shrunk is the tweetListener.setInputQ(inputQ) call. But the tweetListener(s) are not the same in them; should I cast all to be tweetListener?


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 3: Integration-Tests+1

Integration Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/989/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 3:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/3098/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

(3 comments)

Oh, so this is something that is already sent in the "push feed"?

https://asterix-gerrit.ics.uci.edu/#/c/1272/2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
File asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java:

Line 68:     }
> Yeah. Basically the only part that could be shrunk is the         tweetList
Okay, let's just leave as it is then.


https://asterix-gerrit.ics.uci.edu/#/c/1272/2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
File asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java:

Line 119:                             ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
> BLOCKER SonarQube violation:
I'm guessing this doesn't get closed until the feed is dropped?


Line 121:                             : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
> BLOCKER SonarQube violation:
I'm guessing this doesn't get closed until the feed is dropped?


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 1: Integration-Tests+1

Integration Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/896/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/1272

to look at the new patch set (#2).

Change subject: Add user-stream for Twitter Adaptor
......................................................................

Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, use the following DDL:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // rest is same as push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 251 insertions(+), 76 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1272/2
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2: Integration-Tests+1

Integration Tests Successful

https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/904/ : SUCCESS

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 1:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/2985/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 3:

Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/989/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

+2

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

Integration Tests Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-integration-tests/904/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has submitted this change and it was merged.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, use the following DDL:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // rest is same as push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Reviewed-on: https://asterix-gerrit.ics.uci.edu/1272
Reviewed-by: Steven Jacobs <sj...@ucr.edu>
Tested-by: Jenkins <je...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <je...@fulliautomatix.ics.uci.edu>
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 251 insertions(+), 76 deletions(-)

Approvals:
  Steven Jacobs: Looks good to me, approved
  Jenkins: Verified; Verified

Objections:
  Jenkins: Violations found



diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
index 9ead8a9..ffffbd7 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
@@ -26,6 +26,8 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.FeedLogManager;
+import org.apache.asterix.external.util.TwitterUtil;
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
 import twitter4j.StallWarning;
 import twitter4j.Status;
@@ -33,6 +35,9 @@
 import twitter4j.StatusListener;
 import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 
 public class TwitterPushRecordReader implements IRecordReader<String> {
     private LinkedBlockingQueue<String> inputQ;
@@ -40,20 +45,32 @@
     private GenericRecord<String> record;
     private boolean closed = false;
 
-    public TwitterPushRecordReader(TwitterStream twitterStream, FilterQuery query) {
-        record = new GenericRecord<>();
-        inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//TwitterUtil.getTwitterStream(configuration);
-        this.twitterStream.addListener(new TweetListener(inputQ));
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener,
+            FilterQuery query) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
         this.twitterStream.filter(query);
     }
 
-    public TwitterPushRecordReader(TwitterStream twitterStream) {
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.TweetListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.sample();
+    }
+
+    public TwitterPushRecordReader(TwitterStream twitterStream, TwitterUtil.UserTweetsListener tweetListener) {
+        init(twitterStream);
+        tweetListener.setInputQ(inputQ);
+        this.twitterStream.addListener(tweetListener);
+        twitterStream.user();
+    }
+
+    private void init(TwitterStream twitterStream) {
         record = new GenericRecord<>();
         inputQ = new LinkedBlockingQueue<>();
-        this.twitterStream = twitterStream;//
-        this.twitterStream.addListener(new TweetListener(inputQ));
-        twitterStream.sample();
+        this.twitterStream = twitterStream;
     }
 
     @Override
@@ -89,46 +106,6 @@
             return false;
         }
         return true;
-    }
-
-    private class TweetListener implements StatusListener {
-
-        private LinkedBlockingQueue<String> inputQ;
-
-        public TweetListener(LinkedBlockingQueue<String> inputQ) {
-            this.inputQ = inputQ;
-        }
-
-        @Override
-        public void onStatus(Status tweet) {
-            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
-            inputQ.add(jsonTweet);
-        }
-
-        @Override
-        public void onException(Exception arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onDeletionNotice(StatusDeletionNotice arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onScrubGeo(long arg0, long arg1) {
-            // do nothing
-        }
-
-        @Override
-        public void onStallWarning(StallWarning arg0) {
-            // do nothing
-        }
-
-        @Override
-        public void onTrackLimitationNotice(int arg0) {
-            // do nothing
-        }
     }
 
     @Override
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
index 73d1b39..570155c 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
@@ -46,7 +46,6 @@
     private static final int INTAKE_CARDINALITY = 1; // degree of parallelism at intake stage
 
     private Map<String, String> configuration;
-    private boolean pull;
     private transient AlgebricksAbsolutePartitionConstraint clusterLocations;
 
     @Override
@@ -73,8 +72,8 @@
             builder.append(AuthenticationConstants.OAUTH_ACCESS_TOKEN_SECRET);
             throw new AsterixException(builder.toString());
         }
-        if (TwitterRecordReaderFactory.isTwitterPull(configuration)) {
-            pull = true;
+
+        if (configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.READER_PULL_TWITTER)) {
             if (configuration.get(SearchAPIConstants.QUERY) == null) {
                 throw new AsterixException(
                         "parameter " + SearchAPIConstants.QUERY + " not specified as part of adaptor configuration");
@@ -94,18 +93,7 @@
                             + DEFAULT_INTERVAL + ")");
                 }
             }
-        } else {
-            pull = false;
         }
-    }
-
-    public static boolean isTwitterPull(Map<String, String> configuration) {
-        String reader = configuration.get(ExternalDataConstants.KEY_READER);
-        if (reader.equals(ExternalDataConstants.READER_TWITTER_PULL)
-                || reader.equals(ExternalDataConstants.READER_PULL_TWITTER)) {
-            return true;
-        }
-        return false;
     }
 
     @Override
@@ -116,20 +104,34 @@
     @Override
     public IRecordReader<? extends String> createRecordReader(IHyracksTaskContext ctx, int partition)
             throws HyracksDataException {
-        if (pull) {
-            return new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
-                    configuration.get(SearchAPIConstants.QUERY),
-                    Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
-        } else {
-            FilterQuery query;
-            try {
-                query = TwitterUtil.getFilterQuery(configuration);
-                return (query == null) ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration))
-                        : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration), query);
-            } catch (AsterixException e) {
-                throw new HyracksDataException(e);
-            }
+        IRecordReader<? extends String> recordReader;
+        switch (configuration.get(ExternalDataConstants.KEY_READER)) {
+            case ExternalDataConstants.READER_PULL_TWITTER:
+                recordReader = new TwitterPullRecordReader(TwitterUtil.getTwitterService(configuration),
+                        configuration.get(SearchAPIConstants.QUERY),
+                        Integer.parseInt(configuration.get(SearchAPIConstants.INTERVAL)));
+                break;
+            case ExternalDataConstants.READER_PUSH_TWITTER:
+                FilterQuery query;
+                try {
+                    query = TwitterUtil.getFilterQuery(configuration);
+                    recordReader = (query == null)
+                            ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener())
+                            : new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                                    TwitterUtil.getTweetListener(), query);
+                } catch (AsterixException e) {
+                    throw new HyracksDataException(e);
+                }
+                break;
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
+                recordReader = new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
+                        TwitterUtil.getUserTweetsListener());
+                break;
+            default:
+                throw new HyracksDataException("No Record reader found!");
         }
+        return recordReader;
     }
 
     @Override
@@ -147,4 +149,5 @@
         }
         return true;
     }
+
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 7ab6430..73a5302 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -97,6 +97,7 @@
             case ExternalDataConstants.READER_TWITTER_PUSH:
             case ExternalDataConstants.READER_PUSH_TWITTER:
             case ExternalDataConstants.READER_PULL_TWITTER:
+            case ExternalDataConstants.READER_USER_STREAM_TWITTER:
                 return new TwitterRecordReaderFactory();
             case ExternalDataConstants.ALIAS_SOCKET_ADAPTER:
             case ExternalDataConstants.SOCKET:
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index a270e88..881c498 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -127,6 +127,7 @@
     public static final String READER_PUSH_TWITTER = "push_twitter";
     public static final String READER_TWITTER_PULL = "twitter_pull";
     public static final String READER_PULL_TWITTER = "pull_twitter";
+    public static final String READER_USER_STREAM_TWITTER = "twitter_user_stream";
 
     public static final String CLUSTER_LOCATIONS = "cluster-locations";
     public static final String SCHEDULER = "hdfs-scheduler";
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
index d8f375b..bd8e52d 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
@@ -22,6 +22,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
@@ -29,11 +30,20 @@
 
 import org.apache.asterix.common.exceptions.AsterixException;
 
+import twitter4j.DirectMessage;
 import twitter4j.FilterQuery;
+import twitter4j.StallWarning;
+import twitter4j.Status;
+import twitter4j.StatusDeletionNotice;
+import twitter4j.StatusListener;
 import twitter4j.Twitter;
 import twitter4j.TwitterFactory;
+import twitter4j.TwitterObjectFactory;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.User;
+import twitter4j.UserList;
+import twitter4j.UserStreamListener;
 import twitter4j.conf.ConfigurationBuilder;
 
 public class TwitterUtil {
@@ -291,4 +301,187 @@
         public static final String INTERVAL = "interval";
     }
 
+    public static UserTweetsListener getUserTweetsListener() {
+        return new UserTweetsListener();
+    }
+
+    public static TweetListener getTweetListener() {
+        return new TweetListener();
+    }
+
+    public static class UserTweetsListener implements UserStreamListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onDeletionNotice(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onFriendList(long[] longs) {
+            //do nothing
+        }
+
+        @Override
+        public void onFavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfavorite(User user, User user1, Status status) {
+            //do nothing
+        }
+
+        @Override
+        public void onFollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnfollow(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onDirectMessage(DirectMessage directMessage) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberAddition(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListMemberDeletion(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListSubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUnsubscription(User user, User user1, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListCreation(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListUpdate(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserListDeletion(User user, UserList userList) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserProfileUpdate(User user) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserSuspension(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onUserDeletion(long l) {
+            //do nothing
+        }
+
+        @Override
+        public void onBlock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onUnblock(User user, User user1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStatus(Status status) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(status);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
+            //do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int i) {
+            //do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long l, long l1) {
+            //do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning stallWarning) {
+            //do nothing
+        }
+
+        @Override
+        public void onException(Exception e) {
+            //do nothing
+        }
+    }
+
+    public static class TweetListener implements StatusListener {
+
+        private LinkedBlockingQueue<String> inputQ;
+
+        public void setInputQ(LinkedBlockingQueue<String> inputQ) {
+            this.inputQ = inputQ;
+        }
+
+        @Override
+        public void onStatus(Status tweet) {
+            String jsonTweet = TwitterObjectFactory.getRawJSON(tweet);
+            inputQ.add(jsonTweet);
+        }
+
+        @Override
+        public void onException(Exception arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onDeletionNotice(StatusDeletionNotice arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onScrubGeo(long arg0, long arg1) {
+            // do nothing
+        }
+
+        @Override
+        public void onStallWarning(StallWarning arg0) {
+            // do nothing
+        }
+
+        @Override
+        public void onTrackLimitationNotice(int arg0) {
+            // do nothing
+        }
+    }
+
 }

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 4
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Steven Jacobs (Code Review)" <do...@asterixdb.incubator.apache.org>.
Steven Jacobs has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2: Code-Review+2

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: No

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Xikui Wang has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

(1 comment)

Yes. The user of the Twitter feed needs to apply for Twitter API access using a specific Twitter account. That is the account they are monitoring. :)

https://asterix-gerrit.ics.uci.edu/#/c/1272/2/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
File asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java:

Line 119:                             ? new TwitterPushRecordReader(TwitterUtil.getTwitterStream(configuration),
> I'm guessing this doesn't get closed until the feed is dropped?
Yes. Until the feed is stopped or dropped.


-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>
Gerrit-HasComments: Yes

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Xikui Wang (Code Review)" <do...@asterixdb.incubator.apache.org>.
Hello Steven Jacobs, Jenkins,

I'd like you to reexamine a change.  Please visit

    https://asterix-gerrit.ics.uci.edu/1272

to look at the new patch set (#3).

Change subject: Add user-stream for Twitter Adaptor
......................................................................

Add user-stream for Twitter Adaptor

1. Add user-stream option for Twitter Adaptor
2. Refactor part of TwitterRecordReaderFactory
3. To create a user-stream feed, use the following DDL:
  create feed TwitterFeed using twitter_user_stream(
      ("format"="twitter-status"),
      ("type-name"="Tweet"),
      ...
   // rest is same as push feed

Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
---
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterPushRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/twitter/TwitterRecordReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/TwitterUtil.java
5 files changed, 251 insertions(+), 76 deletions(-)


  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/72/1272/3
-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: newpatchset
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 3
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Steven Jacobs <sj...@ucr.edu>
Gerrit-Reviewer: Till Westmann <ti...@apache.org>
Gerrit-Reviewer: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: abdullah alamoudi <ba...@gmail.com>

Change in asterixdb[master]: Add user-stream for Twitter Adaptor

Posted by "Jenkins (Code Review)" <do...@asterixdb.incubator.apache.org>.
Jenkins has posted comments on this change.

Change subject: Add user-stream for Twitter Adaptor
......................................................................


Patch Set 2:

Build Started https://asterix-jenkins.ics.uci.edu/job/asterix-gerrit-notopic/2994/

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/1272
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: I99cdd4cb667306d378317616f9811dfce3e6d838
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Xikui Wang <xk...@gmail.com>
Gerrit-Reviewer: Jenkins <je...@fulliautomatix.ics.uci.edu>
Gerrit-HasComments: No