You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:10 UTC
[8/8] git commit: Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.
Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ae27541e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ae27541e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ae27541e
Branch: refs/heads/master
Commit: ae27541e08674f4db6996e065516b32b8fe0f45d
Parents: d1018e9
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 18:05:09 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 18:05:09 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/s3/S3PersistReaderTask.java | 16 ++--------------
.../java/org/apache/streams/s3/S3PersistWriter.java | 1 -
.../twitter/provider/TwitterTimelineProvider.java | 14 ++++++--------
.../provider/TwitterUserInformationProvider.java | 3 ++-
4 files changed, 10 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 9967216..73763e6 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.s3;
import com.google.common.base.Strings;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +55,7 @@ public class S3PersistReaderTask implements Runnable {
reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
- write( entry );
+ ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
}
@@ -81,17 +82,4 @@ public class S3PersistReaderTask implements Runnable {
LOGGER.error("There was an issue closing file: {}", file);
}
}
-
-
- private void write( StreamsDatum entry ) {
- boolean success;
- do {
- synchronized( S3PersistReader.class ) {
- success = reader.persistQueue.offer(entry);
- }
- Thread.yield();
- }
- while( !success );
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 98671ba..058f748 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -62,7 +62,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
}};
private OutputStreamWriter currentWriter = null;
- protected volatile Queue<StreamsDatum> persistQueue;
public AmazonS3Client getAmazonS3Client() {
return this.amazonS3Client;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2c39cf9..b456fa4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,7 +2,6 @@ package org.apache.streams.twitter.provider;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.typesafe.config.Config;
@@ -11,18 +10,21 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import twitter4j.*;
import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
/**
* Created by sblackmon on 12/10/13.
@@ -105,17 +107,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
//while (keepTrying < 10)
while (keepTrying < 1)
{
-
try
{
statuses = client.getUserTimeline(currentId, paging);
for (Status tStat : statuses) {
String json = TwitterObjectFactory.getRawJSON(tStat);
-
- while(!providerQueue.offer(new StreamsDatum(json))) {
- sleep();
- }
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
}
paging.setPage(paging.getPage() + 1);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 04aa1fe..049c3bb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.util.ComponentUtils;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
for (User tStat : client.lookupUsers(toQuery)) {
String json = DataObjectFactory.getRawJSON(tStat);
- providerQueue.offer(new StreamsDatum(json));
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
}
keepTrying = 10;
}