You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/05/08 17:44:10 UTC

[8/8] git commit: Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.

Switched everything over to ComponentUtils.offerUntilSuccess per @mFranklin's request.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ae27541e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ae27541e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ae27541e

Branch: refs/heads/master
Commit: ae27541e08674f4db6996e065516b32b8fe0f45d
Parents: d1018e9
Author: Matthew Hager <Ma...@gmail.com>
Authored: Mon May 5 18:05:09 2014 -0500
Committer: Matthew Hager <Ma...@gmail.com>
Committed: Mon May 5 18:05:09 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/s3/S3PersistReaderTask.java  | 16 ++--------------
 .../java/org/apache/streams/s3/S3PersistWriter.java |  1 -
 .../twitter/provider/TwitterTimelineProvider.java   | 14 ++++++--------
 .../provider/TwitterUserInformationProvider.java    |  3 ++-
 4 files changed, 10 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
index 9967216..73763e6 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistReaderTask.java
@@ -20,6 +20,7 @@ package org.apache.streams.s3;
 import com.google.common.base.Strings;
 import org.apache.streams.core.DatumStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.util.ComponentUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +55,7 @@ public class S3PersistReaderTask implements Runnable {
                         reader.countersCurrent.incrementAttempt();
                         String[] fields = line.split(Character.toString(reader.DELIMITER));
                         StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
-                        write( entry );
+                        ComponentUtils.offerUntilSuccess(entry, reader.persistQueue);
                         reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                     }
                 }
@@ -81,17 +82,4 @@ public class S3PersistReaderTask implements Runnable {
             LOGGER.error("There was an issue closing file: {}", file);
         }
     }
-
-
-    private void write( StreamsDatum entry ) {
-        boolean success;
-        do {
-            synchronized( S3PersistReader.class ) {
-                success = reader.persistQueue.offer(entry);
-            }
-            Thread.yield();
-        }
-        while( !success );
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
index 98671ba..058f748 100644
--- a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
+++ b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3PersistWriter.java
@@ -62,7 +62,6 @@ public class S3PersistWriter implements StreamsPersistWriter, DatumStatusCountab
     }};
 
     private OutputStreamWriter currentWriter = null;
-    protected volatile Queue<StreamsDatum> persistQueue;
 
     public AmazonS3Client getAmazonS3Client() {
         return this.amazonS3Client;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 2c39cf9..b456fa4 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -2,7 +2,6 @@ package org.apache.streams.twitter.provider;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
@@ -11,18 +10,21 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import sun.reflect.generics.reflectiveObjects.NotImplementedException;
 import twitter4j.*;
 import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
 
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Created by sblackmon on 12/10/13.
@@ -105,17 +107,13 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
             //while (keepTrying < 10)
             while (keepTrying < 1)
             {
-
                 try
                 {
                     statuses = client.getUserTimeline(currentId, paging);
 
                     for (Status tStat : statuses) {
                         String json = TwitterObjectFactory.getRawJSON(tStat);
-
-                        while(!providerQueue.offer(new StreamsDatum(json))) {
-                            sleep();
-                        }
+                        ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
                     }
 
                     paging.setPage(paging.getPage() + 1);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ae27541e/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 04aa1fe..049c3bb 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -26,6 +26,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.util.ComponentUtils;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -122,7 +123,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
                 for (User tStat : client.lookupUsers(toQuery)) {
                     String json = DataObjectFactory.getRawJSON(tStat);
-                    providerQueue.offer(new StreamsDatum(json));
+                    ComponentUtils.offerUntilSuccess(new StreamsDatum(json), providerQueue);
                 }
                 keepTrying = 10;
             }