You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 17:45:20 UTC
[34/47] git commit: STREAMS 121 | Added instagram provider and tests
STREAMS 121 | Added instagram provider and tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/77603934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/77603934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/77603934
Branch: refs/heads/STREAMS-46
Commit: 77603934e41ac78ba68f73b3f80ea6852f0acb46
Parents: 815ce2a
Author: rebanks <re...@w2odigital.com>
Authored: Fri Jul 11 16:09:59 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Fri Jul 11 16:09:59 2014 -0500
----------------------------------------------------------------------
streams-contrib/pom.xml | 1 +
.../streams-provider-instagram/pom.xml | 6 +
.../instagram/InstagramConfigurator.java | 15 +-
.../provider/InstagramRecentMediaCollector.java | 120 ++++++
.../provider/InstagramRecentMediaProvider.java | 97 +++++
.../provider/InstagramTimelineProvider.java | 409 -------------------
.../com/instagram/InstagramConfiguration.json | 13 +-
.../InstagramUserInformationConfiguration.json | 2 +-
.../InstagramRecentMediaCollectorTest.java | 161 ++++++++
.../InstagramRecentMediaProviderTest.java | 145 +++++++
.../test/InstagramActivitySerDeTest.java | 92 -----
11 files changed, 544 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 620f68e..b06c276 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -58,6 +58,7 @@
<module>streams-provider-sysomos</module>
<module>streams-provider-rss</module>
<module>streams-processor-regex</module>
+ <module>streams-provider-instagram</module>
</modules>
<dependencyManagement>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml
index e356a7a..a96ff1c 100644
--- a/streams-contrib/streams-provider-instagram/pom.xml
+++ b/streams-contrib/streams-provider-instagram/pom.xml
@@ -76,6 +76,12 @@
<version>1.3</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
index f771856..4d01605 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -25,9 +25,6 @@ import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.validation.Validation;
-import javax.validation.Validator;
-import javax.validation.ValidatorFactory;
import java.io.IOException;
/**
@@ -41,8 +38,8 @@ public class InstagramConfigurator {
public static InstagramConfiguration detectInstagramConfiguration(Config config) {
- ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
- Validator validator = factory.getValidator();
+// ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+// Validator validator = factory.getValidator();
InstagramConfiguration instagramConfiguration = null;
try {
@@ -52,15 +49,15 @@ public class InstagramConfigurator {
}
Preconditions.checkNotNull(instagramConfiguration);
- Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+// Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
return instagramConfiguration;
}
public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) {
- ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
- Validator validator = factory.getValidator();
+// ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+// Validator validator = factory.getValidator();
InstagramUserInformationConfiguration instagramConfiguration = null;
try {
@@ -70,7 +67,7 @@ public class InstagramConfigurator {
}
Preconditions.checkNotNull(instagramConfiguration);
- Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+// Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
return instagramConfiguration;
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
new file mode 100644
index 0000000..7eb3fcd
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -0,0 +1,120 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ *
+ */
+public class InstagramRecentMediaCollector implements Runnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
+ protected static final int MAX_ATTEMPTS = 5;
+ protected static final int SLEEP_SECS = 5; //5 seconds
+
+ protected Queue dataQueue; //exposed for testing
+ private InstagramUserInformationConfiguration config;
+ private Instagram instagramClient;
+ private volatile boolean isCompleted;
+
+
+ public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
+ this.dataQueue = queue;
+ this.config = config;
+ this.instagramClient = new Instagram(this.config.getClientId());
+ this.isCompleted = false;
+ }
+
+ protected void setInstagramClient(Instagram instagramClient) {
+ this.instagramClient = instagramClient;
+ }
+
+ protected Set<Long> getUserIds() {
+ Set<Long> userIds = Sets.newHashSet();
+ for(String id : config.getUserIds()) {
+ try {
+ userIds.add(Long.parseLong(id));
+ } catch (NumberFormatException nfe) {
+ LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
+ }
+ }
+ return userIds;
+ }
+
+ protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException {
+ LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
+ if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
+ long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
+ try {
+ LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ } else {
+ LOGGER.error("Instagram returned an excetpion to the user media request : {}", instaExec.getMessage());
+ throw instaExec;
+ }
+ }
+
+ private void getUserMedia(Long userId) {
+ MediaFeed feed = null;
+ int attempts = 0;
+ int count = 0;
+ do {
+ ++attempts;
+ try {
+ feed = this.instagramClient.getRecentMediaFeed(userId);
+ queueData(feed, userId);
+ count += feed.getData().size();
+ while(feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage()) {
+ feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
+ queueData(feed, userId);
+ count += feed.getData().size();
+ }
+ } catch (InstagramException ie) {
+ try {
+ handleInstagramException(ie, attempts);
+ } catch (InstagramException ie2) { //not a rate limit exception, ignore user
+ attempts = MAX_ATTEMPTS;
+ }
+ }
+ } while(feed == null && attempts < MAX_ATTEMPTS);
+ LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
+ }
+
+ private void queueData(MediaFeed userFeed, Long userId) {
+ if(userFeed == null) {
+ LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+ } else {
+ for(MediaFeedData data : userFeed.getData()) {
+ synchronized (this.dataQueue) {
+ while(!this.dataQueue.offer(data)) {
+ Thread.yield();
+ }
+ }
+ }
+ }
+ }
+
+ public boolean isCompleted() {
+ return this.isCompleted;
+ }
+
+ @Override
+ public void run() {
+ for(Long userId : getUserIds()) {
+ getUserMedia(userId);
+ }
+ this.isCompleted = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
new file mode 100644
index 0000000..3354e54
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -0,0 +1,97 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Queues;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rebanks on 7/9/14.
+ */
+public class InstagramRecentMediaProvider implements StreamsProvider {
+
+ private InstagramUserInformationConfiguration config;
+ private InstagramRecentMediaCollector dataCollector;
+ protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
+ private ExecutorService executorService;
+ private volatile boolean isCompleted;
+
+ public InstagramRecentMediaProvider() {
+ this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+ }
+
+ public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
+ this.config = config;
+ this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+ this.isCompleted = false;
+ }
+
+ @Override
+ public void startStream() {
+ this.dataCollector = getInstagramRecentMediaCollector();
+ this.executorService = Executors.newSingleThreadExecutor();
+ this.executorService.submit(this.dataCollector);
+ }
+
+ protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
+ }
+
+
+ @Override
+ public StreamsResultSet readCurrent() {
+ Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+ MediaFeedData data = null;
+ synchronized (this.mediaFeedQueue) {
+ while(!this.mediaFeedQueue.isEmpty()) {
+ data = this.mediaFeedQueue.poll();
+ batch.add(new StreamsDatum(data, data.getId()));
+ }
+ }
+ this.isCompleted = batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted();
+ return new StreamsResultSet(batch);
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return !this.isCompleted;
+ }
+
+ @Override
+ public void prepare(Object configurationObject) {
+
+ }
+
+ @Override
+ public void cleanUp() {
+ this.executorService.shutdown();
+ try {
+ this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } finally {
+ this.executorService = null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
deleted file mode 100644
index d3e7179..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.provider;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class InstagramTimelineProvider implements StreamsProvider, Serializable {
-
- public final static String STREAMS_ID = "InstagramTimelineProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
- public static final int MAX_NUMBER_WAITING = 10000;
-
- private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
- private InstagramUserInformationConfiguration config;
-
- private Class klass;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
- public InstagramUserInformationConfiguration getConfig() {
- return config;
- }
-
- public void setConfig(InstagramUserInformationConfiguration config) {
- this.config = config;
- }
-
- protected Iterator<Long[]> idsBatches;
- protected Iterator<String[]> screenNameBatches;
-
- protected volatile Queue<StreamsDatum> providerQueue;
-
- protected int idsCount;
- protected Instagram client;
-
-
- protected ExecutorService executor;
-
- protected DateTime start;
- protected DateTime end;
-
- protected final AtomicBoolean running = new AtomicBoolean();
-
- private static ExecutorService getExecutor() {
- return Executors.newSingleThreadExecutor();
- }
-
- public InstagramTimelineProvider() {
- Config config = StreamsConfigurator.config.getConfig("instagram");
- this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
- }
-
- public InstagramTimelineProvider(InstagramUserInformationConfiguration config) {
- this.config = config;
- }
-
- public InstagramTimelineProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("instagram");
- this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
- this.klass = klass;
- }
-
- public InstagramTimelineProvider(InstagramUserInformationConfiguration config, Class klass) {
- this.config = config;
- this.klass = klass;
- }
-
-
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
- }
-
- @Override
- public void startStream() {
- LOGGER.debug("{} startStream", STREAMS_ID);
-
- Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
- LOGGER.info("readCurrent");
-
- while(idsBatches.hasNext())
- loadBatch(idsBatches.next());
-
- while(screenNameBatches.hasNext())
- loadBatch(screenNameBatches.next());
-
- executor.shutdown();
- }
-
- private void loadBatch(Long[] ids) {
-
- // twitter4j implementation below - replace with jInstagram
-
-// Twitter client = getTwitterClient();
-// int keepTrying = 0;
-//
-// // keep trying to load, give it 5 attempts.
-// //while (keepTrying < 10)
-// while (keepTrying < 1)
-// {
-// try
-// {
-// long[] toQuery = new long[ids.length];
-// for(int i = 0; i < ids.length; i++)
-// toQuery[i] = ids[i];
-//
-// for (User tStat : client.lookupUsers(toQuery)) {
-//
-// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-// executor.submit(providerTask);
-//
-// }
-// keepTrying = 10;
-// }
-// catch(TwitterException twitterException) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-// }
-// catch(Exception e) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-// }
-// }
- }
-
- private void loadBatch(String[] ids) {
-
- // twitter4j implementation below - replace with jInstagram
-//
-// Twitter client = getTwitterClient();
-// int keepTrying = 0;
-//
-// // keep trying to load, give it 5 attempts.
-// //while (keepTrying < 10)
-// while (keepTrying < 1)
-// {
-// try
-// {
-// for (User tStat : client.lookupUsers(ids)) {
-//
-// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-// executor.submit(providerTask);
-//
-// }
-// keepTrying = 10;
-// }
-// catch(TwitterException twitterException) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-// }
-// catch(Exception e) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-// }
-// }
- }
-
- public class InstagramTimelineProviderTask implements Runnable {
-
- // twitter4j implementation below - replace with jInstagram
-
- private final Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
-
- private InstagramTimelineProvider provider;
- private Instagram client;
- private Long id;
-
- public InstagramTimelineProviderTask(InstagramTimelineProvider provider, Instagram client, Long id) {
- this.provider = provider;
- this.client = client;
- this.id = id;
- }
-
- @Override
- public void run() {
-
- // twitter4j implementation below - replace with jInstagram
-
-// Paging paging = new Paging(1, 200);
-// List<Status> statuses = null;
-// boolean KeepGoing = true;
-// boolean hadFailure = false;
-//
-// do
-// {
-// int keepTrying = 0;
-//
-// // keep trying to load, give it 5 attempts.
-// //This value was chosen because it seemed like a reasonable number of times
-// //to retry capturing a timeline given the sorts of errors that could potentially
-// //occur (network timeout/interruption, faulty client, etc.)
-// while (keepTrying < 5)
-// {
-//
-// try
-// {
-// statuses = client.getUserTimeline(id, paging);
-//
-// for (Status tStat : statuses)
-// {
-// String json = TwitterObjectFactory.getRawJSON(tStat);
-//
-// try {
-// provider.lock.readLock().lock();
-// ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
-// } finally {
-// provider.lock.readLock().unlock();
-// }
-// }
-//
-// paging.setPage(paging.getPage() + 1);
-//
-// keepTrying = 10;
-// }
-// catch(TwitterException twitterException) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-// }
-// catch(Exception e) {
-// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-// }
-// }
-// }
-// while (provider.shouldContinuePulling(statuses));
-
- LOGGER.info(id + " Thread Finished");
-
- }
-
- }
-
- private Map<Long, Long> userPullInfo;
-
- protected boolean shouldContinuePulling(List<MediaFeedData> statuses) {
- return (statuses != null) && (statuses.size() > 0);
- }
-
- private void sleep()
- {
- Thread.yield();
- try {
- // wait one tenth of a millisecond
- Thread.yield();
- Thread.sleep(1);
- Thread.yield();
- }
- catch(IllegalArgumentException e) {
- // passing in static values, this will never happen
- }
- catch(InterruptedException e) {
- // noOp, there must have been an issue sleeping
- }
- Thread.yield();
- }
-
- public StreamsResultSet readCurrent() {
-
- LOGGER.info("Providing {} docs", providerQueue.size());
-
- StreamsResultSet result;
-
- try {
- lock.writeLock().lock();
- result = new StreamsResultSet(providerQueue);
- result.setCounter(new DatumStatusCounter());
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- if( providerQueue.isEmpty() && executor.isTerminated()) {
- LOGGER.info("Finished. Cleaning up...");
-
- running.set(false);
-
- LOGGER.info("Exiting");
- }
-
- return result;
-
- }
-
- protected Queue<StreamsDatum> constructQueue() {
- return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
- }
-
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
-
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- throw new NotImplementedException();
- }
-
- @Override
- public boolean isRunning() {
- return running.get();
- }
-
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
-
- @Override
- public void prepare(Object o) {
-
- executor = getExecutor();
- running.set(true);
- try {
- lock.writeLock().lock();
- providerQueue = constructQueue();
- } finally {
- lock.writeLock().unlock();
- }
-
- Preconditions.checkNotNull(providerQueue);
-
- Preconditions.checkNotNull(this.klass);
- Preconditions.checkNotNull(config.getAccessToken());
-
- //idsCount = config.getFollow().size();
-
- client = getInstagramClient();
- }
-
- protected Instagram getInstagramClient()
- {
- // twitter4j -> jInstagram
-// String baseUrl = "https://api.instagram.com:443/1.1/";
-//
-// ConfigurationBuilder builder = new ConfigurationBuilder()
-// .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-// .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-// .setOAuthAccessToken(config.getOauth().getAccessToken())
-// .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-// .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-// .setJSONStoreEnabled(jsonStoreEnabled)
-// .setAsyncNumThreads(3)
-// .setRestBaseURL(baseUrl)
-// .setIncludeMyRetweetEnabled(Boolean.TRUE)
-// .setPrettyDebugEnabled(Boolean.TRUE);
-//
-// return new InstagramFactory(builder.build()).getInstance();
- return null;
- }
-
- @Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index 18a59b9..f8f8117 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,16 +5,17 @@
"javaType" : "org.apache.streams.instagram.InstagramConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
- "version": {
+ "clientId": {
"type": "string",
- "description": "The version"
+ "description": "Your Instagram Client Id"
},
- "endpoint": {
+ "clientSecret": {
"type": "string",
- "description": "The endpoint"
+ "description": "Your Instagram Client secret"
},
- "accessToken": {
- "type": "string"
+ "callbackUrl": {
+ "type": "string",
+ "description": "Your Instagream callback url"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
index 4b75ee4..b6a8c6b 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
@@ -6,7 +6,7 @@
"extends": {"$ref":"InstagramConfiguration.json"},
"javaInterfaces": ["java.io.Serializable"],
"properties": {
- "info": {
+ "userIds": {
"type": "array",
"description": "A list of user IDs, indicating the users whose posts should be delivered on the stream",
"items": {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
new file mode 100644
index 0000000..39f607c
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -0,0 +1,161 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
+ */
+public class InstagramRecentMediaCollectorTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
+
+ private int expectedDataCount = 0;
+ private long randomSeed = System.currentTimeMillis();
+ private Random rand = new Random(randomSeed);
+ private Map<Pagination, MediaFeed> pageMap = Maps.newHashMap();
+
+ @Test
+ public void testHandleInstagramException1() throws InstagramException {
+ InstagramException ie = mock(InstagramException.class);
+ when(ie.getRemainingLimitStatus()).thenReturn(1);
+ final String message = "Test Message";
+ when(ie.getMessage()).thenReturn(message);
+ InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+ try {
+ collector.handleInstagramException(ie, 1);
+ fail("Expected RuntimeException to be thrown");
+ } catch (InstagramException rte) {
+// assertTrue(rte.getMessage().contains("Mock for InstagramException"));
+ assertEquals(message, rte.getMessage());
+ }
+ }
+
+ @Test
+ public void testHandleInstagramException2() throws InstagramException{
+ InstagramException ie = mock(InstagramException.class);
+ when(ie.getRemainingLimitStatus()).thenReturn(0);
+ InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+ long startTime = System.currentTimeMillis();
+ collector.handleInstagramException(ie, 1);
+ long endTime = System.currentTimeMillis();
+ LOGGER.debug("Slept for {} ms", startTime - endTime);
+ assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error
+ startTime = System.currentTimeMillis();
+ collector.handleInstagramException(ie, 2);
+ endTime = System.currentTimeMillis();
+ LOGGER.debug("Slept for {} ms", startTime - endTime);
+ assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error
+ }
+
+ @Test
+ public void testGetUserIds() {
+ InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+ List<String> userIds = Lists.newLinkedList();
+ userIds.add("1");
+ userIds.add("2");
+ userIds.add("3");
+ userIds.add("4");
+ userIds.add("abcdefg");
+ config.setUserIds(userIds);
+ InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), config);
+
+ Set<Long> expected = Sets.newHashSet();
+ expected.add(1L);
+ expected.add(2L);
+ expected.add(3L);
+ expected.add(4L);
+
+ assertEquals(expected, collector.getUserIds());
+ }
+
+ @Test
+ public void testRun() {
+ Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+ InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+ List<String> userIds = Lists.newLinkedList();
+ userIds.add("1");
+ userIds.add("2");
+ userIds.add("3");
+ userIds.add("4");
+ config.setUserIds(userIds);
+ InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config);
+ collector.setInstagramClient(createMockInstagramClient());
+ collector.run();
+ LOGGER.debug("Random seed == {}", randomSeed);
+ assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size());
+ }
+
+ private Instagram createMockInstagramClient() {
+ final Instagram instagramClient = mock(Instagram.class);
+ try {
+ final InstagramException mockException = mock(InstagramException.class);
+ when(mockException.getRemainingLimitStatus()).thenReturn(-1);
+ when(mockException.getMessage()).thenReturn("MockInstagramException message");
+ when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer<MediaFeed>() {
+ @Override
+ public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+ long param = (Long) invocationOnMock.getArguments()[0];
+ if (param == 2L) {
+ throw mockException;
+ } else {
+ return createRandomMockMediaFeed();
+ }
+ }
+ });
+ when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
+ @Override
+ public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+ return createRandomMockMediaFeed();
+ }
+ });
+ } catch (InstagramException ie) {
+ fail("Failed to create mock instagram client.");
+ }
+ return instagramClient;
+ }
+
+ private MediaFeed createRandomMockMediaFeed() throws InstagramException {
+ MediaFeed feed = mock(MediaFeed.class);
+ when(feed.getData()).thenReturn(createData(this.rand.nextInt(100)));
+ Pagination pagination = mock(Pagination.class);
+ if(this.rand.nextInt(2) == 0) {
+ when(pagination.hasNextPage()).thenReturn(true);
+ } else {
+ when(pagination.hasNextPage()).thenReturn(false);
+ }
+ when(feed.getPagination()).thenReturn(pagination);
+ return feed;
+ }
+
+ private List<MediaFeedData> createData(int size) {
+ List<MediaFeedData> data = Lists.newLinkedList();
+ for(int i=0; i < size; ++i) {
+ data.add(mock(MediaFeedData.class));
+ }
+ this.expectedDataCount += size;
+ return data;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
new file mode 100644
index 0000000..7d2a47b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -0,0 +1,145 @@
+package org.apache.streams.instagram.provider;
+
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ *
+ */
+public class InstagramRecentMediaProviderTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
+
+ @Test
+ public void testStartStream() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration()) {
+
+ private volatile boolean isFinished = false;
+
+ @Override
+ public void run() {
+ this.isFinished = true;
+ latch.countDown();
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return this.isFinished;
+ }
+ };
+
+ InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
+ @Override
+ protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ return collectorStub;
+ }
+ };
+
+ provider.startStream();
+
+ latch.await();
+ assertTrue(collectorStub.isCompleted());
+ StreamsResultSet result = provider.readCurrent();
+ assertNotNull(result);
+ assertEquals(0, result.size());
+ assertTrue(!provider.isRunning());
+ try {
+ provider.cleanUp();
+ } catch (Throwable throwable){
+ throwable.printStackTrace();
+ fail("Error durring clean up");
+ }
+ }
+
+ @Test
+ public void testReadCurrent() {
+ final long seed = System.nanoTime();
+ final Random rand = new Random(seed);
+ final CyclicBarrier test = new CyclicBarrier(2);
+ final CyclicBarrier produce = new CyclicBarrier(2);
+ final AtomicInteger batchCount = new AtomicInteger(0);
+ InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(new InstagramUserInformationConfiguration()) {
+ @Override
+ protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ return new InstagramRecentMediaCollector(super.mediaFeedQueue, new InstagramUserInformationConfiguration()) {
+
+ private volatile boolean isFinished = false;
+
+
+
+ public int getBatchCount() {
+ return batchCount.get();
+ }
+
+ @Override
+ public boolean isCompleted() {
+ return isFinished;
+ }
+
+ @Override
+ public void run() {
+ int randInt = rand.nextInt(5);
+ while(randInt != 0) {
+ int batchSize = rand.nextInt(200);
+ for(int i=0; i < batchSize; ++i) {
+ while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+ Thread.yield();
+ }
+ }
+ batchCount.set(batchSize);
+ try {
+ test.await();
+ produce.await();
+ } catch (InterruptedException ie ) {
+ Thread.currentThread().interrupt();
+ } catch (BrokenBarrierException bbe) {
+ Thread.currentThread().interrupt();
+ }
+ randInt = rand.nextInt(5);
+ }
+ batchCount.set(0);
+ isFinished = true;
+ try {
+ test.await();
+ produce.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BrokenBarrierException bbe) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ };
+ }
+ };
+ provider.startStream();
+ while(provider.isRunning()) {
+ try {
+ test.await();
+ assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
+ produce.await();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (BrokenBarrierException bbe) {
+ Thread.currentThread().interrupt();
+ }
+
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
deleted file mode 100644
index fcf5e81..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class InstagramActivitySerDeTest {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
- private InstagramJsonActivitySerializer instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
-
- // remove @Ignore after implementation
- @Ignore
- @Test
- public void Tests()
- {
- InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/test.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- // convert to MediaFeedData?
- Activity activity = instagramJsonActivitySerializer.deserialize(line);
-
- String activitystring = mapper.writeValueAsString(activity);
-
- LOGGER.info("activity: {}", activitystring);
-
- assertThat(activity, is(not(nullValue())));
-
- assertThat(activity.getId(), is(not(nullValue())));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getProvider(), is(not(nullValue())));
-
- }
- }
- } catch( Exception e ) {
- System.out.println(e);
- e.printStackTrace();
- Assert.fail();
- }
- }
-}