You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/04/19 19:49:49 UTC
[2/3] incubator-streams git commit: STREAMS-496: Remove twitter4j
dependency from streams-provider-twitter
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
new file mode 100644
index 0000000..d6e30e4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListRequest;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieve the followers of a single user, identified by id or screen name.
+ *
+ * <p>Each follower returned by {@link Twitter#list(FollowersListRequest)} is
+ * emitted as one {@link Follow} (requested user = followee) onto the
+ * provider's queue. Paging stops when the cursor is exhausted, a page comes
+ * back empty, or the provider's maxItems / maxPages limits are reached.
+ */
+public class TwitterFollowersListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FollowersListRequest request;
+
+  /**
+   * TwitterFollowersListProviderTask constructor.
+   * @param provider TwitterFollowingProvider supplying config and the output queue
+   * @param twitter Twitter client used to issue requests
+   * @param request FollowersListRequest naming the user; its cursor is advanced while paging
+   */
+  public TwitterFollowersListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    // A request must name the user either by id or by screen name.
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFollowersList(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  // Paging state, updated after each page.
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFollowersList(FollowersListRequest request) {
+
+    do {
+
+      FollowersListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      for (User follower : response.getUsers()) {
+
+        // Nothing more may be queued once maxItems is reached.
+        if (item_count >= provider.getConfig().getMaxItems()) {
+          break;
+        }
+
+        // The requested user is the followee; each returned user follows them.
+        Follow follow = new Follow()
+            .withFollowee(
+                new User()
+                    .withId(request.getId())
+                    .withScreenName(request.getScreenName()))
+            .withFollower(follower);
+
+        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+        item_count++;
+      }
+
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  /**
+   * Decide whether another page should be requested.
+   * @param cursor next cursor from the last response; values {@code <= 0} stop paging
+   * @param count number of users on the last page
+   * @param page_count number of the next page to be fetched
+   * @param item_count follows queued so far
+   * @return true while the cursor is positive, the last page was non-empty,
+   *     and neither maxItems nor maxPages has been reached
+   */
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 16b6c03..a2be967 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -26,20 +26,26 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.FollowersIdsRequest;
+import org.apache.streams.twitter.api.FollowingIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.Twitter;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -49,13 +55,18 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Retrieve all follow adjacencies from a list of user ids or names.
*/
-public class TwitterFollowingProvider extends TwitterUserInformationProvider {
+public class TwitterFollowingProvider {
public static final String STREAMS_ID = "TwitterFollowingProvider";
private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
@@ -64,8 +75,19 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
private TwitterFollowingConfiguration config;
+ protected List<String> names = new ArrayList<>();
+ protected List<Long> ids = new ArrayList<>();
+
+ protected Twitter client;
+
+ protected ListeningExecutorService executor;
+
private List<ListenableFuture<Object>> futures = new ArrayList<>();
+ protected final AtomicBoolean running = new AtomicBoolean();
+
+ protected volatile Queue<StreamsDatum> providerQueue;
+
/**
* To use from command line:
*
@@ -139,67 +161,137 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
- super(config);
this.config = config;
}
- @Override
public void prepare(Object configurationObject) {
- super.prepare(config);
+
+ Objects.requireNonNull(config);
+ Objects.requireNonNull(config.getOauth());
+ Objects.requireNonNull(config.getOauth().getConsumerKey());
+ Objects.requireNonNull(config.getOauth().getConsumerSecret());
+ Objects.requireNonNull(config.getOauth().getAccessToken());
+ Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
+ Objects.requireNonNull(config.getInfo());
+ Objects.requireNonNull(config.getThreadsPerProvider());
+
+ try {
+ client = getTwitterClient();
+ } catch (InstantiationException e) {
+ LOGGER.error("InstantiationException", e);
+ }
+
+ Objects.requireNonNull(client);
+
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ Objects.requireNonNull(providerQueue);
+
+ // abstract this out
+ for (String s : config.getInfo()) {
+ if (s != null) {
+ String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+ // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+ // screen name list
+ try {
+ ids.add(Long.parseLong(potentialScreenName));
+ } catch (Exception ex) {
+ names.add(potentialScreenName);
+ }
+ }
+ }
+
Objects.requireNonNull(getConfig().getEndpoint());
+
+ executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
+
Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+
+ if( config.getEndpoint().equals("friends")) {
+ submitFriendsThreads(ids, names);
+ } else if( config.getEndpoint().equals("followers")) {
+ submitFollowersThreads(ids, names);
+ }
}
- @Override
public void startStream() {
Objects.requireNonNull(executor);
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
LOGGER.info("startStream");
running.set(true);
- while (idsBatches.hasNext()) {
- submitFollowingThreads(idsBatches.next());
- }
- while (screenNameBatches.hasNext()) {
- submitFollowingThreads(screenNameBatches.next());
- }
-
executor.shutdown();
}
- protected void submitFollowingThreads(Long[] ids) {
- Twitter client = getTwitterClient();
+ protected void submitFollowersThreads(List<Long> ids, List<String> names) {
+
+ for (Long id : ids) {
+ TwitterFollowersIdsProviderTask providerTask =
+ new TwitterFollowersIdsProviderTask(
+ this,
+ client,
+ (FollowersIdsRequest)new FollowersIdsRequest().withId(id));
+
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
+ }
+
+ for (String name : names) {
+ TwitterFollowersIdsProviderTask providerTask =
+ new TwitterFollowersIdsProviderTask(
+ this,
+ client,
+ (FollowersIdsRequest)new FollowersIdsRequest().withScreenName(name));
- for (int i = 0; i < ids.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
ListenableFuture future = executor.submit(providerTask);
futures.add(future);
- LOGGER.info("submitted {}", ids[i]);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
}
}
- protected void submitFollowingThreads(String[] screenNames) {
- Twitter client = getTwitterClient();
+ protected void submitFriendsThreads(List<Long> ids, List<String> names) {
+
+ for (Long id : ids) {
+ TwitterFriendsIdsProviderTask providerTask =
+ new TwitterFriendsIdsProviderTask(
+ this,
+ client,
+ (FriendsIdsRequest)new FriendsIdsRequest().withId(id));
+
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
+ }
+
+ for (String name : names) {
+ TwitterFriendsIdsProviderTask providerTask =
+ new TwitterFriendsIdsProviderTask(
+ this,
+ client,
+ (FriendsIdsRequest)new FriendsIdsRequest().withScreenName(name));
- for (int i = 0; i < screenNames.length; i++) {
- TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
ListenableFuture future = executor.submit(providerTask);
futures.add(future);
- LOGGER.info("submitted {}", screenNames[i]);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
}
+ }
+ protected Twitter getTwitterClient() throws InstantiationException {
+ return Twitter.getInstance(config);
}
- @Override
public StreamsResultSet readCurrent() {
- LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
-
StreamsResultSet result;
try {
@@ -207,7 +299,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
- LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ LOGGER.debug("readCurrent: {} Documents", result.size());
} finally {
lock.writeLock().unlock();
}
@@ -216,17 +308,45 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
}
- public boolean shouldContinuePulling(List<twitter4j.User> users) {
+ public boolean shouldContinuePulling(List<User> users) {
return (users != null) && (users.size() == config.getPageSize());
}
- @Override
public boolean isRunning() {
- if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
- LOGGER.info("Completed");
+ if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+ LOGGER.info("All Threads Completed");
running.set(false);
LOGGER.info("Exiting");
}
return running.get();
}
+
+ // abstract this out
+ protected Queue<StreamsDatum> constructQueue() {
+ return new LinkedBlockingQueue<>();
+ }
+
+ // abstract this out
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
deleted file mode 100644
index 313416a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.util.ComponentUtils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.PagableResponseList;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterObjectFactory;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Retrieve friend or follower connections for a single user id.
- */
-public class TwitterFollowingProviderTask implements Runnable {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
-
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- protected TwitterFollowingProvider provider;
- private Twitter client;
- protected Long id;
- private String screenName;
-
- private int count = 0;
-
- /**
- * TwitterFollowingProviderTask constructor.
- * @param provider TwitterFollowingProvider
- * @param twitter Twitter
- * @param id numeric id
- */
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
- this.provider = provider;
- this.client = twitter;
- this.id = id;
- }
-
- /**
- * TwitterFollowingProviderTask constructor.
- * @param provider TwitterFollowingProvider
- * @param twitter Twitter
- * @param screenName screenName
- */
- public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
- this.provider = provider;
- this.client = twitter;
- this.screenName = screenName;
- }
-
- int page_count = 0;
- int item_count = 0;
-
- @Override
- public void run() {
-
- Preconditions.checkArgument(id != null || screenName != null);
-
- if ( id != null ) {
- getFollowing(id);
- } else {
- getFollowing(screenName);
- }
-
- LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
-
- }
-
- private void getFollowing(Long id) {
-
- Preconditions.checkArgument(
- provider.getConfig().getEndpoint().equals("friends")
- || provider.getConfig().getEndpoint().equals("followers")
- );
-
- if ( provider.getConfig().getIdsOnly() ) {
- collectIds(id);
- } else {
- collectUsers(id);
- }
- }
-
- private void getFollowing(String screenName) {
-
- twitter4j.User user = null;
- try {
- user = client.users().showUser(screenName);
- } catch (TwitterException ex) {
- LOGGER.error("Failure looking up " + id);
- }
- Objects.requireNonNull(user);
- getFollowing(user.getId());
- }
-
- private void collectUsers(Long id) {
- int keepTrying = 0;
- List<twitter4j.User> list = null;
- long curser = -1;
-
- twitter4j.User user;
- String userJson;
- try {
- user = client.users().showUser(id);
- userJson = TwitterObjectFactory.getRawJSON(user);
- } catch (TwitterException ex) {
- LOGGER.error("Failure looking up " + id);
- return;
- }
-
- do {
- try {
-
- PagableResponseList<twitter4j.User> page = null;
- if ( provider.getConfig().getEndpoint().equals("followers") ) {
- page = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getPageSize().intValue());
- } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- page = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getPageSize().intValue());
- }
-
- Objects.requireNonNull(list);
- Preconditions.checkArgument(list.size() > 0);
-
- for (twitter4j.User other : list) {
-
- String otherJson = TwitterObjectFactory.getRawJSON(other);
-
- try {
- Follow follow = null;
- if ( provider.getConfig().getEndpoint().equals("followers") ) {
- follow = new Follow()
- .withFollowee(mapper.readValue(userJson, User.class))
- .withFollower(mapper.readValue(otherJson, User.class));
- } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- follow = new Follow()
- .withFollowee(mapper.readValue(otherJson, User.class))
- .withFollower(mapper.readValue(userJson, User.class));
- }
-
- Objects.requireNonNull(follow);
-
- if ( item_count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- item_count++;
- }
-
- } catch (Exception ex) {
- LOGGER.warn("Exception: {}", ex);
- }
- }
- if ( !page.hasNext() ) {
- break;
- }
- if ( page.getNextCursor() == 0 ) {
- break;
- }
- curser = page.getNextCursor();
- page_count++;
- } catch (Exception ex) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
- }
- }
- while (provider.shouldContinuePulling(list) && curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
- }
-
- private void collectIds(Long id) {
- int keepTrying = 0;
-
- long curser = -1;
-
- twitter4j.User user;
- String userJson;
- try {
- user = client.users().showUser(id);
- userJson = TwitterObjectFactory.getRawJSON(user);
- } catch (TwitterException ex) {
- LOGGER.error("Failure looking up " + id);
- return;
- }
-
- do {
- try {
- twitter4j.IDs ids = null;
- if ( provider.getConfig().getEndpoint().equals("followers") ) {
- ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue());
- } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue());
- }
-
- Objects.requireNonNull(ids);
- Preconditions.checkArgument(ids.getIDs().length > 0);
-
- for (long otherId : ids.getIDs()) {
-
- try {
- Follow follow = null;
- if ( provider.getConfig().getEndpoint().equals("followers") ) {
- follow = new Follow()
- .withFollowee(new User().withId(id))
- .withFollower(new User().withId(otherId));
- } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
- follow = new Follow()
- .withFollowee(new User().withId(otherId))
- .withFollower(new User().withId(id));
- }
-
- Objects.requireNonNull(follow);
-
- if ( item_count < provider.getConfig().getMaxItems()) {
- ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
- item_count++;
- }
- } catch (Exception ex) {
- LOGGER.warn("Exception: {}", ex);
- }
- }
- if ( !ids.hasNext() ) {
- break;
- }
- if ( ids.getNextCursor() == 0 ) {
- break;
- }
- curser = ids.getNextCursor();
- page_count++;
- } catch (TwitterException twitterException) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
- } catch (Exception ex) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
- }
- }
- while (shouldContinuePulling() && curser != 0 && keepTrying < provider.getConfig().getRetryMax() );
- }
-
- public boolean shouldContinuePulling() {
- return ( item_count < provider.getConfig().getMaxItems()
- && page_count < provider.getConfig().getMaxPages());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
new file mode 100644
index 0000000..84fb789
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieve the ids of the accounts a single user follows (their friends).
+ *
+ * <p>The user is identified by id or screen name. Each id returned by
+ * {@link Twitter#ids(FriendsIdsRequest)} is emitted as one {@link Follow}
+ * (requested user = follower, friend = followee) onto the provider's queue;
+ * the friend {@link User} carries only its id. Paging stops when the cursor
+ * is exhausted, a page comes back empty, or the provider's maxItems /
+ * maxPages limits are reached.
+ */
+public class TwitterFriendsIdsProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsIdsRequest request;
+
+  /**
+   * TwitterFriendsIdsProviderTask constructor.
+   * @param provider TwitterFollowingProvider supplying config and the output queue
+   * @param twitter Twitter client used to issue requests
+   * @param request FriendsIdsRequest naming the user; its cursor is advanced while paging
+   */
+  public TwitterFriendsIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsIdsRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFriendsIds(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  // Paging state, updated after each page.
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFriendsIds(FriendsIdsRequest request) {
+
+    do {
+
+      FriendsIdsResponse response = client.ids(request);
+
+      last_count = response.getIds().size();
+
+      for (Long id : response.getIds()) {
+
+        // Nothing more may be queued once maxItems is reached.
+        if (item_count >= provider.getConfig().getMaxItems()) {
+          break;
+        }
+
+        // The requested user follows each returned id, so the id is the followee.
+        Follow follow = new Follow()
+            .withFollowee(
+                new User()
+                    .withId(id))
+            .withFollower(
+                new User()
+                    .withId(request.getId())
+                    .withScreenName(request.getScreenName()));
+
+        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+        item_count++;
+      }
+
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  /**
+   * Decide whether another page should be requested.
+   * @param cursor next cursor from the last response; values {@code <= 0} stop paging
+   * @param count number of ids on the last page
+   * @param page_count number of the next page to be fetched
+   * @param item_count follows queued so far
+   * @return true while the cursor is positive, the last page was non-empty,
+   *     and neither maxItems nor maxPages has been reached
+   */
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
new file mode 100644
index 0000000..570705d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsListRequest;
+import org.apache.streams.twitter.api.FriendsListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Retrieve the accounts a single user follows (their friends).
+ *
+ * <p>The user is identified by id or screen name. Each user returned by
+ * {@link Twitter#list(FriendsListRequest)} is emitted as one {@link Follow}
+ * (requested user = follower, friend = followee) onto the provider's queue.
+ * Paging stops when the cursor is exhausted, a page comes back empty, or the
+ * provider's maxItems / maxPages limits are reached.
+ */
+public class TwitterFriendsListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsListRequest request;
+
+  /**
+   * TwitterFriendsListProviderTask constructor.
+   * @param provider TwitterFollowingProvider supplying config and the output queue
+   * @param twitter Twitter client used to issue requests
+   * @param request FriendsListRequest naming the user; its cursor is advanced while paging
+   */
+  public TwitterFriendsListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  // Paging state, updated after each page.
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFriendsList(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  private void getFriendsList(FriendsListRequest request) {
+
+    do {
+
+      FriendsListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      for (User friend : response.getUsers()) {
+
+        // Nothing more may be queued once maxItems is reached.
+        if (item_count >= provider.getConfig().getMaxItems()) {
+          break;
+        }
+
+        // The requested user follows each friend, so the friend is the followee.
+        Follow follow = new Follow()
+            .withFollowee(friend)
+            .withFollower(
+                new User()
+                    .withId(request.getId())
+                    .withScreenName(request.getScreenName()));
+
+        ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+        item_count++;
+      }
+
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  /**
+   * Decide whether another page should be requested.
+   * @param cursor next cursor from the last response; values {@code <= 0} stop paging
+   * @param count number of users on the last page
+   * @param page_count number of the next page to be fetched
+   * @param item_count follows queued so far
+   * @return true while the cursor is positive, the last page was non-empty,
+   *     and neither maxItems nor maxPages has been reached
+   */
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f76fed..217b3d8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -27,7 +27,11 @@ import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterTimelineProviderConfiguration;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,12 +48,6 @@ import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
-import twitter4j.User;
-import twitter4j.conf.ConfigurationBuilder;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -92,8 +90,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
return config;
}
- protected Collection<String[]> screenNameBatches;
- protected Collection<Long> ids;
+ protected List<String> names = new ArrayList<>();
+ protected List<Long> ids = new ArrayList<>();
protected volatile Queue<StreamsDatum> providerQueue;
@@ -109,9 +107,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
private List<ListenableFuture<Object>> futures = new ArrayList<>();
- Boolean jsonStoreEnabled;
- Boolean includeEntitiesEnabled;
-
/**
* To use from command line:
*
@@ -207,14 +202,34 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
Objects.requireNonNull(config.getInfo());
+ Objects.requireNonNull(config.getThreadsPerProvider());
+
+ try {
+ client = getTwitterClient();
+ } catch (InstantiationException e) {
+ LOGGER.error("InstantiationException", e);
+ }
+
+ Objects.requireNonNull(client);
- consolidateToIDs();
+ for (String s : config.getInfo()) {
+ if (s != null) {
+ String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
- if (ids.size() > 1) {
- executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
- } else {
- executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+ // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+ // screen name list
+ try {
+ ids.add(Long.parseLong(potentialScreenName));
+ } catch (Exception ex) {
+ names.add(potentialScreenName);
+ }
+ }
}
+
+ executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), config.getInfo().size()));
+
+ submitTimelineThreads(ids, names);
+
}
@Override
@@ -222,42 +237,40 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
LOGGER.debug("{} startStream", STREAMS_ID);
- Preconditions.checkArgument(!ids.isEmpty());
-
running.set(true);
- submitTimelineThreads(ids.toArray(new Long[0]));
-
executor.shutdown();
}
- protected void submitTimelineThreads(Long[] ids) {
-
- Twitter client = getTwitterClient();
-
- for (int i = 0; i < ids.length; i++) {
-
- TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+ protected void submitTimelineThreads(List<Long> ids, List<String> names) { // one timeline task per configured user id, then one per screen name
+
+ for (Long id : ids) {
+ StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+ request.setUserId(id);
+ request.setCount(config.getPageSize()); // tweets requested per page
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+ this,
+ client,
+ request
+ );
ListenableFuture future = executor.submit(providerTask);
futures.add(future);
- LOGGER.info("submitted {}", ids[i]);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
}
-
- }
-
- private Collection<Long> retrieveIds(String[] screenNames) {
- Twitter client = getTwitterClient();
-
- List<Long> ids = new ArrayList<>();
- try {
- for (User twitterUser : client.lookupUsers(screenNames)) {
- ids.add(twitterUser.getId());
- }
- } catch (TwitterException ex) {
- LOGGER.error("Failure retrieving user details.", ex.getMessage());
+ for (String name : names) { // screen names are used directly; no id resolution step
+ StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+ request.setScreenName(name);
+ request.setCount(config.getPageSize()); // tweets requested per page
+ TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+ this,
+ client,
+ request
+ );
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
}
- return ids;
}
@Override
@@ -302,66 +315,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
throw new NotImplementedException();
}
-
-
- /**
- * Using the "info" list that is contained in the configuration, ensure that all
- * account identifiers are converted to IDs (Longs) instead of screenNames (Strings).
- */
- protected void consolidateToIDs() {
- List<String> screenNames = new ArrayList<>();
- ids = new ArrayList<>();
-
- for ( String account : config.getInfo() ) {
- try {
- if ( new Long(account) != null ) {
- ids.add(Long.parseLong(Objects.toString(account, null)));
- }
- } catch ( NumberFormatException ex ) {
- screenNames.add(account);
- } catch ( Exception ex ) {
- LOGGER.error("Exception while trying to add ID: {{}}, {}", account, ex);
- }
- }
-
- // Twitter allows for batches up to 100 per request, but you cannot mix types
- screenNameBatches = new ArrayList<>();
- while ( screenNames.size() >= 100 ) {
- screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
- screenNames = screenNames.subList(100, screenNames.size());
- }
-
- if (screenNames.size() > 0) {
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- }
-
- for ( String[] screenNameBatche : screenNameBatches ) {
- Collection<Long> batchIds = retrieveIds(screenNameBatche);
- ids.addAll(batchIds);
- }
- }
-
/**
* get Twitter Client from TwitterUserInformationConfiguration.
* @return result
*/
- public Twitter getTwitterClient() {
-
- String baseUrl = TwitterProviderUtil.baseUrl(config);
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(true)
- .setJSONStoreEnabled(true)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl)
- .setIncludeMyRetweetEnabled(Boolean.TRUE)
- .setPrettyDebugEnabled(Boolean.TRUE);
-
- return new TwitterFactory(builder.build()).getInstance();
+ public Twitter getTwitterClient() throws InstantiationException {
+
+ return Twitter.getInstance(config);
+
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 8cb2b46..ffb90b7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -20,17 +20,15 @@ package org.apache.streams.twitter.provider;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.ResponseList;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterObjectFactory;
import java.util.List;
import java.util.stream.Collectors;
@@ -47,81 +45,64 @@ public class TwitterTimelineProviderTask implements Runnable {
protected TwitterTimelineProvider provider;
protected Twitter client;
- protected Long id;
+ protected StatusesUserTimelineRequest request;
/**
* TwitterTimelineProviderTask constructor.
* @param provider TwitterTimelineProvider
* @param twitter Twitter
- * @param id Long
+ * @param request StatusesUserTimelineRequest
*/
- public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
+ public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, StatusesUserTimelineRequest request) {
this.provider = provider;
this.client = twitter;
- this.id = id;
+ this.request = request;
}
- int page_count = 1;
int item_count = 0;
- List<Status> lastPage = null;
+ int last_count = 0;
+ int page_count = 1;
@Override
public void run() {
- Paging paging = new Paging(page_count, provider.getConfig().getPageSize().intValue());
-
- LOGGER.info(id + " Thread Starting");
+ LOGGER.info("Thread Starting: {}", request.toString());
do {
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //This value was chosen because it seemed like a reasonable number of times
- //to retry capturing a timeline given the sorts of errors that could potentially
- //occur (network timeout/interruption, faulty client, etc.)
- while (keepTrying < 5) {
-
- try {
- this.client = provider.getTwitterClient();
- ResponseList<Status> statuses = client.getUserTimeline(id, paging);
+ List<Tweet> statuses = client.userTimeline(request);
- for (Status twitterStatus : statuses) {
+ last_count = statuses.size();
+ if( statuses.size() > 0 ) {
- String json = TwitterObjectFactory.getRawJSON(twitterStatus);
-
- if ( item_count < provider.getConfig().getMaxItems() ) {
- try {
- org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
- ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
- } catch (Exception exception) {
- LOGGER.warn("Failed to read document as Tweet ", twitterStatus);
- }
- item_count++;
- }
+ for (Tweet status : statuses) {
+ if (item_count < provider.getConfig().getMaxItems()) {
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+ item_count++;
}
- lastPage = statuses;
- page_count = paging.getPage() + 1;
- paging.setPage(page_count);
-
- keepTrying = 10;
- } catch (Exception ex) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, id, ex);
}
+
+ Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
+ long minId = statusIds.reduce(Math::min).get();
+ page_count++;
+ request.setMaxId(minId - 1); // max_id is inclusive; step past the oldest tweet already emitted to avoid re-fetching it
+
}
+
}
- while (shouldContinuePulling());
+ while (shouldContinuePulling(last_count, page_count, item_count));
- LOGGER.info(id + " Thread Finished");
+ LOGGER.info("Thread Finished: {}", request.toString());
}
- public boolean shouldContinuePulling() {
- return (lastPage != null)
- && item_count < provider.getConfig().getMaxItems()
- && page_count <= provider.getConfig().getMaxPages();
+ public boolean shouldContinuePulling(int count, int page_count, int item_count) {
+ if (count <= 0) {
+ return false; // the last page came back empty
+ }
+ return item_count < provider.getConfig().getMaxItems() && page_count <= provider.getConfig().getMaxPages();
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 214d204..1a7b906 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -28,6 +28,8 @@ import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.twitter.TwitterFollowingConfiguration;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
import org.apache.streams.twitter.pojo.User;
import org.apache.streams.util.ComponentUtils;
@@ -35,6 +37,8 @@ import org.apache.streams.util.ComponentUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -45,10 +49,6 @@ import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
-import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
import java.io.BufferedOutputStream;
import java.io.File;
@@ -88,6 +88,32 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
private TwitterUserInformationConfiguration config;
+ protected List<String> names = new ArrayList<>();
+ protected List<Long> ids = new ArrayList<>();
+
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ protected volatile Queue<StreamsDatum> providerQueue;
+
+ public TwitterUserInformationConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(TwitterUserInformationConfiguration config) {
+ this.config = config;
+ }
+
+ protected Twitter client;
+
+ protected ListeningExecutorService executor;
+
+ protected DateTime start;
+ protected DateTime end;
+
+ protected final AtomicBoolean running = new AtomicBoolean();
+
+ private List<ListenableFuture<Object>> futures = new ArrayList<>();
+
/**
* To use from command line:
*
@@ -148,28 +174,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
outStream.flush();
}
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- protected volatile Queue<StreamsDatum> providerQueue;
-
- public TwitterUserInformationConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(TwitterUserInformationConfiguration config) {
- this.config = config;
- }
-
- protected Iterator<Long[]> idsBatches;
- protected Iterator<String[]> screenNameBatches;
-
- protected ListeningExecutorService executor;
-
- protected DateTime start;
- protected DateTime end;
-
- protected final AtomicBoolean running = new AtomicBoolean();
-
// TODO: this should be abstracted out
public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
return new ThreadPoolExecutor(numThreads, numThreads,
@@ -212,6 +216,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
Objects.requireNonNull(config.getOauth().getAccessToken());
Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
Objects.requireNonNull(config.getInfo());
+ Objects.requireNonNull(config.getThreadsPerProvider());
+
+ try {
+ client = getTwitterClient();
+ } catch (InstantiationException e) {
+ LOGGER.error("InstantiationException", e);
+ }
+
+ Objects.requireNonNull(client);
try {
lock.writeLock().lock();
@@ -222,12 +235,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
Objects.requireNonNull(providerQueue);
- List<String> screenNames = new ArrayList<>();
- List<String[]> screenNameBatches = new ArrayList<>();
-
- List<Long> ids = new ArrayList<>();
- List<Long[]> idsBatches = new ArrayList<>();
-
for (String s : config.getInfo()) {
if (s != null) {
String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -237,46 +244,67 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
try {
ids.add(Long.parseLong(potentialScreenName));
} catch (Exception ex) {
- screenNames.add(potentialScreenName);
- }
-
- // Twitter allows for batches up to 100 per request, but you cannot mix types
-
- if (ids.size() >= 100) {
- // add the batch
- idsBatches.add(ids.toArray(new Long[ids.size()]));
- // reset the Ids
- ids = new ArrayList<>();
- }
-
- if (screenNames.size() >= 100) {
- // add the batch
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- // reset the Ids
- screenNames = new ArrayList<>();
+ names.add(potentialScreenName);
}
}
}
+ executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
- if (ids.size() > 0) {
- idsBatches.add(ids.toArray(new Long[ids.size()]));
- }
+ Objects.requireNonNull(executor);
- if (screenNames.size() > 0) {
- screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
- }
+ // Twitter allows for batches up to 100 per request, but you cannot mix types
+ submitUserInformationThreads(ids, names);
+ }
- if (ids.size() + screenNames.size() > 0) {
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
- } else {
- executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+ protected void submitUserInformationThreads(List<Long> ids, List<String> names) { // one lookup task per batch of at most 100 ids or 100 screen names; the two kinds are never mixed in one request
+
+ int idsIndex = 0;
+ while( idsIndex + 100 < ids.size() ) {
+ List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
+ TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+ this,
+ client,
+ new UsersLookupRequest().withUserId(batchIds));
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
+ idsIndex += 100;
+ }
+ if (ids.size() > idsIndex) { // final partial batch; skipped when no ids remain so no empty lookup is sent
+ List<Long> batchIds = ids.subList(idsIndex, ids.size());
+ TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+ this,
+ client,
+ new UsersLookupRequest().withUserId(batchIds));
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
}
- Objects.requireNonNull(executor);
+ int namesIndex = 0;
+ while( namesIndex + 100 < names.size() ) {
+ List<String> batchNames = names.subList(namesIndex, namesIndex + 100);
+ TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+ this,
+ client,
+ new UsersLookupRequest().withScreenName(batchNames));
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
+ namesIndex += 100;
+ }
+ if (names.size() > namesIndex) { // final partial batch of screen names
+ List<String> batchNames = names.subList(namesIndex, names.size());
+ TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+ this,
+ client,
+ new UsersLookupRequest().withScreenName(batchNames));
+ ListenableFuture future = executor.submit(providerTask);
+ futures.add(future);
+ LOGGER.info("Thread Submitted: {}", providerTask.request);
+ }
- this.idsBatches = idsBatches.iterator();
- this.screenNameBatches = screenNameBatches.iterator();
}
@Override
@@ -284,82 +312,16 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
Objects.requireNonNull(executor);
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
- LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
-
- while (idsBatches.hasNext()) {
- loadBatch(idsBatches.next());
- }
-
- while (screenNameBatches.hasNext()) {
- loadBatch(screenNameBatches.next());
- }
+ LOGGER.info("startStream: {} Threads", futures.size());
running.set(true);
executor.shutdown();
}
- protected void loadBatch(Long[] ids) {
- Twitter client = getTwitterClient();
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1) {
- try {
- long[] toQuery = new long[ids.length];
-
- for (int i = 0; i < ids.length; i++) {
- toQuery[i] = ids[i];
- }
-
- for (twitter4j.User twitterUser : client.lookupUsers(toQuery)) {
- String json = DataObjectFactory.getRawJSON(twitterUser);
- try {
- User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
- ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
- } catch (Exception exception) {
- LOGGER.warn("Failed to read document as User ", twitterUser);
- }
- }
- keepTrying = 10;
- } catch (Exception ex) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
- }
- }
- }
-
- protected void loadBatch(String[] ids) {
- Twitter client = getTwitterClient();
- int keepTrying = 0;
-
- // keep trying to load, give it 5 attempts.
- //while (keepTrying < 10)
- while (keepTrying < 1) {
- try {
- for (twitter4j.User twitterUser : client.lookupUsers(ids)) {
- String json = DataObjectFactory.getRawJSON(twitterUser);
- try {
- User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
- ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
- } catch (Exception exception) {
- LOGGER.warn("Failed to read document as User ", twitterUser);
- }
- }
- keepTrying = 10;
- } catch (Exception ex) {
- keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
- }
- }
- }
-
@Override
public StreamsResultSet readCurrent() {
- LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
-
StreamsResultSet result;
try {
@@ -367,7 +329,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
result = new StreamsResultSet(providerQueue);
result.setCounter(new DatumStatusCounter());
providerQueue = constructQueue();
- LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+ LOGGER.debug("readCurrent: {} Documents", result.size());
} finally {
lock.writeLock().unlock();
}
@@ -394,17 +356,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
return (StreamsResultSet)providerQueue.iterator();
}
+
@Override
public boolean isRunning() {
-
- if ( providerQueue.isEmpty() && executor.isTerminated() ) {
- LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
+ if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+ LOGGER.info("All Threads Completed");
running.set(false);
-
LOGGER.info("Exiting");
}
-
return running.get();
}
@@ -427,29 +386,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
}
}
-
- // TODO: abstract out, also appears in TwitterTimelineProvider
- protected Twitter getTwitterClient() {
- String baseUrl = TwitterProviderUtil.baseUrl(config);
-
- ConfigurationBuilder builder = new ConfigurationBuilder()
- .setOAuthConsumerKey(config.getOauth().getConsumerKey())
- .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
- .setOAuthAccessToken(config.getOauth().getAccessToken())
- .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
- .setIncludeEntitiesEnabled(true)
- .setJSONStoreEnabled(true)
- .setAsyncNumThreads(3)
- .setRestBaseURL(baseUrl)
- .setIncludeMyRetweetEnabled(Boolean.TRUE)
- .setPrettyDebugEnabled(Boolean.TRUE);
-
- return new TwitterFactory(builder.build()).getInstance();
- }
-
- protected void callback() {
-
-
+ protected Twitter getTwitterClient() throws InstantiationException {
+ return Twitter.getInstance(config);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
new file mode 100644
index 0000000..5dbb784
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Look up one batch of users via the Twitter client and queue each returned User on the provider's queue.
+ */
+public class TwitterUserInformationProviderTask implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
+
+ private static ObjectMapper MAPPER = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList())); // NOTE(review): not referenced in this class; candidate for removal
+
+ protected TwitterUserInformationProvider provider;
+ protected Twitter client;
+ protected UsersLookupRequest request; // the batch (ids or screen names) this task looks up
+
+ /**
+ * TwitterUserInformationProviderTask constructor.
+ * @param provider TwitterUserInformationProvider
+ * @param twitter Twitter
+ * @param request UsersLookupRequest
+ */
+ public TwitterUserInformationProviderTask(TwitterUserInformationProvider provider, Twitter twitter, UsersLookupRequest request) {
+ this.provider = provider;
+ this.client = twitter;
+ this.request = request;
+ }
+
+ @Override
+ public void run() {
+
+ LOGGER.info("Thread Starting: {}", request.toString());
+
+ List<User> users = client.lookup(request); // NOTE(review): a null return would NPE in the loop below; confirm lookup never returns null
+
+ for (User user : users) {
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+ }
+
+ LOGGER.info("Thread Finished: {}", request.toString());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
deleted file mode 100644
index 69048d1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
+++ /dev/null
@@ -1,87 +0,0 @@
-{
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "protocol": {
- "type": "string",
- "description": "The protocol",
- "default": "https"
- },
- "host": {
- "type": "string",
- "description": "The host",
- "default": "api.twitter.com"
- },
- "port": {
- "type": "integer",
- "description": "The port",
- "default": 443
- },
- "version": {
- "type": "string",
- "description": "The version",
- "default": "1.1"
- },
- "endpoint": {
- "type": "string",
- "description": "The endpoint"
- },
- "jsonStoreEnabled": {
- "default" : true,
- "type": "string"
- },
- "oauth": {
- "type": "object",
- "dynamic": "true",
- "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "appName": {
- "type": "string"
- },
- "consumerKey": {
- "type": "string"
- },
- "consumerSecret": {
- "type": "string"
- },
- "accessToken": {
- "type": "string"
- },
- "accessTokenSecret": {
- "type": "string"
- }
- }
- },
- "basicauth": {
- "type": "object",
- "dynamic": "true",
- "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "username": {
- "type": "string"
- },
- "password": {
- "type": "string"
- }
- }
- },
- "retrySleepMs": {
- "type": "integer",
- "description": "ms to sleep when hitting a rate limit",
- "default": 100000
- },
- "retryMax": {
- "type": "integer",
- "description": "ms to sleep when hitting a rate limit",
- "default": 10
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
deleted file mode 100644
index 89fc7af..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.TwitterFollowingConfiguration",
- "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "ids_only": {
- "type": "boolean",
- "description": "Whether to collect ids only, or full profiles",
- "default": "true"
- },
- "max_items": {
- "type": "integer",
- "description": "Max items per user to collect",
- "default": 50000
- },
- "max_pages": {
- "type": "integer",
- "description": "Max pages per user to request",
- "default": 10
- },
- "page_size": {
- "type": "integer",
- "description": "Max items per page to request",
- "default": 5000
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
deleted file mode 100644
index 6fa2a73..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
- "extends": {"$ref":"TwitterConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "includeEntities": {
- "type": "string"
- },
- "truncated": {
- "type": "boolean"
- },
- "filter-level": {
- "type": "string",
- "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
- },
- "with": {
- "type": "string",
- "description": "Typically following or user"
- },
- "replies": {
- "type": "string",
- "description": "Set to all, to see all @replies"
- },
- "follow": {
- "type": "array",
- "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
- "items": {
- "type": "integer"
- }
- },
- "track": {
- "type": "array",
- "description": "A list of phrases which will be used to determine what Tweets will be delivered on the stream",
- "items": {
- "type": "string"
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
deleted file mode 100644
index 37ed60e..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
- "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "max_items": {
- "type": "integer",
- "description": "Max items per user to collect",
- "default": 3200
- },
- "max_pages": {
- "type": "integer",
- "description": "Max items per page to request",
- "default": 16
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
deleted file mode 100644
index 405c87a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
- "$schema": "http://json-schema.org/draft-03/schema",
- "$license": [
- "http://www.apache.org/licenses/LICENSE-2.0"
- ],
- "id": "#",
- "type": "object",
- "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
- "extends": {"$ref":"TwitterConfiguration.json"},
- "javaInterfaces": ["java.io.Serializable"],
- "properties": {
- "info": {
- "type": "array",
- "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
- "items": {
- "type": "string"
- }
- },
- "page_size": {
- "type": "integer",
- "description": "Max items per page to request",
- "default": 200
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
new file mode 100644
index 0000000..2b98689
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.twitter.api.FollowersIdsRequest",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+ "extends": { "$ref": "FollowingIdsRequest.json" },
+ "properties": {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
new file mode 100644
index 0000000..03dd83c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
@@ -0,0 +1,32 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType": "org.apache.streams.twitter.api.FollowersIdsResponse",
+ "javaInterfaces": [
+ "java.io.Serializable"
+ ],
+ "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+ "properties": {
+ "ids": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "previous_cursor": {
+ "type": "integer"
+ },
+ "previous_cursor_str": {
+ "type": "string"
+ },
+ "next_cursor": {
+ "type": "integer"
+ },
+ "next_cursor_str": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
new file mode 100644
index 0000000..c588579
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.twitter.api.FollowersListRequest",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+ "extends": { "$ref": "FollowingListRequest.json" },
+ "properties": {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
new file mode 100644
index 0000000..e8e46a1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
@@ -0,0 +1,33 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType": "org.apache.streams.twitter.api.FollowersListResponse",
+ "javaInterfaces": [
+ "java.io.Serializable"
+ ],
+ "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+ "properties": {
+ "users": {
+ "type": "array",
+ "items": {
+ "type": "object",
+ "$ref": "../pojo/User.json"
+ }
+ },
+ "previous_cursor": {
+ "type": "integer"
+ },
+ "previous_cursor_str": {
+ "type": "string"
+ },
+ "next_cursor": {
+ "type": "integer"
+ },
+ "next_cursor_str": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
new file mode 100644
index 0000000..81804fa
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
@@ -0,0 +1,36 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType" : "org.apache.streams.twitter.api.FollowingIdsRequest",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "id": {
+ "description": "The ID of the user for whom to return results.",
+ "required": false,
+ "type": "integer"
+ },
+ "screen_name": {
+ "description": "The screen name of the user for whom to return results.",
+ "required": false,
+ "type": "string"
+ },
+ "cursor": {
+ "description": "Causes the list of connections to be broken into pages of no more than 5000 IDs at a time. The number of IDs returned is not guaranteed to be 5000 as suspended users are filtered out after connections are queried. If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+ "required": false,
+ "type": "integer"
+ },
+ "stringify_ids": {
+ "description": "Many programming environments cannot consume Twitter IDs as numbers because of their size. Set this option to have IDs returned as strings instead.",
+ "required": false,
+ "type": "boolean"
+ },
+ "count": {
+ "description": "Specifies the number of IDs to attempt to retrieve, up to a maximum of 5,000 per distinct request. The value of count is best thought of as a limit to the number of results to return. When using the count parameter with this method, it is wise to use a consistent count value across all requests to the same user\u2019s collection. Usage of this parameter is encouraged in environments where all 5,000 IDs constitutes too large of a response.",
+ "required": false,
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
new file mode 100644
index 0000000..90295b5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
@@ -0,0 +1,41 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType" : "org.apache.streams.twitter.api.FollowingListRequest",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "id": {
+ "description": "The ID of the user for whom to return results.",
+ "required": false,
+ "type": "integer"
+ },
+ "screen_name": {
+ "description": "The screen name of the user for whom to return results.",
+ "required": false,
+ "type": "string"
+ },
+ "cursor": {
+ "description": "Causes the results to be broken into pages; the number of users per page is controlled by count (up to a maximum of 200). If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+ "required": false,
+ "type": "integer"
+ },
+ "count": {
+ "description": "The number of users to return per page, up to a maximum of 200. Defaults to 20.",
+ "required": false,
+ "type": "integer"
+ },
+ "skip_status": {
+ "description": "When set to either true, t or 1, statuses will not be included in the returned user objects. If set to any other value, statuses will be included.",
+ "required": false,
+ "type": "boolean"
+ },
+ "include_user_entities": {
+ "description": "The user object entities node will not be included when set to false.",
+ "required": false,
+ "type": "integer"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
new file mode 100644
index 0000000..fda54f8
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
@@ -0,0 +1,13 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.twitter.api.FriendsIdsRequest",
+ "javaInterfaces": ["java.io.Serializable"],
+ "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+ "extends": { "$ref": "FollowingIdsRequest.json" },
+ "properties": {}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
new file mode 100644
index 0000000..fc46841
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
@@ -0,0 +1,32 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "javaType": "org.apache.streams.twitter.api.FriendsIdsResponse",
+ "javaInterfaces": [
+ "java.io.Serializable"
+ ],
+ "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+ "properties": {
+ "ids": {
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "previous_cursor": {
+ "type": "integer"
+ },
+ "previous_cursor_str": {
+ "type": "string"
+ },
+ "next_cursor": {
+ "type": "integer"
+ },
+ "next_cursor_str": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file