You are viewing a plain-text version of this content. The link to the canonical version ("here") is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/11 18:29:38 UTC

[GitHub] merlimat closed pull request #3298: Added filtering to Twitter Firehose Connector

merlimat closed pull request #3298: Added filtering to Twitter Firehose Connector
URL: https://github.com/apache/pulsar/pull/3298
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/twitter/pom.xml b/pulsar-io/twitter/pom.xml
index cfa3cf6dc1..48ea6526c2 100644
--- a/pulsar-io/twitter/pom.xml
+++ b/pulsar-io/twitter/pom.xml
@@ -53,6 +53,18 @@
       <artifactId>hbc-core</artifactId>
       <version>${hbc-core.version}</version>
     </dependency>
+    
+    <dependency>
+      <groupId>commons-collections</groupId>
+      <artifactId>commons-collections</artifactId>
+      <version>${commons.collections.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.4</version>
+    </dependency>
 
   </dependencies>
 
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index e1d0545d93..1219e0b19c 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -24,7 +24,7 @@
 import com.twitter.hbc.ClientBuilder;
 import com.twitter.hbc.common.DelimitedStreamReader;
 import com.twitter.hbc.core.Constants;
-import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StatusesFilterEndpoint;
 import com.twitter.hbc.core.endpoint.StreamingEndpoint;
 import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
 import com.twitter.hbc.httpclient.BasicClient;
@@ -33,23 +33,24 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Record;
+
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.twitter.data.TweetData;
+import org.apache.pulsar.io.twitter.data.TwitterRecord;
+import org.apache.pulsar.io.twitter.endpoint.SampleStatusesEndpoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Simple Push based Twitter FireHose Source
+ * Simple Push based Twitter FireHose Source.
  */
 @Connector(
     name = "twitter",
@@ -62,22 +63,16 @@
 
     private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
 
-    // ----- Fields set by the constructor
-
     // ----- Runtime fields
     private Object waitObject;
 
-    private final ObjectMapper mapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    private final ObjectMapper mapper = new ObjectMapper().configure(
+            DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException {
         TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
-        if (hoseConfig.getConsumerKey() == null
-                || hoseConfig.getConsumerSecret() == null
-                || hoseConfig.getToken() == null
-                || hoseConfig.getTokenSecret() == null) {
-            throw new IllegalArgumentException("Required property not set.");
-        }
+        hoseConfig.validate();
         waitObject = new Object();
         startThread(hoseConfig);
     }
@@ -87,40 +82,13 @@ public void close() throws Exception {
         stopThread();
     }
 
-    // ------ Custom endpoints
-
-    /**
-     * Implementing this interface allows users of this source to set a custom endpoint.
-     */
-    public interface EndpointInitializer {
-        StreamingEndpoint createEndpoint();
-    }
-
-    /**
-     * Required for Twitter Client
-     */
-    private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
-        @Override
-        public StreamingEndpoint createEndpoint() {
-            // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets)
-            StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
-            endpoint.stallWarnings(false);
-            endpoint.delimited(false);
-            return endpoint;
-        }
-    }
-
     private void startThread(TwitterFireHoseConfig config) {
-        Authentication auth = new OAuth1(config.getConsumerKey(),
-                config.getConsumerSecret(),
-                config.getToken(),
-                config.getTokenSecret());
 
         BasicClient client = new ClientBuilder()
                 .name(config.getClientName())
                 .hosts(config.getClientHosts())
-                .endpoint(new SampleStatusesEndpoint().createEndpoint())
-                .authentication(auth)
+                .endpoint(getEndpoint(config))
+                .authentication(getAuthentication(config))
                 .processor(new HosebirdMessageProcessor() {
                     public DelimitedStreamReader reader;
 
@@ -176,42 +144,31 @@ private void stopThread() {
         }
     }
 
-    static private class TwitterRecord implements Record<TweetData> {
-        private final TweetData tweet;
-        private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
-        private final boolean guestimateTweetTime;
+    private Authentication getAuthentication(TwitterFireHoseConfig config) {
+        return new OAuth1(config.getConsumerKey(),
+                config.getConsumerSecret(),
+                config.getToken(),
+                config.getTokenSecret());
+    }
 
-        public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
-            this.tweet = tweet;
-            this.guestimateTweetTime = guestimateTweetTime;
-        }
+    private StreamingEndpoint getEndpoint(TwitterFireHoseConfig config) {
+        List<Long> followings = config.getFollowings();
+        List<String> terms = config.getTrackTerms();
 
-        @Override
-        public Optional<String> getKey() {
-            // TODO: Could use user or tweet ID as key here
-            return Optional.empty();
-        }
+        if (CollectionUtils.isEmpty(followings) && CollectionUtils.isEmpty(terms)) {
+            return new SampleStatusesEndpoint().createEndpoint();
+        } else {
+            StatusesFilterEndpoint hosebirdEndpoint = new StatusesFilterEndpoint();
 
-        @Override
-        public Optional<Long> getEventTime() {
-            try {
-                if (tweet.getCreatedAt() != null) {
-                    Date d = dateFormat.parse(tweet.getCreatedAt());
-                    return Optional.of(d.toInstant().toEpochMilli());
-                } else if (guestimateTweetTime) {
-                    return Optional.of(System.currentTimeMillis());
-                } else {
-                    return Optional.empty();
-                }
-            } catch (Exception e) {
-                return Optional.empty();
+            if (CollectionUtils.isNotEmpty(followings)) {
+               hosebirdEndpoint.followings(followings);
             }
-        }
 
-        @Override
-        public TweetData getValue() {
-            return tweet;
+            if (CollectionUtils.isNotEmpty(terms)) {
+               hosebirdEndpoint.trackTerms(terms);
+            }
+
+            return hosebirdEndpoint;
         }
     }
-
 }
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
index c9bf4dfd44..3d361afeec 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfig.java
@@ -21,16 +21,31 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.collect.Lists;
+import com.twitter.hbc.core.Constants;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
-import com.twitter.hbc.core.Constants;
-import lombok.*;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import lombok.experimental.Accessors;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
+/**
+ * Configuration object for the Twitter Firehose Connector.
+ */
 @Data
 @Setter
 @Getter
@@ -44,34 +59,42 @@
     @FieldDoc(
         required = true,
         defaultValue = "",
-        help = "Your twitter app consumer key. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+        help = "Your twitter app consumer key. See "
+                + "https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
     )
     private String consumerKey;
+
     @FieldDoc(
         required = true,
         defaultValue = "",
-        help = "Your twitter app consumer secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+        help = "Your twitter app consumer secret. "
+                + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
     )
     private String consumerSecret;
+
     @FieldDoc(
         required = true,
         defaultValue = "",
-        help = "Your twitter app token. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+        help = "Your twitter app token. "
+                + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
     )
     private String token;
+
     @FieldDoc(
         required = true,
         defaultValue = "",
-        help = "Your twitter app token secret. See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
+        help = "Your twitter app token secret. "
+                + "See https://developer.twitter.com/en/docs/basics/authentication/guides/access-tokens for details"
     )
     private String tokenSecret;
+
     // Most firehose events have null createdAt time. If this parameter is set to true
     // then we estimate the createdTime of each firehose event to be current time.
     @FieldDoc(
         required = false,
         defaultValue = "false",
-        help = "Most firehose events have null createdAt time."
-            + " If this parameter is set to true, the connector estimates the createdTime of each firehose event to be current time."
+        help = "Most firehose events have null createdAt time. If this parameter is set to true, "
+                + "the connector estimates the createdTime of each firehose event to be current time."
     )
     private Boolean guestimateTweetTime = false;
 
@@ -83,12 +106,14 @@
         help = "The Twitter Firehose Client name"
     )
     private String clientName = "pulsario-twitter-source";
+
     @FieldDoc(
         required = false,
         defaultValue = Constants.STREAM_HOST,
         help = "The Twitter Firehose stream hosts that the connector connects to"
     )
     private String clientHosts = Constants.STREAM_HOST;
+
     @FieldDoc(
         required = false,
         defaultValue = "50000",
@@ -96,6 +121,20 @@
     )
     private int clientBufferSize = 50000;
 
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "A comma separated list of user IDs, indicating the users to return statuses for in the stream."
+    )
+    private String followings;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "",
+        help = "Keywords to track. Phrases of keywords are specified by a comma-separated list."
+    )
+    private String terms;
+
     public static TwitterFireHoseConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class);
@@ -105,4 +144,37 @@ public static TwitterFireHoseConfig load(Map<String, Object> map) throws IOExcep
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), TwitterFireHoseConfig.class);
     }
+
+    /**
+     * Checks that all four OAuth credentials are configured.
+     *
+     * <p>A credential counts as missing when it is {@code null}, empty or whitespace-only. All four
+     * are declared {@code required = true} with an empty {@code defaultValue}, so an empty string is
+     * treated the same as an absent value.
+     *
+     * @throws IllegalArgumentException if consumerKey, consumerSecret, token or tokenSecret is missing
+     */
+    public void validate() throws IllegalArgumentException {
+        if (StringUtils.isAnyBlank(getConsumerKey(), getConsumerSecret(),
+                getToken(), getTokenSecret())) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+    }
+
+    public List<Long> getFollowings() {
+        if (StringUtils.isBlank(followings)) {
+            return Collections.emptyList();
+        }
+
+        List<Long> result = new ArrayList<Long> ();
+
+        for (String s: StringUtils.split(followings, ",")) {
+            try {
+                result.add(Long.parseLong(StringUtils.trim(s)));
+            } catch (NumberFormatException nfEx) {
+                // Ignore these
+            }
+        }
+
+        return CollectionUtils.isEmpty(result) ? Collections.emptyList() : result;
+    }
+
+    public List<String> getTrackTerms() {
+        if (StringUtils.isBlank(terms)) {
+            return Collections.emptyList();
+        }
+
+        return Lists.newArrayList(StringUtils.split(terms, ","));
+    }
 }
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
similarity index 91%
rename from pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
rename to pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
index e5cb79c9a0..c06f3798e2 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TweetData.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TweetData.java
@@ -17,11 +17,13 @@
  * under the License.
  */
 
-package org.apache.pulsar.io.twitter;
+package org.apache.pulsar.io.twitter.data;
 
-import java.util.List;
 import lombok.Data;
 
+/**
+ * POJO for Tweet object.
+ */
 @Data
 public class TweetData {
     private String createdAt;
@@ -44,7 +46,9 @@
     private String timestampMs;
     private Delete delete;
 
-
+    /**
+     * POJO for Twitter User object.
+     */
     @Data
     public static class User {
         private Long id;
@@ -54,7 +58,7 @@
         private String location;
         private String description;
         private String translatorType;
-        private Boolean _protected;
+        private Boolean protectedUser;
         private Boolean verified;
         private Long followersCount;
         private Long friendsCount;
@@ -81,6 +85,10 @@
         private Boolean defaultProfile;
         private Boolean defaultProfileImage;
     }
+
+    /**
+     * POJO for Re-Tweet object.
+     */
     @Data
     public static class RetweetedStatus {
         private String createdAt;
@@ -100,6 +108,10 @@
         private String filterLevel;
         private String lang;
     }
+
+    /**
+     * POJO for Tweet Status object.
+     */
     @Data
     public static class Status {
         private Long id;
@@ -107,6 +119,10 @@
         private Long userId;
         private String userIdStr;
     }
+
+    /**
+     * POJO for Tweet Delete object.
+     */
     @Data
     public static class Delete {
         private Status status;
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
new file mode 100644
index 0000000000..3a9f968ac5
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/TwitterRecord.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter.data;
+
+import java.text.SimpleDateFormat;
+import java.time.OffsetDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeParseException;
+import java.util.Date;
+import java.util.Locale;
+import java.util.Optional;
+
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * Twitter Record object.
+ */
+public class TwitterRecord implements Record<TweetData> {
+    private final TweetData tweet;
+    private static SimpleDateFormat dateFormat = new SimpleDateFormat("EEE MMM d HH:mm:ss Z yyyy");
+    private final boolean guestimateTweetTime;
+
+    public TwitterRecord(TweetData tweet, boolean guestimateTweetTime) {
+        this.tweet = tweet;
+        this.guestimateTweetTime = guestimateTweetTime;
+    }
+
+    @Override
+    public Optional<String> getKey() {
+        // TODO: Could use user or tweet ID as key here
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> getEventTime() {
+        try {
+            if (tweet.getCreatedAt() != null) {
+                Date d = dateFormat.parse(tweet.getCreatedAt());
+                return Optional.of(d.toInstant().toEpochMilli());
+            } else if (guestimateTweetTime) {
+                return Optional.of(System.currentTimeMillis());
+            } else {
+                return Optional.empty();
+            }
+        } catch (Exception e) {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public TweetData getValue() {
+        return tweet;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
new file mode 100644
index 0000000000..d7f96490ef
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/data/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter.data;
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
new file mode 100644
index 0000000000..85564fd59b
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/SampleStatusesEndpoint.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter.endpoint;
+
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+
+import java.io.Serializable;
+
+/**
+ * Factory for the Hosebird sample-stream endpoint, used when no follow/track filter is configured.
+ *
+ * <p>The endpoint it builds has stall warnings and delimited output both switched off.
+ */
+public class SampleStatusesEndpoint implements Serializable {
+    /** Explicit serialization version for this {@link Serializable} factory. */
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates a new, independently configured sample-stream endpoint.
+     *
+     * @return a {@link StatusesSampleEndpoint} with {@code stallWarnings} and {@code delimited} disabled
+     */
+    public StreamingEndpoint createEndpoint() {
+        final StatusesSampleEndpoint sample = new StatusesSampleEndpoint();
+        sample.stallWarnings(false);
+        sample.delimited(false);
+        return sample;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
new file mode 100644
index 0000000000..6e99d7714b
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/endpoint/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter.endpoint;
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
new file mode 100644
index 0000000000..1690a64ebf
--- /dev/null
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter;
\ No newline at end of file
diff --git a/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java
new file mode 100644
index 0000000000..689d81d18e
--- /dev/null
+++ b/pulsar-io/twitter/src/test/java/org/apache/pulsar/io/twitter/TwitterFireHoseConfigTests.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.twitter;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for {@link TwitterFireHoseConfig}. They cover loading from YAML and from a map,
+ * credential validation, and parsing of the comma-separated {@code followings} and {@code terms}
+ * settings.
+ */
+public class TwitterFireHoseConfigTests {
+
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sourceConfig.yaml");
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(yamlFile.getAbsolutePath());
+        assertNotNull(config);
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(credentials());
+        assertNotNull(config);
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(credentials());
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+            expectedExceptionsMessageRegExp = "Required property not set.")
+    public final void missingConsumerKeyValidateTest() throws IOException {
+        // An empty map leaves every credential, including consumerKey, unset.
+        Map<String, Object> map = new HashMap<>();
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(map);
+        config.validate();
+    }
+
+    @Test
+    public final void getFollowingsTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("followings", "123, 456, 789");
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(map);
+
+        List<Long> followings = config.getFollowings();
+        assertNotNull(followings);
+        assertEquals(followings.size(), 3);
+        assertTrue(followings.contains(123L));
+        assertTrue(followings.contains(456L));
+        assertTrue(followings.contains(789L));
+    }
+
+    @Test
+    public final void getTermsTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("terms", "mickey, donald, goofy");
+        TwitterFireHoseConfig config = TwitterFireHoseConfig.load(map);
+
+        List<String> terms = config.getTrackTerms();
+        assertNotNull(terms);
+        assertEquals(terms.size(), 3);
+        assertTrue(terms.contains("mickey"));
+    }
+
+    /** Returns a config map with all four required OAuth credentials set to dummy values. */
+    private static Map<String, Object> credentials() {
+        Map<String, Object> map = new HashMap<>();
+        map.put("consumerKey", "xxx");
+        map.put("consumerSecret", "xxx");
+        map.put("token", "xxx");
+        map.put("tokenSecret", "xxx");
+        return map;
+    }
+
+    /**
+     * Resolves a classpath resource to a {@link File}.
+     *
+     * <p>Goes through {@link java.net.URL#toURI()} rather than {@link java.net.URL#getFile()}.
+     * {@code getFile()} returns the still percent-encoded path, which breaks when the build
+     * directory contains spaces or other escaped characters.
+     *
+     * @throws IllegalStateException if the resource is missing or its URL is not a valid URI
+     */
+    private File getFile(String name) {
+        java.net.URL resource = getClass().getClassLoader().getResource(name);
+        if (resource == null) {
+            throw new IllegalStateException("Test resource not found on classpath: " + name);
+        }
+        try {
+            return new File(resource.toURI());
+        } catch (java.net.URISyntaxException e) {
+            throw new IllegalStateException("Invalid URI for test resource: " + name, e);
+        }
+    }
+}
diff --git a/pulsar-io/twitter/src/test/resources/sourceConfig.yaml b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
new file mode 100644
index 0000000000..9ac5708e37
--- /dev/null
+++ b/pulsar-io/twitter/src/test/resources/sourceConfig.yaml
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"consumerKey": "",
+"consumerSecret": ""
+}
\ No newline at end of file


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services