You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/25 18:42:49 UTC

[2/8] git commit: STREAMS-143 | Refactored InstagramRecentMediaProvider and InstagramRecentMediaCollector to use new abstract classes.

STREAMS-143 | Refactored InstagramRecentMediaProvider and InstagramRecentMediaCollector to use new abstract classes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2a391aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2a391aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2a391aa

Branch: refs/heads/master
Commit: c2a391aa7f76557bf24e0c2591c1b2d2431f79b8
Parents: 0df06b6
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 13:18:15 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 13:18:15 2014 -0500

----------------------------------------------------------------------
 .../provider/InstagramAbstractProvider.java     |   4 +-
 .../provider/InstagramDataCollector.java        |  24 ++-
 .../InstagramRecentMediaCollector.java          |  83 ++--------
 .../InstagramRecentMediaProvider.java           | 153 +------------------
 .../InstagramRecentMediaCollectorTest.java      |   3 +-
 .../InstagramRecentMediaProviderTest.java       |  14 +-
 6 files changed, 48 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 3d35714..c34014c 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
 
-    private InstagramConfiguration config;
+    protected InstagramConfiguration config;
     private InstagramDataCollector dataCollector;
     protected Queue<StreamsDatum> dataQueue; //exposed for testing
     private ExecutorService executorService;
@@ -54,7 +54,7 @@ public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
     }
 
     public InstagramAbstractProvider(InstagramConfiguration config) {
-        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+        this.config = SerializationUtil.cloneBySerialization(config);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
index 4cfc282..2e3add6 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
@@ -82,6 +82,19 @@ public abstract class InstagramDataCollector<T> implements Runnable {
         return this.instagram;
     }
 
+    /**
+     * Return the number of available tokens for this data collector
+     * @return number of available tokens
+     */
+    protected int numAvailableTokens() {
+        return this.tokenManger.numAvailableTokens();
+    }
+
+    /**
+     * Queues the Instagram data to be output by the provider.
+     * @param userData data to queue
+     * @param userId user id who the data came from
+     */
     protected void queueData(Collection<T> userData, String userId) {
         if (userData == null) {
             LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId);
@@ -101,12 +114,13 @@ public abstract class InstagramDataCollector<T> implements Runnable {
 
     @Override
     public void run() {
-        try {
-            for (User user : this.config.getUsersInfo().getUsers()) {
+        for (User user : this.config.getUsersInfo().getUsers()) {
+            try {
                 collectInstagramDataForUser(user);
+            } catch (Exception e) {
+                LOGGER.error("Exception thrown while polling for user, {}, skipping user.", user.getUserId());
+                LOGGER.error("Exception thrown while polling for user : ", e);
             }
-        } catch (Exception e) {
-            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
         }
         this.isCompleted.set(true);
     }
@@ -116,7 +130,7 @@ public abstract class InstagramDataCollector<T> implements Runnable {
      * @param user
      * @throws Exception
      */
-    protected abstract void collectInstagramDataForUser(User user);
+    protected abstract void collectInstagramDataForUser(User user) throws Exception;
 
     /**
      * Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
index 952e3bf..6bf71c3 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -15,15 +15,10 @@ under the License. */
 package org.apache.streams.instagram.provider.recentmedia;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.provider.InstagramOauthToken;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
-import org.jinstagram.Instagram;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
 import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
@@ -33,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Executes on all of the Instagram requests to collect the media feed data.
@@ -41,76 +35,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
  * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
  * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy.
  */
-public class InstagramRecentMediaCollector implements Runnable {
+public class InstagramRecentMediaCollector extends InstagramDataCollector<MediaFeedData> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
     protected static final int MAX_ATTEMPTS = 5;
-    protected static final int SLEEP_SECS = 5; //5 seconds
 
-    protected Queue<MediaFeedData> dataQueue; //exposed for testing
-    private InstagramConfiguration config;
-    private AtomicBoolean isCompleted;
-    private SimpleTokenManager<InstagramOauthToken> tokenManger;
     private int consecutiveErrorCount;
-    private BackOffStrategy backOffStrategy;
-    private Instagram instagram;
 
 
-    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
-        this.dataQueue = queue;
-        this.config = config;
-        this.isCompleted = new AtomicBoolean(false);
-        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
-        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
-            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
-        }
-        this.consecutiveErrorCount = 0;
-        this.backOffStrategy = new ExponentialBackOffStrategy(2);
-        this.instagram = new Instagram(this.config.getClientId());
-    }
-
-
-    /**
-     * If there are authorized tokens available, it sets a new token for the client and returns
-     * the client.  If there are no available tokens, it simply returns the client that was
-     * initialized in the constructor with client id.
-     * @return
-     */
-    @VisibleForTesting
-    protected Instagram getNextInstagramClient() {
-        if(this.tokenManger.numAvailableTokens() > 0) {
-            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
-        }
-        return this.instagram;
-    }
-
-    private void queueData(MediaFeed userFeed, String userId) {
-        if (userFeed == null) {
-            LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
-        } else {
-            for (MediaFeedData data : userFeed.getData()) {
-                ComponentUtils.offerUntilSuccess(data, this.dataQueue);
-            }
-        }
-    }
-
-    /**
-     * @return true when the collector has queued all of the available media feed data for the provided users.
-     */
-    public boolean isCompleted() {
-        return this.isCompleted.get();
+    public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) {
+        super(queue, config);
     }
 
     @Override
-    public void run() {
-        try {
-            for (User user : this.config.getUsersInfo().getUsers()) {
-                collectMediaFeed(user);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
-        }
-        this.isCompleted.set(true);
+    protected StreamsDatum convertToStreamsDatum(MediaFeedData item) {
+        return new StreamsDatum(item, item.getId());
     }
 
     /**
@@ -119,8 +58,8 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @param user
      * @throws Exception
      */
-    @VisibleForTesting
-    protected void collectMediaFeed(User user) throws Exception {
+    @Override
+    protected void collectInstagramDataForUser(User user) throws Exception {
         Pagination pagination = null;
         do {
             int attempts = 0;
@@ -151,7 +90,7 @@ public class InstagramRecentMediaCollector implements Runnable {
                         LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
                         ++this.consecutiveErrorCount;
                     }
-                    if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
+                    if(this.consecutiveErrorCount > Math.max(this.numAvailableTokens(), MAX_ATTEMPTS*2)) {
                         throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
                     }
                 }
@@ -159,7 +98,7 @@ public class InstagramRecentMediaCollector implements Runnable {
                     this.consecutiveErrorCount = 0;
                     this.backOffStrategy.reset();
                     pagination = feed.getPagination();
-                    queueData(feed, user.getUserId());
+                    queueData(feed.getData(), user.getUserId());
                 }
             }
             if(!succesfullDataPull) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
index 39acbd4..ad039a8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -22,6 +22,8 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.*;
+import org.apache.streams.instagram.provider.InstagramAbstractProvider;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
 import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
 import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
@@ -41,161 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
  */
-public class InstagramRecentMediaProvider implements StreamsProvider {
+public class InstagramRecentMediaProvider extends InstagramAbstractProvider {
 
-    private InstagramConfiguration config;
-    private InstagramRecentMediaCollector dataCollector;
-    protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
-    private ExecutorService executorService;
-    private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
-        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
     }
 
     public InstagramRecentMediaProvider(InstagramConfiguration config) {
-        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+        super(config);
     }
 
     @Override
-    public void startStream() {
-        this.dataCollector = getInstagramRecentMediaCollector();
-        this.executorService = Executors.newSingleThreadExecutor();
-        this.executorService.submit(this.dataCollector);
+    protected InstagramDataCollector getInstagramDataCollector() {
+        return new InstagramRecentMediaCollector(super.dataQueue, super.config);
     }
-
-    /**
-     * EXPOSED FOR TESTING
-     * @return
-     */
-    @VisibleForTesting
-    protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-        this.updateUserInfoList();
-        return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
-    }
-
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
-        MediaFeedData data = null;
-        while(!this.mediaFeedQueue.isEmpty()) {
-            data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
-            batch.add(new StreamsDatum(data, data.getId()));
-        }
-        this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
-        return new StreamsResultSet(batch);
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !this.isCompleted.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
-        this.isCompleted = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void cleanUp() {
-        this.executorService.shutdown();
-        try {
-            this.executorService.awaitTermination(5, TimeUnit.SECONDS);
-        } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-        } finally {
-            this.executorService = null;
-        }
-    }
-
-    /**
-     * Add default start and stop points if necessary.
-     */
-    private void updateUserInfoList() {
-        UsersInfo usersInfo = this.config.getUsersInfo();
-        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
-            return;
-        }
-        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
-        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
-        for(User user : usersInfo.getUsers()) {
-            if(defaultAfterDate != null && user.getAfterDate() == null) {
-                user.setAfterDate(defaultAfterDate);
-            }
-            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
-                user.setBeforeDate(defaultBeforeDate);
-            }
-        }
-    }
-
-    /**
-     * Overrides the client id in the configuration.
-     * @param clientId client id to use
-     */
-    public void setInstagramClientId(String clientId) {
-        this.config.setClientId(clientId);
-    }
-
-    /**
-     * Overrides authroized user tokens in the configuration.
-     * @param tokenStrings
-     */
-    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
-        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
-    }
-
-    /**
-     * Overrides the default before date in the configuration
-     * @param beforeDate
-     */
-    public void setDefaultBeforeDate(DateTime beforeDate) {
-        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
-    }
-
-    /**
-     * Overrides the default after date in the configuration
-     * @param afterDate
-     */
-    public void setDefaultAfterDate(DateTime afterDate) {
-        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
-    }
-
-    /**
-     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
-     * pull data from as early as possible.  If default before or after DateTimes are set, they will applied to all
-     * NULL DateTimes.
-     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
-     */
-    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
-        Set<User> users = Sets.newHashSet();
-        for(String userId : usersWithAfterDate.keySet()) {
-            User user = new User();
-            user.setUserId(userId);
-            user.setAfterDate(usersWithAfterDate.get(userId));
-            users.add(user);
-        }
-        ensureUsersInfo(this.config).setUsers(users);
-    }
-
-    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
-        UsersInfo usersInfo = config.getUsersInfo();
-        if(usersInfo == null) {
-            usersInfo = new UsersInfo();
-            config.setUsersInfo(usersInfo);
-        }
-        return usersInfo;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
index 0020652..ce31ca7 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
@@ -19,6 +19,7 @@ import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UsersInfo;
@@ -58,7 +59,7 @@ public class InstagramRecentMediaCollectorTest extends RandomizedTest {
     @Repeat(iterations = 3)
     public void testRun() {
         this.expectedDataCount = 0;
-        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+        Queue<StreamsDatum> data = Queues.newConcurrentLinkedQueue();
         InstagramConfiguration config = new InstagramConfiguration();
         UsersInfo usersInfo = new UsersInfo();
         config.setUsersInfo(usersInfo);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
index 4a6396b..371936b 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
@@ -14,10 +14,12 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider.recentmedia;
 
+import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -44,7 +46,7 @@ public class InstagramRecentMediaProviderTest {
     @Test
     public void testStartStream() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration()) {
 
             private volatile boolean isFinished = false;
 
@@ -62,7 +64,7 @@ public class InstagramRecentMediaProviderTest {
 
         InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
             @Override
-            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+            protected InstagramDataCollector getInstagramDataCollector() {
                 return collectorStub;
             }
         };
@@ -90,10 +92,10 @@ public class InstagramRecentMediaProviderTest {
         final CyclicBarrier test = new CyclicBarrier(2);
         final CyclicBarrier produce = new CyclicBarrier(2);
         final AtomicInteger batchCount = new AtomicInteger(0);
-        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
+        final InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
             @Override
-            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-                return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
+            protected InstagramDataCollector getInstagramDataCollector() {
+                return new InstagramRecentMediaCollector(this.dataQueue, createNonNullConfiguration()) {
 
                     private volatile boolean isFinished = false;
 
@@ -114,7 +116,7 @@ public class InstagramRecentMediaProviderTest {
                         while(randInt != 0) {
                             int batchSize = rand.nextInt(200);
                             for(int i=0; i < batchSize; ++i) {
-                                while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+                                while(!super.dataQueue.add(new StreamsDatum(null))) {
                                     Thread.yield();
                                 }
                             }