You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/25 18:42:49 UTC
[2/8] git commit: STREAMS-143 | Refactored
InstagramRecentMediaProvider and InstagramRecentMediaCollector to use new
abstract classes.
STREAMS-143 | Refactored InstagramRecentMediaProvider and InstagramRecentMediaCollector to use new abstract classes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c2a391aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c2a391aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c2a391aa
Branch: refs/heads/master
Commit: c2a391aa7f76557bf24e0c2591c1b2d2431f79b8
Parents: 0df06b6
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 13:18:15 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 13:18:15 2014 -0500
----------------------------------------------------------------------
.../provider/InstagramAbstractProvider.java | 4 +-
.../provider/InstagramDataCollector.java | 24 ++-
.../InstagramRecentMediaCollector.java | 83 ++--------
.../InstagramRecentMediaProvider.java | 153 +------------------
.../InstagramRecentMediaCollectorTest.java | 3 +-
.../InstagramRecentMediaProviderTest.java | 14 +-
6 files changed, 48 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index 3d35714..c34014c 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -43,7 +43,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
- private InstagramConfiguration config;
+ protected InstagramConfiguration config;
private InstagramDataCollector dataCollector;
protected Queue<StreamsDatum> dataQueue; //exposed for testing
private ExecutorService executorService;
@@ -54,7 +54,7 @@ public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
}
public InstagramAbstractProvider(InstagramConfiguration config) {
- this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+ this.config = SerializationUtil.cloneBySerialization(config);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
index 4cfc282..2e3add6 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramDataCollector.java
@@ -82,6 +82,19 @@ public abstract class InstagramDataCollector<T> implements Runnable {
return this.instagram;
}
+ /**
+ * Return the number of available tokens for this data collector
+ * @return numbeer of available tokens
+ */
+ protected int numAvailableTokens() {
+ return this.tokenManger.numAvailableTokens();
+ }
+
+ /**
+ * Queues the Instagram data to be output by the provider.
+ * @param userData data to queue
+ * @param userId user id who the data came from
+ */
protected void queueData(Collection<T> userData, String userId) {
if (userData == null) {
LOGGER.warn("User id, {}, returned a NULL data from instagram.", userId);
@@ -101,12 +114,13 @@ public abstract class InstagramDataCollector<T> implements Runnable {
@Override
public void run() {
- try {
- for (User user : this.config.getUsersInfo().getUsers()) {
+ for (User user : this.config.getUsersInfo().getUsers()) {
+ try {
collectInstagramDataForUser(user);
+ } catch (Exception e) {
+ LOGGER.error("Exception thrown while polling for user, {}, skipping user.", user.getUserId());
+ LOGGER.error("Exception thrown while polling for user : ", e);
}
- } catch (Exception e) {
- LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
}
this.isCompleted.set(true);
}
@@ -116,7 +130,7 @@ public abstract class InstagramDataCollector<T> implements Runnable {
* @param user
* @throws Exception
*/
- protected abstract void collectInstagramDataForUser(User user);
+ protected abstract void collectInstagramDataForUser(User user) throws Exception;
/**
* Takes an Instagram Object and sets it as the document of a streams datum and sets the id of the streams datum.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
index 952e3bf..6bf71c3 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -15,15 +15,10 @@ under the License. */
package org.apache.streams.instagram.provider.recentmedia;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.instagram.InstagramConfiguration;
import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.provider.InstagramOauthToken;
-import org.apache.streams.util.ComponentUtils;
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
-import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
-import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
-import org.jinstagram.Instagram;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
import org.jinstagram.entity.common.Pagination;
import org.jinstagram.entity.users.feed.MediaFeed;
import org.jinstagram.entity.users.feed.MediaFeedData;
@@ -33,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
/**
* Executes on all of the Instagram requests to collect the media feed data.
@@ -41,76 +35,21 @@ import java.util.concurrent.atomic.AtomicBoolean;
* If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
* move on to the next user. If a rate limit exception occurs it employs an exponential back off strategy.
*/
-public class InstagramRecentMediaCollector implements Runnable {
+public class InstagramRecentMediaCollector extends InstagramDataCollector<MediaFeedData> {
private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
protected static final int MAX_ATTEMPTS = 5;
- protected static final int SLEEP_SECS = 5; //5 seconds
- protected Queue<MediaFeedData> dataQueue; //exposed for testing
- private InstagramConfiguration config;
- private AtomicBoolean isCompleted;
- private SimpleTokenManager<InstagramOauthToken> tokenManger;
private int consecutiveErrorCount;
- private BackOffStrategy backOffStrategy;
- private Instagram instagram;
- public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
- this.dataQueue = queue;
- this.config = config;
- this.isCompleted = new AtomicBoolean(false);
- this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
- for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
- this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
- }
- this.consecutiveErrorCount = 0;
- this.backOffStrategy = new ExponentialBackOffStrategy(2);
- this.instagram = new Instagram(this.config.getClientId());
- }
-
-
- /**
- * If there are authorized tokens available, it sets a new token for the client and returns
- * the client. If there are no available tokens, it simply returns the client that was
- * initialized in the constructor with client id.
- * @return
- */
- @VisibleForTesting
- protected Instagram getNextInstagramClient() {
- if(this.tokenManger.numAvailableTokens() > 0) {
- this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
- }
- return this.instagram;
- }
-
- private void queueData(MediaFeed userFeed, String userId) {
- if (userFeed == null) {
- LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
- } else {
- for (MediaFeedData data : userFeed.getData()) {
- ComponentUtils.offerUntilSuccess(data, this.dataQueue);
- }
- }
- }
-
- /**
- * @return true when the collector has queued all of the available media feed data for the provided users.
- */
- public boolean isCompleted() {
- return this.isCompleted.get();
+ public InstagramRecentMediaCollector(Queue<StreamsDatum> queue, InstagramConfiguration config) {
+ super(queue, config);
}
@Override
- public void run() {
- try {
- for (User user : this.config.getUsersInfo().getUsers()) {
- collectMediaFeed(user);
- }
- } catch (Exception e) {
- LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
- }
- this.isCompleted.set(true);
+ protected StreamsDatum convertToStreamsDatum(MediaFeedData item) {
+ return new StreamsDatum(item, item.getId());
}
/**
@@ -119,8 +58,8 @@ public class InstagramRecentMediaCollector implements Runnable {
* @param user
* @throws Exception
*/
- @VisibleForTesting
- protected void collectMediaFeed(User user) throws Exception {
+ @Override
+ protected void collectInstagramDataForUser(User user) throws Exception {
Pagination pagination = null;
do {
int attempts = 0;
@@ -151,7 +90,7 @@ public class InstagramRecentMediaCollector implements Runnable {
LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
++this.consecutiveErrorCount;
}
- if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
+ if(this.consecutiveErrorCount > Math.max(this.numAvailableTokens(), MAX_ATTEMPTS*2)) {
throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
}
}
@@ -159,7 +98,7 @@ public class InstagramRecentMediaCollector implements Runnable {
this.consecutiveErrorCount = 0;
this.backOffStrategy.reset();
pagination = feed.getPagination();
- queueData(feed, user.getUserId());
+ queueData(feed.getData(), user.getUserId());
}
}
if(!succesfullDataPull) {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
index 39acbd4..ad039a8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -22,6 +22,8 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.instagram.*;
+import org.apache.streams.instagram.provider.InstagramAbstractProvider;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.SerializationUtil;
@@ -41,161 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
*/
-public class InstagramRecentMediaProvider implements StreamsProvider {
+public class InstagramRecentMediaProvider extends InstagramAbstractProvider {
- private InstagramConfiguration config;
- private InstagramRecentMediaCollector dataCollector;
- protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
- private ExecutorService executorService;
- private AtomicBoolean isCompleted;
public InstagramRecentMediaProvider() {
- this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
}
public InstagramRecentMediaProvider(InstagramConfiguration config) {
- this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
+ super(config);
}
@Override
- public void startStream() {
- this.dataCollector = getInstagramRecentMediaCollector();
- this.executorService = Executors.newSingleThreadExecutor();
- this.executorService.submit(this.dataCollector);
+ protected InstagramDataCollector getInstagramDataCollector() {
+ return new InstagramRecentMediaCollector(super.dataQueue, super.config);
}
-
- /**
- * EXPOSED FOR TESTING
- * @return
- */
- @VisibleForTesting
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
- this.updateUserInfoList();
- return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
- }
-
-
- @Override
- public StreamsResultSet readCurrent() {
- Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
- MediaFeedData data = null;
- while(!this.mediaFeedQueue.isEmpty()) {
- data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
- batch.add(new StreamsDatum(data, data.getId()));
- }
- this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
- return new StreamsResultSet(batch);
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return !this.isCompleted.get();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
- this.isCompleted = new AtomicBoolean(false);
- }
-
- @Override
- public void cleanUp() {
- this.executorService.shutdown();
- try {
- this.executorService.awaitTermination(5, TimeUnit.SECONDS);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } finally {
- this.executorService = null;
- }
- }
-
- /**
- * Add default start and stop points if necessary.
- */
- private void updateUserInfoList() {
- UsersInfo usersInfo = this.config.getUsersInfo();
- if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
- return;
- }
- DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
- DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
- for(User user : usersInfo.getUsers()) {
- if(defaultAfterDate != null && user.getAfterDate() == null) {
- user.setAfterDate(defaultAfterDate);
- }
- if(defaultBeforeDate != null && user.getBeforeDate() == null) {
- user.setBeforeDate(defaultBeforeDate);
- }
- }
- }
-
- /**
- * Overrides the client id in the configuration.
- * @param clientId client id to use
- */
- public void setInstagramClientId(String clientId) {
- this.config.setClientId(clientId);
- }
-
- /**
- * Overrides authroized user tokens in the configuration.
- * @param tokenStrings
- */
- public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
- ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
- }
-
- /**
- * Overrides the default before date in the configuration
- * @param beforeDate
- */
- public void setDefaultBeforeDate(DateTime beforeDate) {
- ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
- }
-
- /**
- * Overrides the default after date in the configuration
- * @param afterDate
- */
- public void setDefaultAfterDate(DateTime afterDate) {
- ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
- }
-
- /**
- * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
- * pull data from as early as possible. If default before or after DateTimes are set, they will applied to all
- * NULL DateTimes.
- * @param usersWithAfterDate instagram user id mapped to BeforeDate time
- */
- public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
- Set<User> users = Sets.newHashSet();
- for(String userId : usersWithAfterDate.keySet()) {
- User user = new User();
- user.setUserId(userId);
- user.setAfterDate(usersWithAfterDate.get(userId));
- users.add(user);
- }
- ensureUsersInfo(this.config).setUsers(users);
- }
-
- private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
- UsersInfo usersInfo = config.getUsersInfo();
- if(usersInfo == null) {
- usersInfo = new UsersInfo();
- config.setUsersInfo(usersInfo);
- }
- return usersInfo;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
index 0020652..ce31ca7 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
@@ -19,6 +19,7 @@ import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.instagram.InstagramConfiguration;
import org.apache.streams.instagram.User;
import org.apache.streams.instagram.UsersInfo;
@@ -58,7 +59,7 @@ public class InstagramRecentMediaCollectorTest extends RandomizedTest {
@Repeat(iterations = 3)
public void testRun() {
this.expectedDataCount = 0;
- Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+ Queue<StreamsDatum> data = Queues.newConcurrentLinkedQueue();
InstagramConfiguration config = new InstagramConfiguration();
UsersInfo usersInfo = new UsersInfo();
config.setUsersInfo(usersInfo);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c2a391aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
index 4a6396b..371936b 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
@@ -14,10 +14,12 @@ specific language governing permissions and limitations
under the License. */
package org.apache.streams.instagram.provider.recentmedia;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.instagram.InstagramConfiguration;
import org.apache.streams.instagram.User;
import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.junit.Test;
import org.slf4j.Logger;
@@ -44,7 +46,7 @@ public class InstagramRecentMediaProviderTest {
@Test
public void testStartStream() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
- final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
+ final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration()) {
private volatile boolean isFinished = false;
@@ -62,7 +64,7 @@ public class InstagramRecentMediaProviderTest {
InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
@Override
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ protected InstagramDataCollector getInstagramDataCollector() {
return collectorStub;
}
};
@@ -90,10 +92,10 @@ public class InstagramRecentMediaProviderTest {
final CyclicBarrier test = new CyclicBarrier(2);
final CyclicBarrier produce = new CyclicBarrier(2);
final AtomicInteger batchCount = new AtomicInteger(0);
- InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
+ final InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
@Override
- protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
- return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
+ protected InstagramDataCollector getInstagramDataCollector() {
+ return new InstagramRecentMediaCollector(this.dataQueue, createNonNullConfiguration()) {
private volatile boolean isFinished = false;
@@ -114,7 +116,7 @@ public class InstagramRecentMediaProviderTest {
while(randInt != 0) {
int batchSize = rand.nextInt(200);
for(int i=0; i < batchSize; ++i) {
- while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+ while(!super.dataQueue.add(new StreamsDatum(null))) {
Thread.yield();
}
}