You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/18 18:10:19 UTC

[1/8] git commit: Merge pull request #1 from apache/master

Repository: incubator-streams
Updated Branches:
  refs/heads/master 297728774 -> c33f66544


Merge pull request #1 from apache/master

pullling in apache master

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b22efee7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b22efee7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b22efee7

Branch: refs/heads/master
Commit: b22efee7433fd7d82079fcb1306202779e2acb43
Parents: aa376fd cbfe01a
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Thu Jul 31 13:54:35 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Thu Jul 31 13:54:35 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |    2 +-
 streams-contrib/pom.xml                         |    2 +
 .../org/apache/streams/s3/S3PersistReader.java  |    9 +-
 .../streams/console/ConsolePersistReader.java   |   23 +
 .../streams/console/ConsolePersistWriter.java   |   18 +
 .../streams-persist-elasticsearch/README.md     |   38 +
 .../elasticsearch/ElasticsearchClient.java      |   20 +-
 .../ElasticsearchClientManager.java             |   26 +-
 .../ElasticsearchConfigurator.java              |   36 +
 .../ElasticsearchPersistReader.java             |   30 +-
 .../ElasticsearchPersistUpdater.java            |   53 +-
 .../ElasticsearchPersistWriter.java             |  686 ++-
 .../ElasticsearchPersistWriterTask.java         |   38 -
 .../elasticsearch/ElasticsearchQuery.java       |   57 +-
 .../elasticsearch/PercolateProcessor.java       |   20 +-
 .../ElasticsearchReaderConfiguration.json       |    5 +
 .../ElasticsearchWriterConfiguration.json       |    5 +
 .../src/main/resources/reference.json           |    9 +
 .../apache/streams/hbase/HbaseConfigurator.java |   18 +
 .../streams/hbase/HbasePersistWriter.java       |   18 +
 .../streams/hbase/HbasePersistWriterTask.java   |   18 +
 .../apache/streams/hdfs/HdfsConfigurator.java   |   18 +
 .../streams/hdfs/WebHdfsPersistReader.java      |   27 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   21 +-
 .../streams/hdfs/WebHdfsPersistWriter.java      |  162 +-
 .../streams/hdfs/WebHdfsPersistWriterTask.java  |   18 +
 .../apache/streams/kafka/KafkaConfigurator.java |   18 +
 .../streams/kafka/KafkaPersistReader.java       |   23 +
 .../streams/kafka/KafkaPersistReaderTask.java   |   18 +
 .../streams/kafka/KafkaPersistWriter.java       |   18 +
 .../streams/kafka/KafkaPersistWriterTask.java   |   18 +
 .../streams/kafka/StreamsPartitioner.java       |   18 +
 streams-contrib/streams-persist-mongo/README.md |   16 +
 .../apache/streams/mongo/MongoConfigurator.java |   18 +
 .../streams/mongo/MongoPersistReader.java       |  270 ++
 .../streams/mongo/MongoPersistWriter.java       |   39 +-
 .../src/main/resources/reference.json           |    8 +
 .../src/main/resources/reference.properties     |   10 -
 streams-contrib/streams-processor-json/pom.xml  |   86 +
 .../apache/streams/json/JsonPathExtractor.java  |  150 +
 .../org/apache/streams/json/JsonPathFilter.java |  171 +
 .../json/test/JsonPathExtractorTest.java        |  101 +
 .../json/test/JsonPathExtractorTwitterTest.java |   92 +
 .../src/test/resources/books.json               |   21 +
 .../src/test/resources/tweet.json               |    1 +
 .../regex/AbstractRegexExtensionExtractor.java  |   23 +-
 .../org/apache/streams/regex/RegexUtils.java    |   43 +-
 .../org/apache/streams/urls/LinkResolver.java   |    3 +-
 .../urls/LinkResolverHelperFunctions.java       |    1 +
 .../streams/urls/LinkResolverProcessor.java     |    3 +-
 .../streams/urls/LinkHelperFunctionsTest.java   |   18 +
 .../streams/urls/TestLinkUnwinderProcessor.java |   18 +
 .../streams-provider-datasift/pom.xml           |   50 +-
 .../streams/datasift/csdl/DatasiftCsdlUtil.java |  114 +
 .../datasift/provider/DatasiftConverter.java    |   38 +
 .../provider/DatasiftEventProcessor.java        |  105 -
 .../provider/DatasiftStreamConfigurator.java    |   20 +-
 .../provider/DatasiftStreamProvider.java        |  277 +-
 .../DatasiftTypeConverterProcessor.java         |  166 +
 .../streams/datasift/provider/ErrorHandler.java |   46 +
 .../streams/datasift/provider/Subscription.java |   59 +
 .../serializer/DatasiftActivitySerializer.java  |  193 +-
 .../DatasiftDefaultActivitySerializer.java      |  214 +
 .../DatasiftTweetActivitySerializer.java        |  258 ++
 .../main/jsonschema/com/datasift/Datasift.json  |  221 +-
 .../com/datasift/DatasiftConfiguration.json     |    1 +
 .../com/datasift/DatasiftTwitterUser.json       |   61 +
 .../com/datasift/test/DatasiftSerDeTest.java    |   63 -
 .../com/datasift/test/DatasiftSerDeTest.java    |   80 +
 .../provider/DatasiftStreamProviderTest.java    |  144 +
 .../DatasiftTypeConverterProcessorTest.java     |   73 +
 .../datasift/provider/ErrorHandlerTest.java     |   40 +
 .../datasift/provider/SubscriptionTest.java     |   58 +
 .../DatasiftActivitySerializerTest.java         |   66 +
 .../src/test/resources/amazon_datasift_json.txt |   10 +
 .../src/test/resources/blog_datasift_json.txt   |  719 +++
 .../src/test/resources/board_datasift_json.txt  | 4160 ++++++++++++++++++
 .../test/resources/facebook_datasift_json.txt   | 1843 ++++++++
 .../src/test/resources/part-r-00000.json        | 1724 ++++----
 .../resources/rand_sample_datasift_json.txt     | 1547 +++++++
 .../resources/random_sample_datasift_json.txt   | 1547 +++++++
 .../src/test/resources/reddit_datasift_json.txt |   33 +
 .../test/resources/twitter_datasift_json.txt    | 1000 +++++
 .../test/resources/wikipedia_datasift_json.txt  |  252 ++
 .../test/resources/youtube_datasift_json.txt    |    7 +
 ...FacebookPublicFeedXmlActivitySerializer.java |   18 +
 .../FacebookPostActivitySerializerTest.java     |    1 +
 .../facebook/test/FacebookPostSerDeTest.java    |   18 +
 .../facebook/test/FacebookEDCSerDeTest.java     |   18 +
 .../gnip/flickr/test/FlickrEDCSerDeTest.java    |   18 +
 .../com/gplus/api/GPlusActivitySerializer.java  |   18 +
 .../com/gplus/api/GPlusEDCAsActivityTest.java   |   18 +
 .../com/instagram/test/InstagramSerDeTest.java  |   18 +
 .../reddit/api/RedditActivitySerializer.java    |   18 +
 .../reddit/api/RedditEDCAsActivityJSONTest.java |   18 +
 .../java/com/gnip/test/YouTubeEDCSerDeTest.java |   20 +-
 .../com/gnip/test/YoutubeEDCAsActivityTest.java |    2 +-
 .../ActivityXMLActivitySerializer.java          |   18 +
 .../PowerTrackActivitySerializer.java           |   18 +
 .../test/PowerTrackDeserializationTest.java     |   18 +
 .../google-gmail/pom.xml                        |    2 +-
 .../com/google/gmail/GMailConfigurator.java     |   18 +
 .../gmail/provider/GMailImapProviderTask.java   |   18 +
 .../GMailMessageActivitySerializer.java         |   18 +
 .../google/gmail/provider/GMailProvider.java    |   26 +-
 .../gmail/provider/GMailRssProviderTask.java    |   18 +
 .../gplus/provider/GPlusActivitySerializer.java |   18 +
 .../gplus/provider/GPlusConfigurator.java       |   18 +
 .../gplus/provider/GPlusEventProcessor.java     |   18 +
 .../provider/GPlusHistoryProviderTask.java      |   18 +
 .../google/gplus/provider/GPlusProvider.java    |   23 +
 .../gmail/test/GMailMessageSerDeTest.java       |   18 +
 .../streams-provider-instagram/README.md        |   17 +
 .../InstagramMediaDataActivitySerializer.mup    |  894 ++++
 .../metadata/instagram_to_activity_mapping.png  |  Bin 0 -> 1123684 bytes
 .../streams-provider-instagram/pom.xml          |  145 +
 .../instagram/InstagramConfigurator.java        |   65 +
 .../processor/InstagramTypeConverter.java       |  102 +
 .../provider/InstagramRecentMediaCollector.java |  165 +
 .../provider/InstagramRecentMediaProvider.java  |  115 +
 .../InstagramJsonActivitySerializer.java        |   78 +
 .../serializer/util/InstagramActivityUtil.java  |  285 ++
 .../serializer/util/InstagramDeserializer.java  |   33 +
 .../com/instagram/InstagramConfiguration.json   |   21 +
 .../InstagramUserInformationConfiguration.json  |   17 +
 .../src/main/resources/reference.conf           |    5 +
 .../InstagramRecentMediaCollectorTest.java      |  175 +
 .../InstagramRecentMediaProviderTest.java       |  160 +
 .../test/InstagramActivitySerDeTest.java        |   88 +
 .../src/test/resources/testMediaFeedObjects.txt |    2 +
 .../src/test/resources/testtweets.txt           |  695 +++
 .../data/MoreoverJsonActivitySerializer.java    |   18 +
 .../data/MoreoverXmlActivitySerializer.java     |   18 +
 .../streams/data/moreover/MoreoverClient.java   |   18 +
 .../data/moreover/MoreoverConfigurator.java     |   18 +
 .../streams/data/moreover/MoreoverProvider.java |   23 +
 .../data/moreover/MoreoverProviderTask.java     |   18 +
 .../streams/data/moreover/MoreoverResult.java   |   18 +
 .../data/moreover/MoreoverResultSetWrapper.java |   18 +
 .../apache/streams/data/util/MoreoverUtils.java |   18 +
 .../MoreoverJsonActivitySerializerTest.java     |    1 +
 .../data/MoreoverXmlActivitySerializerTest.java |   18 +
 .../streams/rss/processor/RssTypeConverter.java |   72 +
 .../rss/provider/RssEventClassifier.java        |   18 +
 .../streams/rss/provider/RssEventProcessor.java |   26 +-
 .../rss/provider/RssStreamConfigurator.java     |   18 +
 .../streams/rss/provider/RssStreamProvider.java |   24 +
 .../rss/provider/RssStreamProviderTask.java     |   18 +
 .../serializer/SyndEntryActivitySerializer.java |  216 +-
 .../rss/serializer/SyndEntrySerializer.java     |  308 ++
 .../streams/rss/test/RssTypeConverterTest.java  |   31 +
 .../test/SyndEntryActivitySerizlizerTest.java   |  102 +
 .../streams/rss/test/Top100FeedsTest.java       |   66 -
 .../src/test/resources/TestSyndEntryJson.txt    |   10 +
 .../sysomos/config/SysomosConfigurator.java     |   39 +
 .../sysomos/proessor/SysomosTypeConverter.java  |   56 -
 .../sysomos/processor/SysomosTypeConverter.java |   56 +
 .../sysomos/provider/ContentRequestBuilder.java |    1 +
 .../provider/SysomosHeartbeatStream.java        |  149 +-
 .../sysomos/provider/SysomosProvider.java       |  127 +-
 .../streams/sysomos/util/SysomosUtils.java      |    2 +-
 .../com/sysomos/test/SysomosJsonSerDeTest.java  |   18 +
 .../com/sysomos/test/SysomosXmlSerDeTest.java   |   18 +
 .../streams-provider-twitter/pom.xml            |    5 +-
 .../FetchAndReplaceTwitterProcessor.java        |  173 +
 .../processor/TwitterEventProcessor.java        |   18 +
 .../processor/TwitterProfileProcessor.java      |   56 +-
 .../twitter/processor/TwitterTypeConverter.java |   35 +-
 .../twitter/provider/TwitterErrorHandler.java   |   20 +-
 .../provider/TwitterEventClassifier.java        |   24 +-
 .../provider/TwitterStreamConfigurator.java     |   18 +
 .../twitter/provider/TwitterStreamProvider.java |   23 +
 .../provider/TwitterStreamProviderTask.java     |   18 +
 .../provider/TwitterTimelineProvider.java       |   79 +-
 .../provider/TwitterTimelineProviderTask.java   |   18 +
 .../TwitterUserInformationProvider.java         |   12 +-
 .../serializer/StreamsTwitterMapper.java        |   30 +
 .../TwitterJsonActivitySerializer.java          |   68 +-
 .../TwitterJsonDeleteActivitySerializer.java    |   43 +-
 .../TwitterJsonRetweetActivitySerializer.java   |  108 +-
 .../TwitterJsonTweetActivitySerializer.java     |  108 +-
 .../TwitterJsonUserActivitySerializer.java      |   72 +
 ...erJsonUserstreameventActivitySerializer.java |   25 +-
 .../serializer/util/TwitterActivityUtil.java    |  335 ++
 .../src/main/jsonschema/com/twitter/tweet.json  |   42 +-
 .../streams/twitter/test/SimpleTweetTest.java   |   18 +
 .../twitter/test/TweetActivitySerDeTest.java    |   19 +-
 .../streams/twitter/test/TweetSerDeTest.java    |   18 +
 .../test/TwitterEventClassifierTest.java        |   27 +
 .../twitter/test/TwitterStreamProviderTest.java |   18 +
 .../org/apache/streams/core/DatumStatus.java    |   18 +
 .../streams/core/DatumStatusCountable.java      |   18 +
 .../apache/streams/core/DatumStatusCounter.java |   18 +
 .../org/apache/streams/core/StreamBuilder.java  |   18 +
 .../org/apache/streams/core/StreamHandler.java  |   18 +
 .../org/apache/streams/core/StreamState.java    |   18 +
 .../org/apache/streams/core/StreamsDatum.java   |    4 +
 .../apache/streams/core/StreamsOperation.java   |   18 +
 .../apache/streams/core/StreamsProvider.java    |   38 +-
 .../apache/streams/data/util/RFC3339Utils.java  |    5 +
 .../ActivityDeserializerException.java          |   18 +
 .../exceptions/ActivitySerializerException.java |   18 +
 .../jackson/StreamsDateTimeDeserializer.java    |   20 +-
 .../jackson/StreamsDateTimeSerializer.java      |   18 +
 .../streams/jackson/StreamsJacksonMapper.java   |   22 +-
 .../streams/jackson/StreamsJacksonModule.java   |   18 +
 .../jackson/StreamsPeriodDeserializer.java      |   18 +
 .../jackson/StreamsPeriodSerializer.java        |   20 +-
 .../data/data/util/DateTimeSerDeTest.java       |   18 +
 .../local/builders/InvalidStreamException.java  |   18 +
 .../local/builders/LocalStreamBuilder.java      |   61 +-
 .../streams/local/builders/StreamComponent.java |   20 +-
 .../streams/local/tasks/BaseStreamsTask.java    |   18 +
 .../tasks/LocalStreamProcessMonitorThread.java  |   18 +
 .../tasks/StatusCounterMonitorRunnable.java     |   18 +
 .../local/tasks/StatusCounterMonitorThread.java |   37 +-
 .../streams/local/tasks/StreamsMergeTask.java   |   18 +
 .../local/tasks/StreamsPersistWriterTask.java   |   18 +
 .../local/tasks/StreamsProcessorTask.java       |   53 +-
 .../local/tasks/StreamsProviderTask.java        |   75 +-
 .../apache/streams/local/tasks/StreamsTask.java |   28 +-
 .../local/builders/LocalStreamBuilderTest.java  |   89 +-
 .../local/builders/ToyLocalBuilderExample.java  |   18 +
 .../streams/local/tasks/BasicTasksTest.java     |   18 +
 .../test/processors/DoNothingProcessor.java     |   18 +
 .../PassthroughDatumCounterProcessor.java       |   18 +
 .../local/test/processors/SlowProcessor.java    |   50 +
 .../test/providers/EmptyResultSetProvider.java  |   68 +
 .../test/providers/NumericMessageProvider.java  |   86 +-
 .../local/test/writer/DatumCounterWriter.java   |   18 +
 .../local/test/writer/DoNothingWriter.java      |   18 +
 .../local/test/writer/SystemOutWriter.java      |   18 +
 .../component/ExpectedDatumsPersistWriter.java  |   18 +
 .../test/component/FileReaderProvider.java      |   73 +-
 .../test/component/StreamsDatumConverter.java   |   18 +
 .../component/StringToDocumentConverter.java    |   18 +
 .../tests/TestComponentsLocalStream.java        |   18 +
 .../tests/TestExpectedDatumsPersitWriter.java   |   18 +
 .../component/tests/TestFileReaderProvider.java |   18 +
 streams-runtimes/streams-runtime-pig/pom.xml    |   49 +-
 .../streams/pig/StreamsComponentFactory.java    |   29 +-
 .../apache/streams/pig/StreamsPigBuilder.java   |   24 +-
 .../streams/pig/StreamsProcessDatumExec.java    |  132 +-
 .../streams/pig/StreamsProcessDocumentExec.java |   76 +-
 .../streams/pig/StreamsSerializerExec.java      |   44 +-
 .../org/apache/streams/pig/StreamsStorage.java  |   22 +-
 .../src/test/java/PigProcessorTest.java         |   32 -
 .../src/test/java/PigSerializerTest.java        |   40 -
 .../streams/pig/test/AppendStringProcessor.java |   59 +
 .../streams/pig/test/CopyThriceProcessor.java   |   59 +
 .../streams/pig/test/DoNothingProcessor.java    |   57 +
 .../streams/pig/test/PigProcessDatumTest.java   |   99 +
 .../pig/test/PigProcessDocumentTest.java        |  117 +
 .../streams/pig/test/PigSerializerTest.java     |   62 +
 .../test/resources/pigprocessdatumcopytest.pig  |    4 +
 .../src/test/resources/pigprocessdatumtest.pig  |    4 +
 .../resources/pigprocessdocumentappendtest.pig  |    4 +
 .../test/resources/pigprocessdocumenttest.pig   |    4 +
 .../src/test/resources/pigprocessortest.pig     |    7 -
 .../src/test/resources/pigserializertest.pig    |   12 +-
 .../trident/StreamsPersistWriterState.java      |   18 +
 .../storm/trident/StreamsProcessorFunction.java |   18 +
 .../storm/trident/StreamsProviderSpout.java     |   20 +-
 .../org/apache/streams/util/ComponentUtils.java |   18 +
 .../java/org/apache/streams/util/DateUtil.java  |   18 +
 .../apache/streams/util/SerializationUtil.java  |   18 +
 266 files changed, 23732 insertions(+), 2915 deletions(-)
----------------------------------------------------------------------



[7/8] git commit: STREAMS-143 | Code review feedback changes

Posted by mf...@apache.org.
STREAMS-143 | Code review feedback changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/a01b6911
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/a01b6911
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/a01b6911

Branch: refs/heads/master
Commit: a01b691130f7afb22f0c16b6f862f8ba323f5b69
Parents: 557e54e
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 13 16:24:23 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 13 16:24:23 2014 -0500

----------------------------------------------------------------------
 .../instagram/provider/InstagramOauthToken.java | 14 ++++
 .../provider/InstagramRecentMediaCollector.java | 16 +---
 .../provider/InstagramRecentMediaProvider.java  |  9 +--
 .../com/instagram/InstagramConfiguration.json   |  2 +
 .../InstagramRecentMediaCollectorTest.java      | 36 ++-------
 .../InstagramRecentMediaProviderTest.java       | 21 +++++-
 .../org/apache/streams/util/ComponentUtils.java | 41 ++++++++--
 .../util/ConcurentYieldTillSuccessQueue.java    | 27 -------
 .../backoff/AbstractBackOffStrategy.java        | 78 ++++++++++++++++++++
 .../api/requests/backoff/BackOffException.java  | 18 ++++-
 .../api/requests/backoff/BackOffStrategy.java   | 73 +++++-------------
 .../impl/ConstantTimeBackOffStrategy.java       | 20 ++++-
 .../impl/ExponentialBackOffStrategy.java        | 18 ++++-
 .../backoff/impl/LinearTimeBackOffStrategy.java | 20 ++++-
 .../util/oauth/tokens/AbstractOauthToken.java   | 14 ++++
 .../tokens/tokenmanager/SimpleTokenManager.java | 14 ++++
 .../tokenmanager/impl/BasicTokenManger.java     | 14 ++++
 .../requests/backoff/BackOffStrategyTest.java   |  8 +-
 .../ConstantTimeBackOffStrategyTest.java        |  2 +-
 .../backoff/ExponentialBackOffStrategyTest.java |  2 +-
 .../backoff/LinearTimeBackOffStartegyTest.java  |  2 +-
 21 files changed, 294 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index d41fc64..d395607 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.instagram.provider;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 08b1696..8dc7a82 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -17,8 +17,7 @@ package org.apache.streams.instagram.provider;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UserId;
-import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
 import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
 import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
@@ -86,10 +85,10 @@ public class InstagramRecentMediaCollector implements Runnable {
 
     private void queueData(MediaFeed userFeed, String userId) {
         if (userFeed == null) {
-            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+            LOGGER.warn("User id, {}, returned a NULL media feed from instagram.", userId);
         } else {
             for (MediaFeedData data : userFeed.getData()) {
-                this.dataQueue.offer(data);
+                ComponentUtils.offerUntilSuccess(data, this.dataQueue);
             }
         }
     }
@@ -168,14 +167,5 @@ public class InstagramRecentMediaCollector implements Runnable {
         } while (pagination != null && pagination.hasNextPage());
     }
 
-    /**
-     * Handles different types of {@link java.lang.Exception} caught while trying to pull Instagram data.
-     * BackOffs/Sleeps when it encounters a rate limit expection..
-     * @param e
-     * @throws BackOffException
-     */
-    protected void handleException(Exception e) throws BackOffException {
-
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index 6f975d0..874fd19 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -22,6 +22,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.*;
+import org.apache.streams.util.ComponentUtils;
 import org.apache.streams.util.SerializationUtil;
 import org.jinstagram.auth.model.Token;
 import org.jinstagram.entity.users.feed.MediaFeedData;
@@ -78,11 +79,9 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
     public StreamsResultSet readCurrent() {
         Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
         MediaFeedData data = null;
-        synchronized (this.mediaFeedQueue) {
-            while(!this.mediaFeedQueue.isEmpty()) {
-                data = this.mediaFeedQueue.poll();
-                batch.add(new StreamsDatum(data, data.getId()));
-            }
+        while(!this.mediaFeedQueue.isEmpty()) {
+            data = ComponentUtils.pollWhileNotEmpty(this.mediaFeedQueue);
+            batch.add(new StreamsDatum(data, data.getId()));
         }
         this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
         return new StreamsResultSet(batch);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index 431efbc..bcb5fd7 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -11,6 +11,7 @@
         },
         "usersInfo": {
             "type": "object",
+            "javaInterfaces" : ["java.io.Serializable"],
             "properties": {
                 "authorizedTokens": {
                     "type": "array",
@@ -45,6 +46,7 @@
     "definitions": {
         "user": {
             "type": "object",
+            "javaInterfaces" : ["java.io.Serializable"],
             "properties": {
                 "userId": {
                     "type": "string",

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
index 74b5139..1d234c9 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -22,9 +22,8 @@ import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
 import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.instagram.UserId;
+import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UsersInfo;
-import org.apache.streams.util.ConcurentYieldTillSuccessQueue;
 import org.jinstagram.Instagram;
 import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
@@ -56,38 +55,19 @@ public class InstagramRecentMediaCollectorTest extends RandomizedTest {
 
     private int expectedDataCount = 0;
 
-    @Test
-    public void testHandleInstagramException1() throws InstagramException {
-        InstagramException ie = mock(InstagramRateLimitException.class);
-        when(ie.getRemainingLimitStatus()).thenReturn(1);
-        final String message = "Test Message";
-        when(ie.getMessage()).thenReturn(message);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurentYieldTillSuccessQueue<MediaFeedData>(), new InstagramConfiguration());
-        try {
-            long startTime = System.currentTimeMillis();
-            collector.handleException(ie);
-            long endTime = System.currentTimeMillis();
-            assertTrue(2000 <= endTime - startTime);
-            startTime = System.currentTimeMillis();
-            collector.handleException(ie);
-            endTime = System.currentTimeMillis();
-            assertTrue(4000 <= endTime - startTime);
-        } catch (Exception e) {
-            fail("Should not have thrown an exception.");
-        }
-    }
+
 
 
     @Test
     @Repeat(iterations = 3)
     public void testRun() {
         this.expectedDataCount = 0;
-        Queue<MediaFeedData> data = new ConcurentYieldTillSuccessQueue<MediaFeedData>();
+        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
         InstagramConfiguration config = new InstagramConfiguration();
         UsersInfo usersInfo = new UsersInfo();
         config.setUsersInfo(usersInfo);
-        Set<UserId> users = creatUsers(randomIntBetween(0, 100));
-        usersInfo.setUserIds(users);
+        Set<User> users = creatUsers(randomIntBetween(0, 100));
+        usersInfo.setUsers(users);
 
         final Instagram mockInstagram = createMockInstagramClient();
         InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
@@ -138,10 +118,10 @@ public class InstagramRecentMediaCollectorTest extends RandomizedTest {
         return instagramClient;
     }
 
-    private Set<UserId> creatUsers(int numUsers) {
-        Set<UserId> users = Sets.newHashSet();
+    private Set<User> creatUsers(int numUsers) {
+        Set<User> users = Sets.newHashSet();
         for(int i=0; i < numUsers; ++i) {
-            UserId user = new UserId();
+            User user = new User();
             user.setUserId(Integer.toString(randomInt()));
             if(randomInt(2) == 0) {
                 user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
index d81f85a..71e569b 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -14,13 +14,19 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
+import com.google.common.collect.Sets;
 import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.InstagramConfig;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
 import java.util.Random;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -41,7 +47,7 @@ public class InstagramRecentMediaProviderTest {
     @Test
     public void testStartStream() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
-        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration()) {
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), createNonNullConfiguration()) {
 
             private volatile boolean isFinished = false;
 
@@ -87,10 +93,10 @@ public class InstagramRecentMediaProviderTest {
         final CyclicBarrier test = new CyclicBarrier(2);
         final CyclicBarrier produce = new CyclicBarrier(2);
         final AtomicInteger batchCount = new AtomicInteger(0);
-        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(new InstagramUserInformationConfiguration()) {
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
             @Override
             protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
-                return new InstagramRecentMediaCollector(super.mediaFeedQueue, new InstagramUserInformationConfiguration()) {
+                return new InstagramRecentMediaCollector(super.mediaFeedQueue, createNonNullConfiguration()) {
 
                     private volatile boolean isFinished = false;
 
@@ -157,4 +163,13 @@ public class InstagramRecentMediaProviderTest {
         }
     }
 
+    private InstagramConfiguration createNonNullConfiguration() {
+        InstagramConfiguration configuration = new InstagramConfiguration();
+        UsersInfo info = new UsersInfo();
+        configuration.setUsersInfo(info);
+        info.setUsers(new HashSet<User>());
+        info.setAuthorizedTokens(new HashSet<String>());
+        return configuration;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index f6a0b60..79ca184 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -32,8 +32,15 @@ import java.util.concurrent.TimeUnit;
 public class ComponentUtils {
     private static final Logger LOGGER = LoggerFactory.getLogger(ComponentUtils.class);
 
+    /**
+     * Certain types of queues will fail to {@link java.util.Queue#offer(Object)} an item due to many factors
+     * depending on the type of queue. <code>offerUntilSuccess</code> will not return until the item has been
+     * successfully queued onto the desired queue
+     * @param entry item to queue
+     * @param queue queue to add the entry to
+     * @param <T>
+     */
     public static <T> void offerUntilSuccess(T entry, Queue<T> queue) {
-
         boolean success;
         do {
             success = queue.offer(entry);
@@ -42,6 +49,24 @@ public class ComponentUtils {
         while( !success );
     }
 
+    /**
+     * Certain types of queues will return null when calling {@link java.util.Queue#poll()} due to many factors depending
+     * on the type of queue.  <code>pollWhileNotEmpty</code> will poll the queue until an item from the queue is returned
+     * or the queue is empty.  If the queue is empty it will return NULL.
+     * @param queue
+     * @param <T>
+     * @return
+     */
+    public static <T> T pollWhileNotEmpty(Queue<T> queue) {
+        T item = queue.poll();
+        while(!queue.isEmpty() && item == null) {
+            Thread.yield();
+            item = queue.poll();
+        }
+        return item;
+    }
+
+
     public static String pollUntilStringNotEmpty(Queue queue) {
 
         String result = null;
@@ -58,21 +83,23 @@ public class ComponentUtils {
         return result;
     }
 
+    /**
+     * Attempts to safely {@link java.util.concurrent.ExecutorService#shutdown()} and {@link java.util.concurrent.ExecutorService#awaitTermination(long, java.util.concurrent.TimeUnit)}
+     * of an {@link java.util.concurrent.ExecutorService}.
+     * @param stream service to be shutdown
+     * @param initialWait time in seconds to wait for currently running threads to finish execution
+     * @param secondaryWait time in seconds to wait for running threads that did not terminate in the first wait to acknowledge their forced termination
+     */
     public static void shutdownExecutor(ExecutorService stream, int initialWait, int secondaryWait) {
-        stream.shutdown(); // Disable new tasks from being submitted
+        stream.shutdown();
         try {
-            // Wait a while for existing tasks to terminate
             if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
-                stream.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
                 if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
                     LOGGER.error("Executor Service did not terminate");
                 }
             }
         } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
             stream.shutdownNow();
-            // Preserve interrupt status
             Thread.currentThread().interrupt();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
deleted file mode 100644
index e438f54..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package org.apache.streams.util;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * A {@link java.util.concurrent.ConcurrentLinkedQueue} implementation that causes thread yields when data is not
- * successfully offered or polled.
- */
-public class ConcurentYieldTillSuccessQueue<T> extends ConcurrentLinkedQueue<T> {
-
-    @Override
-    public T poll() {
-        T item = null;
-        while(!super.isEmpty() && (item = super.poll()) == null) {
-            Thread.yield();;
-        }
-        return item;
-    }
-
-    @Override
-    public boolean offer(T t) {
-        while(!super.offer(t)) {
-            Thread.yield();
-        }
-        return true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
new file mode 100644
index 0000000..45e4239
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/AbstractBackOffStrategy.java
@@ -0,0 +1,78 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.util.api.requests.backoff;
+
+/**
+ * @see org.apache.streams.util.api.requests.backoff.BackOffStrategy
+ */
+public abstract class AbstractBackOffStrategy implements BackOffStrategy {
+
+    private long baseSleepTime;
+    private long lastSleepTime;
+    private int maxAttempts;
+    private int attemptsCount;
+
+    /**
+     * A BackOffStrategy that can effectively be used endlessly.
+     * @param baseBackOffTime amount of time back of in seconds
+     */
+    public AbstractBackOffStrategy(long baseBackOffTime) {
+        this(baseBackOffTime, -1);
+    }
+
+    /**
+     * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+     * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
+     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
+     */
+    public AbstractBackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
+        if(baseBackOffTime <= 0) {
+            throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime);
+        }
+        if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) {
+            throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts);
+        }
+        this.baseSleepTime = baseBackOffTime;
+        this.maxAttempts = maximumNumberOfBackOffAttempts;
+        this.attemptsCount = 0;
+    }
+
+    @Override
+    public void backOff() throws BackOffException {
+        if(this.attemptsCount++ >= this.maxAttempts && this.maxAttempts != -1) {
+            throw new BackOffException(this.attemptsCount-1, this.lastSleepTime);
+        } else {
+            try {
+                Thread.sleep(this.lastSleepTime = calculateBackOffTime(this.attemptsCount, this.baseSleepTime));
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Override
+    public void reset() {
+        this.attemptsCount = 0;
+    }
+
+    /**
+     * Calculate the amount of time in milliseconds that the strategy should back off for
+     * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
+     * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
+     * @return the amount of time it should back off in milliseconds
+     */
+    protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
index 0bdd82d..a38e55d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
@@ -1,8 +1,22 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.api.requests.backoff;
 
 /**
- * Exception that is thrown when a {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} has attempted to
- * <code>backOff()</code> more than the {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} was configured for.
+ * Exception that is thrown when a {@link AbstractBackOffStrategy} has attempted to
+ * <code>backOff()</code> more than the {@link AbstractBackOffStrategy} was configured for.
  */
 public class BackOffException extends Exception {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
index 628d37b..a132e02 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.api.requests.backoff;
 
 /**
@@ -19,69 +33,18 @@ package org.apache.streams.util.api.requests.backoff;
  * </code>
  *
  */
-public abstract class BackOffStrategy {
-
-    private long baseSleepTime;
-    private long lastSleepTime;
-    private int maxAttempts;
-    private int attemptsCount;
-
-    /**
-     * A BackOffStrategy that can effectively be used endlessly.
-     * @param baseBackOffTime amount of time back of in seconds
-     */
-    public BackOffStrategy(long baseBackOffTime) {
-        this(baseBackOffTime, -1);
-    }
-
-    /**
-     * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
-     * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
-     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
-     */
-    public BackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
-        if(baseBackOffTime <= 0) {
-            throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime);
-        }
-        if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) {
-            throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts);
-        }
-        this.baseSleepTime = baseBackOffTime;
-        this.maxAttempts = maximumNumberOfBackOffAttempts;
-        this.attemptsCount = 0;
-    }
+public interface BackOffStrategy {
 
     /**
      * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
      * on the number of times the backOff can be called, an exception will be thrown.
      * @throws BackOffException
      */
-    public void backOff() throws BackOffException {
-        if(this.attemptsCount++ >= this.maxAttempts && this.maxAttempts != -1) {
-            throw new BackOffException(this.attemptsCount-1, this.lastSleepTime);
-        } else {
-            try {
-                Thread.sleep(this.lastSleepTime = calculateBackOffTime(this.attemptsCount, this.baseSleepTime));
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
-            }
-        }
-    }
+    public void backOff() throws BackOffException;
 
     /**
-     * Rests the back off strategy to its original state.  After the call the strategy will act as if {@link BackOffStrategy#backOff()}
+     * Rests the back off strategy to its original state.  After the call the strategy will act as if {@link AbstractBackOffStrategy#backOff()}
      * has never been called.
      */
-    public void reset() {
-        this.attemptsCount = 0;
-    }
-
-    /**
-     * Calculate the amount of time in milliseconds that the strategy should back off for
-     * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
-     * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
-     * @return the amount of time it should back off in milliseconds
-     */
-    protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
-
+    public void reset();
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
index bfc523a..cc70fe1 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
@@ -1,13 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.api.requests.backoff.impl;
 
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
 
 /**
- * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes the current thread to sleep the
+ * A {@link org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy} that causes the current thread to sleep the
  * same amount of time each time <code>backOff()</code> is called.
  *
  */
-public class ConstantTimeBackOffStrategy extends BackOffStrategy {
+public class ConstantTimeBackOffStrategy extends AbstractBackOffStrategy {
 
     /**
      * A ConstantTimeBackOffStrategy that can effectively be used endlessly.

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
index af59a6a..65616e0 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
@@ -1,11 +1,25 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.api.requests.backoff.impl;
 
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
 
 /**
  * Exponential backk strategy.  Caluclated by baseBackOffTimeInSeconds raised the attempt-count power.
  */
-public class ExponentialBackOffStrategy extends BackOffStrategy {
+public class ExponentialBackOffStrategy extends AbstractBackOffStrategy {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
index 55f62a2..3443c9c 100644
--- a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
@@ -1,13 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.api.requests.backoff.impl;
 
-import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
 
 /**
- * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes back offs in linear increments. Each
+ * A {@link org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy} that causes back offs in linear increments. Each
  * attempt cause an increase back off period.
  * Calculated by attemptNumber * baseBackOffAmount.
  */
-public class LinearTimeBackOffStrategy extends BackOffStrategy{
+public class LinearTimeBackOffStrategy extends AbstractBackOffStrategy {
 
 
     public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
index fe48600..dfdec72 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.oauth.tokens;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index 903e48d..fed194f 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
 import org.apache.streams.util.oauth.tokens.AbstractOauthToken;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
index 34238b3..4c64bf7 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
 
 import org.apache.streams.util.oauth.tokens.AbstractOauthToken;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
index 5f3453e..a97fea9 100644
--- a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
@@ -11,7 +11,7 @@ import static org.junit.Assert.fail;
 public class BackOffStrategyTest {
 
 
-    private class TestBackOff extends BackOffStrategy {
+    private class TestBackOff extends AbstractBackOffStrategy {
 
         public TestBackOff(long sleep, int maxAttempts) {
             super(sleep, maxAttempts);
@@ -25,7 +25,7 @@ public class BackOffStrategyTest {
 
     @Test
     public void testUnlimitedBackOff() {
-        BackOffStrategy backOff = new TestBackOff(1, -1);
+        AbstractBackOffStrategy backOff = new TestBackOff(1, -1);
         try {
             for(int i=0; i < 100; ++i) {
                 backOff.backOff();
@@ -37,7 +37,7 @@ public class BackOffStrategyTest {
 
     @Test
     public void testLimitedUseBackOff()  {
-        BackOffStrategy backOff = new TestBackOff(1, 2);
+        AbstractBackOffStrategy backOff = new TestBackOff(1, 2);
         try {
             backOff.backOff();
         } catch (BackOffException boe) {
@@ -58,7 +58,7 @@ public class BackOffStrategyTest {
 
     @Test
     public void testBackOffSleep() throws BackOffException {
-        BackOffStrategy backOff = new TestBackOff(2000, 1);
+        AbstractBackOffStrategy backOff = new TestBackOff(2000, 1);
         long startTime = System.currentTimeMillis();
         backOff.backOff();
         long endTime = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
index c9e7de9..4c59277 100644
--- a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
@@ -14,7 +14,7 @@ public class ConstantTimeBackOffStrategyTest extends RandomizedTest{
 
     @Test
     public void constantTimeBackOffStategy() {
-        BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+        AbstractBackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
         assertEquals(1, backOff.calculateBackOffTime(1,1));
         assertEquals(1, backOff.calculateBackOffTime(2,1));
         assertEquals(1, backOff.calculateBackOffTime(3,1));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
index 43b42f7..e0744ca 100644
--- a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
@@ -12,7 +12,7 @@ public class ExponentialBackOffStrategyTest {
 
     @Test
     public void exponentialTimeBackOffStrategyTest() {
-        BackOffStrategy backOff = new ExponentialBackOffStrategy(1);
+        AbstractBackOffStrategy backOff = new ExponentialBackOffStrategy(1);
         assertEquals(5000, backOff.calculateBackOffTime(1,5));
         assertEquals(25000, backOff.calculateBackOffTime(2,5));
         assertEquals(125000, backOff.calculateBackOffTime(3,5));

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/a01b6911/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
index 7a0f848..ad7cf10 100644
--- a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
@@ -12,7 +12,7 @@ public class LinearTimeBackOffStartegyTest {
 
     @Test
     public void linearTimeBackOffStrategyTest() {
-        BackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
+        AbstractBackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
         assertEquals(1000, backOff.calculateBackOffTime(1,1));
         assertEquals(2000, backOff.calculateBackOffTime(2,1));
         assertEquals(3000, backOff.calculateBackOffTime(3,1));


[4/8] git commit: STREAMS-143 | Methods to set configuration in code and code review feedback

Posted by mf...@apache.org.
STREAMS-143 | Methods to set configuration in code and code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ecd80062
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ecd80062
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ecd80062

Branch: refs/heads/master
Commit: ecd80062969a611156bffb09defe4f0957b14fc4
Parents: 9f296be
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 13 14:50:28 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 13 14:50:28 2014 -0500

----------------------------------------------------------------------
 .../instagram/provider/InstagramOauthToken.java | 21 +++---
 .../provider/InstagramRecentMediaCollector.java | 44 +++++++-----
 .../provider/InstagramRecentMediaProvider.java  | 70 ++++++++++++++++++--
 .../com/instagram/InstagramConfiguration.json   | 24 ++++---
 .../util/oauth/tokens/AbstractOauthToken.java   | 19 ++++++
 .../streams/util/oauth/tokens/OauthToken.java   | 19 ------
 .../tokens/tokenmanager/SimpleTokenManager.java |  4 +-
 .../tokenmanager/impl/BasicTokenManger.java     |  4 +-
 .../tokenmanager/TestBasicTokenManager.java     |  4 +-
 9 files changed, 143 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index 9773f92..d41fc64 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -1,28 +1,29 @@
 package org.apache.streams.instagram.provider;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+
+import org.jinstagram.auth.model.Token;
 
 /**
- *
+ * Extends JInstagram Token. Only difference is it overrides the equal method and determines equality based on the
+ * token string.
  */
-public class InstagramOauthToken extends OauthToken{
+public class InstagramOauthToken extends Token {
 
-    private String clientId;
 
-    public InstagramOauthToken(String clientId) {
-        this.clientId = clientId;
+    public InstagramOauthToken(String token) {
+        this(token, null);
     }
 
-    public String getClientId() {
-        return clientId;
+    public InstagramOauthToken(String token, String secret) {
+        super(token, secret);
     }
 
     @Override
-    protected boolean internalEquals(Object o) {
+    public boolean equals(Object o) {
         if(!(o instanceof InstagramOauthToken)) {
             return false;
         }
         InstagramOauthToken that = (InstagramOauthToken) o;
-        return this.clientId.equals(that.clientId);
+        return this.getToken().equals(that.getToken());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 932828d..08b1696 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -16,6 +16,7 @@ package org.apache.streams.instagram.provider;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UserId;
 import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
@@ -52,6 +53,7 @@ public class InstagramRecentMediaCollector implements Runnable {
     private SimpleTokenManager<InstagramOauthToken> tokenManger;
     private int consecutiveErrorCount;
     private BackOffStrategy backOffStrategy;
+    private Instagram instagram;
 
 
     public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
@@ -59,17 +61,27 @@ public class InstagramRecentMediaCollector implements Runnable {
         this.config = config;
         this.isCompleted = new AtomicBoolean(false);
         this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
-        for (String clientId : this.config.getClientIds()) {
-            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
         }
         this.consecutiveErrorCount = 0;
         this.backOffStrategy = new ExponentialBackOffStrategy(2);
+        this.instagram = new Instagram(this.config.getClientId());
     }
 
 
+    /**
+     * If there are authorized tokens available, it sets a new token for the client and returns
+     * the client.  If there are no available tokens, it simply returns the client that was
+     * initialized in the constructor with client id.
+     * @return
+     */
     @VisibleForTesting
     protected Instagram getNextInstagramClient() {
-        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
+        if(this.tokenManger.numAvailableTokens() > 0) {
+            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+        }
+        return this.instagram;
     }
 
     private void queueData(MediaFeed userFeed, String userId) {
@@ -92,7 +104,7 @@ public class InstagramRecentMediaCollector implements Runnable {
     @Override
     public void run() {
         try {
-            for (UserId user : this.config.getUsersInfo().getUserIds()) {
+            for (User user : this.config.getUsersInfo().getUsers()) {
                 collectMediaFeed(user);
             }
         } catch (Exception e) {
@@ -108,7 +120,7 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @throws Exception
      */
     @VisibleForTesting
-    protected void collectMediaFeed(UserId user) throws Exception {
+    protected void collectMediaFeed(User user) throws Exception {
         Pagination pagination = null;
         do {
             int attempts = 0;
@@ -128,9 +140,16 @@ public class InstagramRecentMediaCollector implements Runnable {
                         feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
                     }
                 } catch (Exception e) {
-                    handleException(e);
-                    if(e instanceof InstagramBadRequestException) {
+                    if(e instanceof InstagramRateLimitException) {
+                        LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+                        this.backOffStrategy.backOff();
+                    } else if(e instanceof InstagramBadRequestException) {
+                        LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
                         attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+                        ++this.consecutiveErrorCount;
+                    } else {
+                        LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+                        ++this.consecutiveErrorCount;
                     }
                     if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
                         throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
@@ -156,16 +175,7 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @throws BackOffException
      */
     protected void handleException(Exception e) throws BackOffException {
-        if(e instanceof InstagramRateLimitException) {
-            LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
-            this.backOffStrategy.backOff();
-        } else if(e instanceof InstagramBadRequestException) {
-            LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
-            ++this.consecutiveErrorCount;
-        } else {
-            LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
-            ++this.consecutiveErrorCount;
-        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index e3e9399..6f975d0 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -16,18 +16,22 @@ package org.apache.streams.instagram.provider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.*;
 import org.apache.streams.util.SerializationUtil;
+import org.jinstagram.auth.model.Token;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -45,12 +49,11 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
     private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
-        this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
     }
 
     public InstagramRecentMediaProvider(InstagramConfiguration config) {
-        this.config = config;
-        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
     }
 
     @Override
@@ -66,6 +69,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
      */
     @VisibleForTesting
     protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+        this.updateUserInfoList();
         return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
     }
 
@@ -101,6 +105,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
 
     @Override
     public void prepare(Object configurationObject) {
+        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
         this.isCompleted = new AtomicBoolean(false);
     }
 
@@ -126,7 +131,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
         }
         DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
         DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
-        for(UserId user : usersInfo.getUserIds()) {
+        for(User user : usersInfo.getUsers()) {
             if(defaultAfterDate != null && user.getAfterDate() == null) {
                 user.setAfterDate(defaultAfterDate);
             }
@@ -136,5 +141,62 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
         }
     }
 
+    /**
+     * Overrides the client id in the configuration.
+     * @param clientId client id to use
+     */
+    public void setInstagramClientId(String clientId) {
+        this.config.setClientId(clientId);
+    }
+
+    /**
+     * Overrides authroized user tokens in the configuration.
+     * @param tokenStrings
+     */
+    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+    }
+
+    /**
+     * Overrides the default before date in the configuration
+     * @param beforeDate
+     */
+    public void setDefaultBeforeDate(DateTime beforeDate) {
+        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+    }
+
+    /**
+     * Overrides the default after date in the configuration
+     * @param afterDate
+     */
+    public void setDefaultAfterDate(DateTime afterDate) {
+        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+    }
+
+    /**
+     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+     * pull data from as early as possible.  If default before or after DateTimes are set, they will applied to all
+     * NULL DateTimes.
+     * @param usersWithAfterDate instagram user id mapped to BeforeDate time
+     */
+    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+        Set<User> users = Sets.newHashSet();
+        for(String userId : usersWithAfterDate.keySet()) {
+            User user = new User();
+            user.setUserId(userId);
+            user.setAfterDate(usersWithAfterDate.get(userId));
+            users.add(user);
+        }
+        ensureUsersInfo(this.config).setUsers(users);
+    }
+
+    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+        UsersInfo usersInfo = config.getUsersInfo();
+        if(usersInfo == null) {
+            usersInfo = new UsersInfo();
+            config.setUsersInfo(usersInfo);
+        }
+        return usersInfo;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index c662660..431efbc 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,23 +5,27 @@
     "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "clientIds": {
-            "type": "array",
-            "uniqueItems": true,
-            "items": {
-                "type": "string"
-            },
-            "description": "Your Instagram Client Ids"
+        "clientId": {
+            "type": "string",
+            "description": "Your Instagram Client Id"
         },
         "usersInfo": {
             "type": "object",
             "properties": {
-                "userIds": {
+                "authorizedTokens": {
+                    "type": "array",
+                    "uniqueItems": true,
+                    "items": {
+                        "type": "string"
+                    },
+                    "description": "Instagram tokens for authorized users of your client/app"
+                },
+                "users": {
                     "type": "array",
                     "uniqueItems": true,
                     "items": {
                         "type": "object",
-                        "$ref": "#/definitions/userInfo"
+                        "$ref": "#/definitions/user"
                     },
                     "description": "List of user ids to gather data for. Type of data gathered depends on provider"
                 },
@@ -39,7 +43,7 @@
         }
    },
     "definitions": {
-        "userInfo": {
+        "user": {
             "type": "object",
             "properties": {
                 "userId": {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
new file mode 100644
index 0000000..fe48600
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -0,0 +1,19 @@
+package org.apache.streams.util.oauth.tokens;
+
+/**
+ *
+ */
+public abstract class AbstractOauthToken {
+
+    /**
+     * Must create equals method for all OauthTokens.
+     * @param o
+     * @return true if equal, and false otherwise
+     */
+    protected abstract boolean internalEquals(Object o);
+
+    @Override
+    public boolean equals(Object o) {
+        return this.internalEquals(o);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
deleted file mode 100644
index df264c5..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.streams.util.oauth.tokens;
-
-/**
- *
- */
-public abstract class OauthToken {
-
-    /**
-     * Must create equals method for all OauthTokens.
-     * @param o
-     * @return true if equal, and false otherwise
-     */
-    protected abstract boolean internalEquals(Object o);
-
-    @Override
-    public boolean equals(Object o) {
-        return this.internalEquals(o);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index d052da1..903e48d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -1,13 +1,13 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 
 import java.util.Collection;
 
 /**
  * Manges access to oauth tokens.  Allows a caller to add tokens to the token pool and receive an available token.
  */
-public interface SimpleTokenManager<T extends OauthToken> {
+public interface SimpleTokenManager<T> {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
index 20c8d20..34238b3 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -1,6 +1,6 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
 
 import java.util.ArrayList;
@@ -15,7 +15,7 @@ import java.util.Collection;
  *
  * The manager class is thread safe.
  */
-public class BasicTokenManger<T extends OauthToken> implements SimpleTokenManager<T>{
+public class BasicTokenManger<T> implements SimpleTokenManager<T>{
 
     private ArrayList<T> availableTokens;
     private int nextToken;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
index 903cc69..cd9ed18 100644
--- a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
+++ b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -1,7 +1,7 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
 import org.junit.Test;
 
@@ -22,7 +22,7 @@ public class TestBasicTokenManager {
     /**
      * Simple token for testing purposes
      */
-    private class TestToken extends OauthToken {
+    private class TestToken extends AbstractOauthToken {
 
         private String s;
 


[5/8] git commit: Merge pull request #2 from apache/master

Posted by mf...@apache.org.
Merge pull request #2 from apache/master

Apache master merge 2014/08/13

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9c6164c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9c6164c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9c6164c7

Branch: refs/heads/master
Commit: 9c6164c7f74ae9ba57e90f743d2620525edc7724
Parents: b22efee 2977287
Author: Ryan Ebanks <ry...@raveldata.com>
Authored: Wed Aug 13 14:53:55 2014 -0500
Committer: Ryan Ebanks <ry...@raveldata.com>
Committed: Wed Aug 13 14:53:55 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../configuration/CassandraConfiguration.java   |  18 +
 .../model/CassandraActivityStreamsEntry.java    |  18 +
 .../repository/impl/CassandraKeyspace.java      |  18 +
 .../ElasticsearchConfigurator.java              |  28 +-
 .../ElasticsearchPersistDeleter.java            | 102 +++++
 .../ElasticsearchPersistReader.java             |  18 +-
 .../ElasticsearchPersistUpdater.java            | 413 ++-----------------
 .../ElasticsearchPersistWriter.java             |  30 +-
 .../elasticsearch/ElasticsearchQuery.java       |  95 +----
 .../elasticsearch/PercolateProcessor.java       | 169 --------
 .../processor/PercolateTagProcessor.java        | 387 +++++++++++++++++
 .../ElasticsearchWriterConfiguration.json       |  25 +-
 .../streams-provider-datasift/pom.xml           |   2 +-
 .../datasift/provider/DatasiftPushProvider.java | 133 ++++++
 .../provider/DatasiftStreamConfigurator.java    |  10 +-
 .../provider/DatasiftStreamProvider.java        |  12 +-
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  36 +-
 .../DatasiftDefaultActivitySerializer.java      |  21 +-
 .../DatasiftTweetActivitySerializer.java        |  14 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  84 ++++
 .../main/jsonschema/com/datasift/Datasift.json  |  36 +-
 .../com/datasift/DatasiftPushConfiguration.json |  17 +
 .../datasift/DatasiftStreamConfiguration.json   |  17 +
 .../com/datasift/DatasiftTwitterUser.json       |   9 +-
 .../DatasiftActivitySerializerTest.java         |  31 +-
 .../streams-provider-facebook/pom.xml           |  22 +-
 .../api/FacebookPostActivitySerializer.java     | 334 ---------------
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 --
 .../api/FacebookPostActivitySerializer.java     | 286 +++++++++++++
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 ++
 .../processor/FacebookTypeConverter.java        | 194 +++++++++
 .../provider/FacebookFriendFeedProvider.java    | 282 +++++++++++++
 .../provider/FacebookFriendUpdatesProvider.java | 286 +++++++++++++
 .../FacebookUserInformationProvider.java        | 299 ++++++++++++++
 .../provider/FacebookUserstreamProvider.java    | 320 ++++++++++++++
 .../jsonschema/com/facebook/graph/Post.json     | 192 ---------
 .../streams/facebook/FacebookConfiguration.json |  49 +++
 .../FacebookUserInformationConfiguration.json   |  23 ++
 .../FacebookUserstreamConfiguration.json        |  22 +
 .../org/apache/streams/facebook/graph/Post.json | 203 +++++++++
 .../test/FacebookActivitySerDeTest.java         |  78 ++++
 .../FacebookPostActivitySerializerTest.java     | 215 ----------
 .../facebook/test/FacebookPostSerDeTest.java    |   6 +-
 .../src/test/resources/Facebook.json            | 250 +++++++++++
 .../org/apache/streams/data/Facebook.json       | 251 -----------
 .../gnip-edc-facebook/pom.xml                   |   2 +-
 .../src/test/resources/FlickrEDC.xml            | 128 +++---
 .../src/test/resources/RedditEDC.xml            | 200 ++++-----
 .../src/test/resources/RedditEDCFlattened.xml   | 200 ++++-----
 .../src/test/resources/redditTest.xml           |   2 +-
 .../serializer/SyndEntryActivitySerializer.java |  31 +-
 .../test/SyndEntryActivitySerizlizerTest.java   |  27 +-
 .../src/test/resources/TestSyndEntryJson.txt    |   1 +
 .../processor/TwitterEventProcessor.java        |  46 +--
 .../twitter/provider/TwitterErrorHandler.java   |  18 +-
 .../provider/TwitterStreamProcessor.java        |  98 +++++
 .../twitter/provider/TwitterStreamProvider.java | 104 +++--
 .../provider/TwitterStreamProviderTask.java     |  71 ----
 .../provider/TwitterTimelineProvider.java       | 232 +++++------
 .../provider/TwitterTimelineProviderTask.java   |  56 ++-
 .../serializer/StreamsTwitterMapper.java        |  12 +-
 .../activityconsumer/ActivityConsumer.java      |  18 +
 .../ActivityConsumerWarehouse.java              |  18 +
 .../impl/ActivityConsumerWarehouseImpl.java     |  18 +
 .../impl/PushActivityConsumer.java              |  18 +
 .../ActivityPublisherRegistration.java          |  18 +
 .../ActivityStreamsSubscriberRegistration.java  |  18 +
 .../ActivityStreamsSubscriber.java              |  18 +
 .../ActivityStreamsSubscriberWarehouse.java     |  18 +
 .../ActivityStreamsSubscription.java            |  18 +
 .../ActivityStreamsSubscriptionFilter.java      |  18 +
 .../ActivityStreamsSubscriptionOutput.java      |  18 +
 .../impl/ActivityStreamsSubscriberDelegate.java |  18 +
 .../ActivityStreamsSubscriberWarehouseImpl.java |  18 +
 ...yStreamsSubscriptionCassandraFilterImpl.java |  18 +
 .../impl/ActivityStreamsSubscriptionImpl.java   |  18 +
 ...vityStreamsSubscriptionLuceneFilterImpl.java |  18 +
 streams-pojo/pom.xml                            |   6 +
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 .../org/apache/streams/data/util/JsonUtil.java  |  60 ++-
 .../local/tasks/StreamsProviderTask.java        |   3 +-
 .../org/apache/streams/util/ComponentUtils.java |  32 +-
 84 files changed, 4351 insertions(+), 2393 deletions(-)
----------------------------------------------------------------------



[6/8] git commit: Merged master

Posted by mf...@apache.org.
Merged master


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/557e54e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/557e54e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/557e54e2

Branch: refs/heads/master
Commit: 557e54e227612036560b7128d34733c156910aa0
Parents: ecd8006 9c6164c
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 13 14:57:45 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 13 14:57:45 2014 -0500

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 .../configuration/CassandraConfiguration.java   |  18 +
 .../model/CassandraActivityStreamsEntry.java    |  18 +
 .../repository/impl/CassandraKeyspace.java      |  18 +
 .../ElasticsearchConfigurator.java              |  28 +-
 .../ElasticsearchPersistDeleter.java            | 102 +++++
 .../ElasticsearchPersistReader.java             |  18 +-
 .../ElasticsearchPersistUpdater.java            | 413 ++-----------------
 .../ElasticsearchPersistWriter.java             |  30 +-
 .../elasticsearch/ElasticsearchQuery.java       |  95 +----
 .../elasticsearch/PercolateProcessor.java       | 169 --------
 .../processor/PercolateTagProcessor.java        | 387 +++++++++++++++++
 .../ElasticsearchWriterConfiguration.json       |  25 +-
 .../streams-provider-datasift/pom.xml           |   2 +-
 .../datasift/provider/DatasiftPushProvider.java | 133 ++++++
 .../provider/DatasiftStreamConfigurator.java    |  10 +-
 .../provider/DatasiftStreamProvider.java        |  12 +-
 .../DatasiftTypeConverterProcessor.java         |   7 +-
 .../serializer/DatasiftActivitySerializer.java  |  36 +-
 .../DatasiftDefaultActivitySerializer.java      |  21 +-
 .../DatasiftTweetActivitySerializer.java        |  14 +-
 .../datasift/util/StreamsDatasiftMapper.java    |  84 ++++
 .../main/jsonschema/com/datasift/Datasift.json  |  36 +-
 .../com/datasift/DatasiftPushConfiguration.json |  17 +
 .../datasift/DatasiftStreamConfiguration.json   |  17 +
 .../com/datasift/DatasiftTwitterUser.json       |   9 +-
 .../DatasiftActivitySerializerTest.java         |  31 +-
 .../streams-provider-facebook/pom.xml           |  22 +-
 .../api/FacebookPostActivitySerializer.java     | 334 ---------------
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 --
 .../api/FacebookPostActivitySerializer.java     | 286 +++++++++++++
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 ++
 .../processor/FacebookTypeConverter.java        | 194 +++++++++
 .../provider/FacebookFriendFeedProvider.java    | 282 +++++++++++++
 .../provider/FacebookFriendUpdatesProvider.java | 286 +++++++++++++
 .../FacebookUserInformationProvider.java        | 299 ++++++++++++++
 .../provider/FacebookUserstreamProvider.java    | 320 ++++++++++++++
 .../jsonschema/com/facebook/graph/Post.json     | 192 ---------
 .../streams/facebook/FacebookConfiguration.json |  49 +++
 .../FacebookUserInformationConfiguration.json   |  23 ++
 .../FacebookUserstreamConfiguration.json        |  22 +
 .../org/apache/streams/facebook/graph/Post.json | 203 +++++++++
 .../test/FacebookActivitySerDeTest.java         |  78 ++++
 .../FacebookPostActivitySerializerTest.java     | 215 ----------
 .../facebook/test/FacebookPostSerDeTest.java    |   6 +-
 .../src/test/resources/Facebook.json            | 250 +++++++++++
 .../org/apache/streams/data/Facebook.json       | 251 -----------
 .../gnip-edc-facebook/pom.xml                   |   2 +-
 .../src/test/resources/FlickrEDC.xml            | 128 +++---
 .../src/test/resources/RedditEDC.xml            | 200 ++++-----
 .../src/test/resources/RedditEDCFlattened.xml   | 200 ++++-----
 .../src/test/resources/redditTest.xml           |   2 +-
 .../serializer/SyndEntryActivitySerializer.java |  31 +-
 .../test/SyndEntryActivitySerizlizerTest.java   |  27 +-
 .../src/test/resources/TestSyndEntryJson.txt    |   1 +
 .../processor/TwitterEventProcessor.java        |  46 +--
 .../twitter/provider/TwitterErrorHandler.java   |  18 +-
 .../provider/TwitterStreamProcessor.java        |  98 +++++
 .../twitter/provider/TwitterStreamProvider.java | 104 +++--
 .../provider/TwitterStreamProviderTask.java     |  71 ----
 .../provider/TwitterTimelineProvider.java       | 232 +++++------
 .../provider/TwitterTimelineProviderTask.java   |  56 ++-
 .../serializer/StreamsTwitterMapper.java        |  12 +-
 .../activityconsumer/ActivityConsumer.java      |  18 +
 .../ActivityConsumerWarehouse.java              |  18 +
 .../impl/ActivityConsumerWarehouseImpl.java     |  18 +
 .../impl/PushActivityConsumer.java              |  18 +
 .../ActivityPublisherRegistration.java          |  18 +
 .../ActivityStreamsSubscriberRegistration.java  |  18 +
 .../ActivityStreamsSubscriber.java              |  18 +
 .../ActivityStreamsSubscriberWarehouse.java     |  18 +
 .../ActivityStreamsSubscription.java            |  18 +
 .../ActivityStreamsSubscriptionFilter.java      |  18 +
 .../ActivityStreamsSubscriptionOutput.java      |  18 +
 .../impl/ActivityStreamsSubscriberDelegate.java |  18 +
 .../ActivityStreamsSubscriberWarehouseImpl.java |  18 +
 ...yStreamsSubscriptionCassandraFilterImpl.java |  18 +
 .../impl/ActivityStreamsSubscriptionImpl.java   |  18 +
 ...vityStreamsSubscriptionLuceneFilterImpl.java |  18 +
 streams-pojo/pom.xml                            |   6 +
 .../apache/streams/data/util/ActivityUtil.java  |   5 +
 .../org/apache/streams/data/util/JsonUtil.java  |  60 ++-
 .../local/tasks/StreamsProviderTask.java        |   3 +-
 .../org/apache/streams/util/ComponentUtils.java |  32 +-
 84 files changed, 4351 insertions(+), 2393 deletions(-)
----------------------------------------------------------------------



[8/8] git commit: STREAMS-143 | Fixed accidental removal of shutdownNow() call

Posted by mf...@apache.org.
STREAMS-143 | Fixed accidental removal of shutdownNow() call


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c33f6654
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c33f6654
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c33f6654

Branch: refs/heads/master
Commit: c33f66544b7edb1dfc1ce09240a2b76542f6a74b
Parents: a01b691
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 15:35:34 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 15:35:34 2014 -0500

----------------------------------------------------------------------
 .../src/main/java/org/apache/streams/util/ComponentUtils.java       | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c33f6654/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
index 79ca184..9f3c480 100644
--- a/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
+++ b/streams-util/src/main/java/org/apache/streams/util/ComponentUtils.java
@@ -94,6 +94,7 @@ public class ComponentUtils {
         stream.shutdown();
         try {
             if (!stream.awaitTermination(initialWait, TimeUnit.SECONDS)) {
+                stream.shutdownNow();
                 if (!stream.awaitTermination(secondaryWait, TimeUnit.SECONDS)) {
                     LOGGER.error("Executor Service did not terminate");
                 }


[3/8] git commit: STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates

Posted by mf...@apache.org.
STREAMS-143 | Refactored InstagramProvider to accept date ranges for user updates


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/9f296be7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/9f296be7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/9f296be7

Branch: refs/heads/master
Commit: 9f296be7a9fe5c4f0a6449e824cdadc8ad982add
Parents: ff5f821
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-2.local>
Authored: Tue Aug 12 15:46:59 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-2.local>
Committed: Tue Aug 12 15:46:59 2014 -0500

----------------------------------------------------------------------
 .../instagram/provider/InstagramOauthToken.java |  28 +++
 .../provider/InstagramRecentMediaCollector.java | 198 ++++++++++---------
 .../provider/InstagramRecentMediaProvider.java  |  35 +++-
 .../com/instagram/InstagramConfiguration.json   |  18 +-
 .../InstagramRecentMediaCollectorTest.java      | 138 ++++++-------
 .../util/ConcurentYieldTillSuccessQueue.java    |  27 +++
 6 files changed, 269 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
new file mode 100644
index 0000000..9773f92
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -0,0 +1,28 @@
+package org.apache.streams.instagram.provider;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+
+/**
+ *
+ */
+public class InstagramOauthToken extends OauthToken{
+
+    private String clientId;
+
+    public InstagramOauthToken(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    @Override
+    protected boolean internalEquals(Object o) {
+        if(!(o instanceof InstagramOauthToken)) {
+            return false;
+        }
+        InstagramOauthToken that = (InstagramOauthToken) o;
+        return this.clientId.equals(that.clientId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 4f27e49..932828d 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -14,26 +14,31 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
-import com.google.common.collect.Sets;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.UserId;
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
 import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Executes on all of the Instagram requests to collect the media feed data.
- *
+ * <p/>
  * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
- * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy for up to
- * 5 attempts.
- *
+ * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy.
  */
 public class InstagramRecentMediaCollector implements Runnable {
 
@@ -41,125 +46,126 @@ public class InstagramRecentMediaCollector implements Runnable {
     protected static final int MAX_ATTEMPTS = 5;
     protected static final int SLEEP_SECS = 5; //5 seconds
 
-    protected Queue dataQueue; //exposed for testing
-    private InstagramUserInformationConfiguration config;
-    private Instagram instagramClient;
+    protected Queue<MediaFeedData> dataQueue; //exposed for testing
+    private InstagramConfiguration config;
     private AtomicBoolean isCompleted;
+    private SimpleTokenManager<InstagramOauthToken> tokenManger;
+    private int consecutiveErrorCount;
+    private BackOffStrategy backOffStrategy;
 
 
-    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
+    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
         this.dataQueue = queue;
         this.config = config;
-        this.instagramClient = new Instagram(this.config.getClientId());
         this.isCompleted = new AtomicBoolean(false);
+        this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
+        for (String clientId : this.config.getClientIds()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+        }
+        this.consecutiveErrorCount = 0;
+        this.backOffStrategy = new ExponentialBackOffStrategy(2);
     }
 
-    /**
-     * Set instagram client
-     * @param instagramClient
-     */
-    protected void setInstagramClient(Instagram instagramClient) {
-        this.instagramClient = instagramClient;
+
+    @VisibleForTesting
+    protected Instagram getNextInstagramClient() {
+        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
     }
 
-    /**
-     * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration} and
-     * converts them to {@link java.lang.Long}
-     * @return
-     */
-    protected Set<Long> getUserIds() {
-        Set<Long> userIds = Sets.newHashSet();
-        for(String id : config.getUserIds()) {
-            try {
-                userIds.add(Long.parseLong(id));
-            } catch (NumberFormatException nfe) {
-                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
+    private void queueData(MediaFeed userFeed, String userId) {
+        if (userFeed == null) {
+            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+        } else {
+            for (MediaFeedData data : userFeed.getData()) {
+                this.dataQueue.offer(data);
             }
         }
-        return userIds;
     }
 
     /**
-     * Determins the course of action to take when Instagram returns an exception to a request.  If it is a rate limit
-     * exception, it implements an exponentional back off strategy.  If it is anyother exception, it is logged and
-     * rethrown.
-     * @param instaExec exception to handle
-     * @param attempt number of attempts that have occured to pull this users information
-     * @throws InstagramException
+     * @return true when the collector has queued all of the available media feed data for the provided users.
      */
-    protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException {
-        LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
-        if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
-            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
-            try {
-                LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime);
-                Thread.sleep(sleepTime);
-            } catch (InterruptedException ie) {
-                Thread.currentThread().interrupt();
+    public boolean isCompleted() {
+        return this.isCompleted.get();
+    }
+
+    @Override
+    public void run() {
+        try {
+            for (UserId user : this.config.getUsersInfo().getUserIds()) {
+                collectMediaFeed(user);
             }
-        } else {
-            LOGGER.error("Instagram returned an excetpion to the user media request : {}", instaExec.getMessage());
-            throw instaExec;
+        } catch (Exception e) {
+            LOGGER.error("Shutting down InstagramCollector. Exception occured: {}", e.getMessage());
         }
+        this.isCompleted.set(true);
     }
 
     /**
-     * Gets the MediaFeedData for this particular user and adds it to the share queued.
-     * @param userId
+     * Pull Recement Media for a user and queues the resulting data. Will try a single call 5 times before failing and
+     * moving on to the next call or returning.
+     * @param user
+     * @throws Exception
      */
-    private void getUserMedia(Long userId) {
-        MediaFeed feed = null;
-        int attempts = 0;
-        int count = 0;
+    @VisibleForTesting
+    protected void collectMediaFeed(UserId user) throws Exception {
+        Pagination pagination = null;
         do {
-            ++attempts;
-            try {
-                feed = this.instagramClient.getRecentMediaFeed(userId);
-                queueData(feed, userId);
-                count += feed.getData().size();
-                while(feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage()) {
-                    feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
-                    queueData(feed, userId);
-                    count += feed.getData().size();
-                }
-            } catch (InstagramException ie) {
+            int attempts = 0;
+            boolean succesfullDataPull = false;
+            while (!succesfullDataPull && attempts < MAX_ATTEMPTS) {
+                ++attempts;
+                MediaFeed feed = null;
                 try {
-                    handleInstagramException(ie, attempts);
-                } catch (InstagramException ie2) { //not a rate limit exception, ignore user
-                    attempts = MAX_ATTEMPTS;
-                }
-            }
-        } while(feed == null && attempts < MAX_ATTEMPTS);
-        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
-    }
-
-    private void queueData(MediaFeed userFeed, Long userId) {
-        if(userFeed == null) {
-            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
-        } else {
-            for(MediaFeedData data : userFeed.getData()) {
-                synchronized (this.dataQueue) { //unnecessary
-                    while(!this.dataQueue.offer(data)) {
-                        Thread.yield();
+                    if (pagination == null) {
+                        feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+                                0,
+                                null,
+                                null,
+                                user.getBeforeDate() == null ? null : user.getBeforeDate().toDate(),
+                                user.getAfterDate() == null ? null : user.getAfterDate().toDate());
+                    } else {
+                        feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
+                    }
+                } catch (Exception e) {
+                    handleException(e);
+                    if(e instanceof InstagramBadRequestException) {
+                        attempts = MAX_ATTEMPTS; //don't repeat bad requests.
                     }
+                    if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
+                        throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
+                    }
+                }
+                if(succesfullDataPull = feed != null) {
+                    this.consecutiveErrorCount = 0;
+                    this.backOffStrategy.reset();
+                    pagination = feed.getPagination();
+                    queueData(feed, user.getUserId());
                 }
             }
-        }
+            if(!succesfullDataPull) {
+                LOGGER.error("Failed to get data from instagram for user id, {}, skipping user.", user.getUserId());
+            }
+        } while (pagination != null && pagination.hasNextPage());
     }
 
     /**
-     *
-     * @return true when the collector has queued all of available media feed data for the provided users.
+     * Handles different types of {@link java.lang.Exception} caught while trying to pull Instagram data.
+     * BackOffs/Sleeps when it encounters a rate limit expection..
+     * @param e
+     * @throws BackOffException
      */
-    public boolean isCompleted() {
-        return this.isCompleted.get();
-    }
-
-    @Override
-    public void run() {
-        for(Long userId : getUserIds()) {
-            getUserMedia(userId);
+    protected void handleException(Exception e) throws BackOffException {
+        if(e instanceof InstagramRateLimitException) {
+            LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+            this.backOffStrategy.backOff();
+        } else if(e instanceof InstagramBadRequestException) {
+            LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
+            ++this.consecutiveErrorCount;
+        } else {
+            LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+            ++this.consecutiveErrorCount;
         }
-        this.isCompleted.set(true);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index 30ddda4..e3e9399 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -14,17 +14,19 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.instagram.*;
+import org.apache.streams.util.SerializationUtil;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -36,17 +38,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public class InstagramRecentMediaProvider implements StreamsProvider {
 
-    private InstagramUserInformationConfiguration config;
+    private InstagramConfiguration config;
     private InstagramRecentMediaCollector dataCollector;
     protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
     private ExecutorService executorService;
     private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
-        this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+        this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")));
     }
 
-    public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
+    public InstagramRecentMediaProvider(InstagramConfiguration config) {
         this.config = config;
         this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
     }
@@ -62,6 +64,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
      * EXPOSED FOR TESTING
      * @return
      */
+    @VisibleForTesting
     protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
         return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
     }
@@ -112,4 +115,26 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
             this.executorService = null;
         }
     }
+
+    /**
+     * Add default start and stop points if necessary.
+     */
+    private void updateUserInfoList() {
+        UsersInfo usersInfo = this.config.getUsersInfo();
+        if(usersInfo.getDefaultAfterDate() == null && usersInfo.getDefaultBeforeDate() == null) {
+            return;
+        }
+        DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
+        DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
+        for(UserId user : usersInfo.getUserIds()) {
+            if(defaultAfterDate != null && user.getAfterDate() == null) {
+                user.setAfterDate(defaultAfterDate);
+            }
+            if(defaultBeforeDate != null && user.getBeforeDate() == null) {
+                user.setBeforeDate(defaultBeforeDate);
+            }
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index 7b894a2..c662660 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,9 +5,13 @@
     "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "clientId": {
-            "type": "string",
-            "description": "Your Instagram Client Id"
+        "clientIds": {
+            "type": "array",
+            "uniqueItems": true,
+            "items": {
+                "type": "string"
+            },
+            "description": "Your Instagram Client Ids"
         },
         "usersInfo": {
             "type": "object",
@@ -23,12 +27,12 @@
                 },
                 "defaultAfterDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for all users that don't have date ranges specified. If this is null it will pull from the earliest possible time"
                 },
                 "defaultBeforeDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for all users that don't have date ranges specified. If this is null it will pull till current time."
                 }
             }
@@ -44,12 +48,12 @@
                 },
                 "afterDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate."
                 },
                 "beforeDate": {
                     "type": "string",
-                    "format": "datetime",
+                    "format": "date-time",
                     "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate."
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
index 0225b40..74b5139 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -14,16 +14,25 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider;
 
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramConfiguration;
 import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.instagram.UserId;
+import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.util.ConcurentYieldTillSuccessQueue;
 import org.jinstagram.Instagram;
 import org.jinstagram.entity.common.Pagination;
 import org.jinstagram.entity.users.feed.MediaFeed;
 import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
 import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.joda.time.DateTime;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -34,91 +43,63 @@ import java.util.*;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.*;
-import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
  */
-public class InstagramRecentMediaCollectorTest {
+public class InstagramRecentMediaCollectorTest extends RandomizedTest {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
 
     private int expectedDataCount = 0;
-    private long randomSeed = System.currentTimeMillis();
-    private Random rand = new Random(randomSeed);
-    private Map<Pagination, MediaFeed> pageMap = Maps.newHashMap();
 
     @Test
     public void testHandleInstagramException1() throws InstagramException {
-        InstagramException ie = mock(InstagramException.class);
+        InstagramException ie = mock(InstagramRateLimitException.class);
         when(ie.getRemainingLimitStatus()).thenReturn(1);
         final String message = "Test Message";
         when(ie.getMessage()).thenReturn(message);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurentYieldTillSuccessQueue<MediaFeedData>(), new InstagramConfiguration());
         try {
-            collector.handleInstagramException(ie, 1);
-            fail("Expected RuntimeException to be thrown");
-        } catch (InstagramException rte) {
-//            assertTrue(rte.getMessage().contains("Mock for InstagramException"));
-            assertEquals(message, rte.getMessage());
+            long startTime = System.currentTimeMillis();
+            collector.handleException(ie);
+            long endTime = System.currentTimeMillis();
+            assertTrue(2000 <= endTime - startTime);
+            startTime = System.currentTimeMillis();
+            collector.handleException(ie);
+            endTime = System.currentTimeMillis();
+            assertTrue(4000 <= endTime - startTime);
+        } catch (Exception e) {
+            fail("Should not have thrown an exception.");
         }
     }
 
-    @Test
-    public void testHandleInstagramException2() throws InstagramException{
-        InstagramException ie = mock(InstagramException.class);
-        when(ie.getRemainingLimitStatus()).thenReturn(0);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
-        long startTime = System.currentTimeMillis();
-        collector.handleInstagramException(ie, 1);
-        long endTime = System.currentTimeMillis();
-        LOGGER.debug("Slept for {} ms", startTime - endTime);
-        assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error
-        startTime = System.currentTimeMillis();
-        collector.handleInstagramException(ie, 2);
-        endTime = System.currentTimeMillis();
-        LOGGER.debug("Slept for {} ms", startTime - endTime);
-        assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error
-    }
-
-    @Test
-    public void testGetUserIds() {
-        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
-        List<String> userIds = Lists.newLinkedList();
-        userIds.add("1");
-        userIds.add("2");
-        userIds.add("3");
-        userIds.add("4");
-        userIds.add("abcdefg");
-        config.setUserIds(userIds);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), config);
-
-        Set<Long> expected = Sets.newHashSet();
-        expected.add(1L);
-        expected.add(2L);
-        expected.add(3L);
-        expected.add(4L);
-
-        assertEquals(expected, collector.getUserIds());
-    }
 
     @Test
+    @Repeat(iterations = 3)
     public void testRun() {
-        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
-        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
-        List<String> userIds = Lists.newLinkedList();
-        userIds.add("1");
-        userIds.add("2");
-        userIds.add("3");
-        userIds.add("4");
-        config.setUserIds(userIds);
-        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config);
-        collector.setInstagramClient(createMockInstagramClient());
+        this.expectedDataCount = 0;
+        Queue<MediaFeedData> data = new ConcurentYieldTillSuccessQueue<MediaFeedData>();
+        InstagramConfiguration config = new InstagramConfiguration();
+        UsersInfo usersInfo = new UsersInfo();
+        config.setUsersInfo(usersInfo);
+        Set<UserId> users = creatUsers(randomIntBetween(0, 100));
+        usersInfo.setUserIds(users);
+
+        final Instagram mockInstagram = createMockInstagramClient();
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return mockInstagram;
+            }
+        };
+        assertFalse(collector.isCompleted());
         collector.run();
-        LOGGER.debug("Random seed == {}", randomSeed);
-        assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size());
+        assertTrue(collector.isCompleted());
+        assertEquals(this.expectedDataCount, data.size());
     }
 
     private Instagram createMockInstagramClient() {
@@ -127,12 +108,19 @@ public class InstagramRecentMediaCollectorTest {
             final InstagramException mockException = mock(InstagramException.class);
             when(mockException.getRemainingLimitStatus()).thenReturn(-1);
             when(mockException.getMessage()).thenReturn("MockInstagramException message");
-            when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer<MediaFeed>() {
+            when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
                 @Override
                 public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
-                    long param = (Long) invocationOnMock.getArguments()[0];
-                    if (param == 2L) {
-                        throw mockException;
+                    if (randomInt(20) == 0) { //5% throw exceptions
+                        int type = randomInt(4);
+                        if (type == 0)
+                            throw mock(InstagramRateLimitException.class);
+                        else if (type == 1)
+                            throw mock(InstagramBadRequestException.class);
+                        else if (type == 2)
+                            throw mock(InstagramException.class);
+                        else
+                            throw new Exception();
                     } else {
                         return createRandomMockMediaFeed();
                     }
@@ -150,11 +138,27 @@ public class InstagramRecentMediaCollectorTest {
         return instagramClient;
     }
 
+    private Set<UserId> creatUsers(int numUsers) {
+        Set<UserId> users = Sets.newHashSet();
+        for(int i=0; i < numUsers; ++i) {
+            UserId user = new UserId();
+            user.setUserId(Integer.toString(randomInt()));
+            if(randomInt(2) == 0) {
+                user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
+            }
+            if(randomInt(2) == 0) {
+                user.setBeforeDate(DateTime.now());
+            }
+            users.add(user);
+        }
+        return users;
+    }
+
     private MediaFeed createRandomMockMediaFeed() throws InstagramException {
         MediaFeed feed = mock(MediaFeed.class);
-        when(feed.getData()).thenReturn(createData(this.rand.nextInt(100)));
+        when(feed.getData()).thenReturn(createData(randomInt(100)));
         Pagination pagination = mock(Pagination.class);
-        if(this.rand.nextInt(2) == 0) {
+        if(randomInt(2) == 0) {
             when(pagination.hasNextPage()).thenReturn(true);
         } else {
             when(pagination.hasNextPage()).thenReturn(false);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/9f296be7/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
new file mode 100644
index 0000000..e438f54
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/ConcurentYieldTillSuccessQueue.java
@@ -0,0 +1,27 @@
+package org.apache.streams.util;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * A {@link java.util.concurrent.ConcurrentLinkedQueue} implementation that causes thread yields when data is not
+ * successfully offered or polled.
+ */
+public class ConcurentYieldTillSuccessQueue<T> extends ConcurrentLinkedQueue<T> {
+
+    @Override
+    public T poll() {
+        T item = null;
+        while(!super.isEmpty() && (item = super.poll()) == null) {
+            Thread.yield();;
+        }
+        return item;
+    }
+
+    @Override
+    public boolean offer(T t) {
+        while(!super.offer(t)) {
+            Thread.yield();
+        }
+        return true;
+    }
+}


[2/8] git commit: Backoff and token Utils classes implemented and tested

Posted by mf...@apache.org.
Backoff and token Utils classes implemented and tested


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ff5f8211
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ff5f8211
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ff5f8211

Branch: refs/heads/master
Commit: ff5f821121e98dccc5f72a313583e533f3a5bf8d
Parents: b22efee
Author: Ryan Ebanks <re...@Informations-MacBook-Pro.local>
Authored: Fri Aug 8 17:15:49 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro.local>
Committed: Fri Aug 8 17:15:49 2014 -0500

----------------------------------------------------------------------
 .../com/instagram/InstagramConfiguration.json   |  53 +++++-
 streams-util/pom.xml                            |   6 +-
 .../api/requests/backoff/BackOffException.java  |  48 +++++
 .../api/requests/backoff/BackOffStrategy.java   |  87 +++++++++
 .../impl/ConstantTimeBackOffStrategy.java       |  33 ++++
 .../impl/ExponentialBackOffStrategy.java        |  32 ++++
 .../backoff/impl/LinearTimeBackOffStrategy.java |  25 +++
 .../streams/util/oauth/tokens/OauthToken.java   |  19 ++
 .../tokens/tokenmanager/SimpleTokenManager.java |  39 ++++
 .../tokenmanager/impl/BasicTokenManger.java     |  72 ++++++++
 .../requests/backoff/BackOffStrategyTest.java   |  70 ++++++++
 .../ConstantTimeBackOffStrategyTest.java        |  26 +++
 .../backoff/ExponentialBackOffStrategyTest.java |  23 +++
 .../backoff/LinearTimeBackOffStartegyTest.java  |  22 +++
 .../tokenmanager/TestBasicTokenManager.java     | 176 +++++++++++++++++++
 15 files changed, 722 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index f8f8117..7b894a2 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -9,13 +9,50 @@
             "type": "string",
             "description": "Your Instagram Client Id"
         },
-        "clientSecret": {
-            "type": "string",
-            "description": "Your Instagram Client secret"
-        },
-        "callbackUrl": {
-            "type": "string",
-            "description": "Your Instagream callback url"
+        "usersInfo": {
+            "type": "object",
+            "properties": {
+                "userIds": {
+                    "type": "array",
+                    "uniqueItems": true,
+                    "items": {
+                        "type": "object",
+                        "$ref": "#/definitions/userInfo"
+                    },
+                    "description": "List of user ids to gather data for. Type of data gathered depends on provider"
+                },
+                "defaultAfterDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for all users that don't have date ranges specified. If this is null it will pull from the earliest possible time"
+                },
+                "defaultBeforeDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for all users that don't have date ranges specified. If this is null it will pull till current time."
+                }
+            }
+        }
+   },
+    "definitions": {
+        "userInfo": {
+            "type": "object",
+            "properties": {
+                "userId": {
+                    "type": "string",
+                    "description": "instagram user id"
+                },
+                "afterDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this date will be used as the start of the range for the request for this user. If this is null it will use the defaultBeforeDate."
+                },
+                "beforeDate": {
+                    "type": "string",
+                    "format": "datetime",
+                    "description": "If the api allows to gather data by date range, this date will be used as the end of the range for the request for this user.. If this is null it will use the defaultAfterDate."
+                }
+            }
         }
-   }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/pom.xml
----------------------------------------------------------------------
diff --git a/streams-util/pom.xml b/streams-util/pom.xml
index 0a48ec9..7a50201 100644
--- a/streams-util/pom.xml
+++ b/streams-util/pom.xml
@@ -43,6 +43,10 @@
             <groupId>joda-time</groupId>
             <artifactId>joda-time</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.1.2</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
new file mode 100644
index 0000000..0bdd82d
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffException.java
@@ -0,0 +1,48 @@
+package org.apache.streams.util.api.requests.backoff;
+
+/**
+ * Exception that is thrown when a {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} has attempted to
+ * <code>backOff()</code> more than the {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} was configured for.
+ */
+public class BackOffException extends Exception {
+
+    private int attemptCount;
+    private long sleepTime;
+
+    public BackOffException() {
+        this(-1, -1);
+    }
+
+    public BackOffException(String message) {
+        this(message, -1, -1);
+    }
+
+    public BackOffException(int attemptCount, long maxSleepTime) {
+        this.attemptCount = attemptCount;
+        this.sleepTime = maxSleepTime;
+    }
+
+    public BackOffException(String message, int attemptCount, long maxSleepTime) {
+        super(message);
+        this.attemptCount = attemptCount;
+        this.sleepTime = maxSleepTime;
+    }
+
+    /**
+     * Gets the number of back off attempts that happened before the exception was thrown. If the function that
+     * initialized this exception does not set the number of attempts, -1 will be returned.
+     * @return number of attempts
+     */
+    public int getNumberOfBackOffsAttempted() {
+        return this.attemptCount;
+    }
+
+    /**
+     * Gets the longest sleep period that the strategy attempted. If the function that
+     * initialized this exception does not set the longest sleep period, -1 will be returned.
+     * @return
+     */
+    public long getLongestBackOff() {
+        return this.sleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
new file mode 100644
index 0000000..628d37b
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/BackOffStrategy.java
@@ -0,0 +1,87 @@
+package org.apache.streams.util.api.requests.backoff;
+
+/**
+ * BackOffStrategy will cause the current thread to sleep for a specific amount of time. This is used to adhere to
+ * api rate limits.
+ *
+ * The example below illustrates using a BackOffStrategy to slow down requests when you hit a rate limit exception.
+ *
+ * <code>
+ *     public void pollApi(ApiClient apiClient, BackOffStrategy backOffStrategy) throws BackOffException {
+ *          while( apiClient.hasMoreData() ) {
+ *              try {
+ *                  apiClient.pollData();
+ *              } catch (RateLimitException rle) {
+ *                  backOffStrategy.backOff();
+ *              }
+ *          }
+ *     }
+ * </code>
+ *
+ */
+public abstract class BackOffStrategy {
+
+    private long baseSleepTime;
+    private long lastSleepTime;
+    private int maxAttempts;
+    private int attemptsCount;
+
+    /**
+     * A BackOffStrategy that can effectively be used endlessly.
+     * @param baseBackOffTime amount of time back of in seconds
+     */
+    public BackOffStrategy(long baseBackOffTime) {
+        this(baseBackOffTime, -1);
+    }
+
+    /**
+     * A BackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+     * @param baseBackOffTime time to back off in milliseconds, must be greater than 0.
+     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
+     */
+    public BackOffStrategy(long baseBackOffTime, int maximumNumberOfBackOffAttempts) {
+        if(baseBackOffTime <= 0) {
+            throw new IllegalArgumentException("backOffTimeInMilliSeconds is not greater than 0 : "+baseBackOffTime);
+        }
+        if(maximumNumberOfBackOffAttempts<=0 && maximumNumberOfBackOffAttempts != -1) {
+            throw new IllegalArgumentException("maximumNumberOfBackOffAttempts is not greater than 0 : "+maximumNumberOfBackOffAttempts);
+        }
+        this.baseSleepTime = baseBackOffTime;
+        this.maxAttempts = maximumNumberOfBackOffAttempts;
+        this.attemptsCount = 0;
+    }
+
+    /**
+     * Cause the current thread to sleep for an amount of time based on the implemented strategy. If limits are set
+     * on the number of times the backOff can be called, an exception will be thrown.
+     * @throws BackOffException
+     */
+    public void backOff() throws BackOffException {
+        if(this.attemptsCount++ >= this.maxAttempts && this.maxAttempts != -1) {
+            throw new BackOffException(this.attemptsCount-1, this.lastSleepTime);
+        } else {
+            try {
+                Thread.sleep(this.lastSleepTime = calculateBackOffTime(this.attemptsCount, this.baseSleepTime));
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Rests the back off strategy to its original state.  After the call the strategy will act as if {@link BackOffStrategy#backOff()}
+     * has never been called.
+     */
+    public void reset() {
+        this.attemptsCount = 0;
+    }
+
+    /**
+     * Calculate the amount of time in milliseconds that the strategy should back off for
+     * @param attemptCount the number of attempts the strategy has backed off. i.e. 1 -> this is the first attempt, 2 -> this is the second attempt, etc.
+     * @param baseSleepTime the minimum amount of time it should back off for in milliseconds
+     * @return the amount of time it should back off in milliseconds
+     */
+    protected abstract long calculateBackOffTime(int attemptCount, long baseSleepTime);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
new file mode 100644
index 0000000..bfc523a
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ConstantTimeBackOffStrategy.java
@@ -0,0 +1,33 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes the current thread to sleep the
+ * same amount of time each time <code>backOff()</code> is called.
+ *
+ */
+public class ConstantTimeBackOffStrategy extends BackOffStrategy {
+
+    /**
+     * A ConstantTimeBackOffStrategy that can effectively be used endlessly.
+     * @param baseBackOffTimeInMiliseconds amount of time back of in milliseconds
+     */
+    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds) {
+        this(baseBackOffTimeInMiliseconds, -1);
+    }
+
+    /**
+     * A ConstantTimeBackOffStrategy that has a limited number of uses before it throws a {@link org.apache.streams.util.api.requests.backoff.BackOffException}
+     * @param baseBackOffTimeInMiliseconds time to back off in milliseconds, must be greater than 0.
+     * @param maximumNumberOfBackOffAttempts maximum number of attempts, must be grater than 0 or -1. -1 indicates there is no maximum number of attempts.
+     */
+    public ConstantTimeBackOffStrategy(long baseBackOffTimeInMiliseconds, int maximumNumberOfBackOffAttempts) {
+        super(baseBackOffTimeInMiliseconds, maximumNumberOfBackOffAttempts);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return baseSleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
new file mode 100644
index 0000000..af59a6a
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/ExponentialBackOffStrategy.java
@@ -0,0 +1,32 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * Exponential backk strategy.  Caluclated by baseBackOffTimeInSeconds raised the attempt-count power.
+ */
+public class ExponentialBackOffStrategy extends BackOffStrategy {
+
+
+    /**
+     * Unlimited use ExponentialBackOffStrategy
+     * @param baseBackOffTimeInSeconds
+     */
+    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds) {
+        this(baseBackOffTimeInSeconds, -1);
+    }
+
+    /**
+     * Limited use ExponentialBackOffStrategy
+     * @param baseBackOffTimeInSeconds
+     * @param maxNumAttempts
+     */
+    public ExponentialBackOffStrategy(int baseBackOffTimeInSeconds, int maxNumAttempts) {
+        super(baseBackOffTimeInSeconds, maxNumAttempts);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return Math.round(Math.pow(baseSleepTime, attemptCount)) * 1000;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
new file mode 100644
index 0000000..55f62a2
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/api/requests/backoff/impl/LinearTimeBackOffStrategy.java
@@ -0,0 +1,25 @@
+package org.apache.streams.util.api.requests.backoff.impl;
+
+import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
+
+/**
+ * A {@link org.apache.streams.util.api.requests.backoff.BackOffStrategy} that causes back offs in linear increments. Each
+ * attempt cause an increase back off period.
+ * Calculated by attemptNumber * baseBackOffAmount.
+ */
+public class LinearTimeBackOffStrategy extends BackOffStrategy{
+
+
+    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds) {
+        this(baseBackOffTimeInSeconds, -1);
+    }
+
+    public LinearTimeBackOffStrategy(int baseBackOffTimeInSeconds, int maxAttempts) {
+        super(baseBackOffTimeInSeconds, -1);
+    }
+
+    @Override
+    protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+        return 1000L * attemptCount * baseSleepTime;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
new file mode 100644
index 0000000..df264c5
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
@@ -0,0 +1,19 @@
+package org.apache.streams.util.oauth.tokens;
+
+/**
+ *
+ */
+public abstract class OauthToken {
+
+    /**
+     * Must create equals method for all OauthTokens.
+     * @param o
+     * @return true if equal, and false otherwise
+     */
+    protected abstract boolean internalEquals(Object o);
+
+    @Override
+    public boolean equals(Object o) {
+        return this.internalEquals(o);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
new file mode 100644
index 0000000..d052da1
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -0,0 +1,39 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+
+import java.util.Collection;
+
+/**
+ * Manges access to oauth tokens.  Allows a caller to add tokens to the token pool and receive an available token.
+ */
+public interface SimpleTokenManager<T extends OauthToken> {
+
+
+    /**
+     * Adds a token to the available token pool.
+     * @param token Token to be added
+     * @return true, if token was successfully added to the pool and false otherwise.
+     */
+    public boolean addTokenToPool(T token);
+
+    /**
+     * Adds a {@link java.util.Collection} of tokens to the available token pool.
+     * @param tokens Tokens to be added
+     * @return true, if the token pool size increased after adding the tokens, and false otherwise.
+     */
+    public boolean addAllTokensToPool(Collection<T> tokens);
+
+    /**
+     * Get an available token. If no tokens are available it returns null.
+     * @return next available token
+     */
+    public T getNextAvailableToken();
+
+    /**
+     * Get the number of available tokens
+     * @return number of available tokens
+     */
+    public int numAvailableTokens();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
new file mode 100644
index 0000000..20c8d20
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -0,0 +1,72 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Manages a pool of tokens the most basic possible way.  If all tokens are added to the manager before {@link BasicTokenManger#getNextAvailableToken() getNextAvailableToken}
+ * is called tokens are issued in the order they were added to the manager, FIFO.  The BasicTokenManager acts as a circular queue
+ * of tokens.  Once the manager issues all available tokens it will cycle back to the first token and start issuing tokens again.
+ *
+ * When adding tokens to the pool of available tokens, the manager will not add tokens that are already in the pool.
+ *
+ * The manager class is thread safe.
+ */
+public class BasicTokenManger<T extends OauthToken> implements SimpleTokenManager<T>{
+
+    private ArrayList<T> availableTokens;
+    private int nextToken;
+
+    public BasicTokenManger() {
+        this(null);
+    }
+
+    public BasicTokenManger(Collection<T> tokens) {
+        if(tokens != null) {
+            this.availableTokens = new ArrayList<T>(tokens.size());
+            this.addAllTokensToPool(tokens);
+        } else {
+            this.availableTokens = new ArrayList<T>();
+        }
+        this.nextToken = 0;
+    }
+
+    @Override
+    public synchronized boolean addTokenToPool(T token) {
+        if(token == null || this.availableTokens.contains(token))
+            return false;
+        else
+            return this.availableTokens.add(token);
+    }
+
+    @Override
+    public synchronized boolean addAllTokensToPool(Collection<T> tokens) {
+        int startSize = this.availableTokens.size();
+        for(T token : tokens) {
+            this.addTokenToPool(token);
+        }
+        return startSize < this.availableTokens.size();
+    }
+
+    @Override
+    public synchronized T getNextAvailableToken() {
+        T token = null;
+        if(this.availableTokens.size() == 0) {
+            return token;
+        } else {
+            token = this.availableTokens.get(nextToken++);
+            if(nextToken == this.availableTokens.size()) {
+                nextToken = 0;
+            }
+            return token;
+        }
+    }
+
+    @Override
+    public synchronized int numAvailableTokens() {
+        return this.availableTokens.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
new file mode 100644
index 0000000..5f3453e
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/BackOffStrategyTest.java
@@ -0,0 +1,70 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit Tets
+ */
+public class BackOffStrategyTest {
+
+
+    private class TestBackOff extends BackOffStrategy {
+
+        public TestBackOff(long sleep, int maxAttempts) {
+            super(sleep, maxAttempts);
+        }
+
+        @Override
+        protected long calculateBackOffTime(int attemptCount, long baseSleepTime) {
+            return baseSleepTime;
+        }
+    }
+
+    @Test
+    public void testUnlimitedBackOff() {
+        BackOffStrategy backOff = new TestBackOff(1, -1);
+        try {
+            for(int i=0; i < 100; ++i) {
+                backOff.backOff();
+            }
+        } catch (BackOffException boe) {
+            fail("Threw BackOffException.  Not expected action");
+        }
+    }
+
+    @Test
+    public void testLimitedUseBackOff()  {
+        BackOffStrategy backOff = new TestBackOff(1, 2);
+        try {
+            backOff.backOff();
+        } catch (BackOffException boe) {
+            fail("Threw BackOffExpection. Not expected action");
+        }
+        try {
+            backOff.backOff();
+        } catch (BackOffException boe) {
+            fail("Threw BackOffExpection. Not expected action");
+        }
+        try {
+            backOff.backOff();
+            fail("Expected BackOffException to be thrown.");
+        } catch (BackOffException boe) {
+
+        }
+    }
+
+    @Test
+    public void testBackOffSleep() throws BackOffException {
+        BackOffStrategy backOff = new TestBackOff(2000, 1);
+        long startTime = System.currentTimeMillis();
+        backOff.backOff();
+        long endTime = System.currentTimeMillis();
+        assertTrue(endTime - startTime >= 2000);
+    }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
new file mode 100644
index 0000000..c9e7de9
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ConstantTimeBackOffStrategyTest.java
@@ -0,0 +1,26 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import org.apache.streams.util.api.requests.backoff.impl.ConstantTimeBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class ConstantTimeBackOffStrategyTest extends RandomizedTest{
+
+
+    @Test
+    public void constantTimeBackOffStategy() {
+        BackOffStrategy backOff = new ConstantTimeBackOffStrategy(1);
+        assertEquals(1, backOff.calculateBackOffTime(1,1));
+        assertEquals(1, backOff.calculateBackOffTime(2,1));
+        assertEquals(1, backOff.calculateBackOffTime(3,1));
+        assertEquals(1, backOff.calculateBackOffTime(4,1));
+        assertEquals(1, backOff.calculateBackOffTime(randomIntBetween(1, Integer.MAX_VALUE),1));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
new file mode 100644
index 0000000..43b42f7
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/ExponentialBackOffStrategyTest.java
@@ -0,0 +1,23 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class ExponentialBackOffStrategyTest {
+
+    @Test
+    public void exponentialTimeBackOffStrategyTest() {
+        BackOffStrategy backOff = new ExponentialBackOffStrategy(1);
+        assertEquals(5000, backOff.calculateBackOffTime(1,5));
+        assertEquals(25000, backOff.calculateBackOffTime(2,5));
+        assertEquals(125000, backOff.calculateBackOffTime(3,5));
+        assertEquals(2000, backOff.calculateBackOffTime(1,2));
+        assertEquals(16000, backOff.calculateBackOffTime(4,2));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
new file mode 100644
index 0000000..7a0f848
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/api/requests/backoff/LinearTimeBackOffStartegyTest.java
@@ -0,0 +1,22 @@
+package org.apache.streams.util.api.requests.backoff;
+
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit Tests
+ */
+public class LinearTimeBackOffStartegyTest {
+
+    @Test
+    public void linearTimeBackOffStrategyTest() {
+        BackOffStrategy backOff = new LinearTimeBackOffStrategy(1);
+        assertEquals(1000, backOff.calculateBackOffTime(1,1));
+        assertEquals(2000, backOff.calculateBackOffTime(2,1));
+        assertEquals(3000, backOff.calculateBackOffTime(3,1));
+        assertEquals(4000, backOff.calculateBackOffTime(4,1));
+        assertEquals(25000, backOff.calculateBackOffTime(5,5));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ff5f8211/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
new file mode 100644
index 0000000..903cc69
--- /dev/null
+++ b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -0,0 +1,176 @@
+package org.apache.streams.util.oauth.tokens.tokenmanager;
+
+
+import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for BasticTokenManager
+ */
+public class TestBasicTokenManager {
+
+    /**
+     * Simple token for testing purposes
+     */
+    private class TestToken extends OauthToken {
+
+        private String s;
+
+        public TestToken(String s) {
+            this.s = s;
+        }
+
+        @Override
+        protected boolean internalEquals(Object o) {
+            if(!(o instanceof TestToken))
+                return false;
+            TestToken that = (TestToken) o;
+            return this.s.equals(that.s);
+        }
+    }
+
+    @Test
+    public void testNoArgConstructor() {
+        try {
+            BasicTokenManger manager = new BasicTokenManger<TestToken>();
+            assertEquals(0, manager.numAvailableTokens());
+        } catch (Throwable t) {
+            fail("Constructors threw error: "+t.getMessage());
+        }
+    }
+
+    @Test
+    public void testCollectionConstructor() {
+        List<TestToken> tokens = new LinkedList<TestToken>();
+        try {
+            BasicTokenManger manager1 = new BasicTokenManger<TestToken>(tokens);
+            tokens.add(new TestToken("a"));
+            tokens.add(new TestToken("b"));
+            assertEquals(0, manager1.numAvailableTokens());
+            BasicTokenManger manager2 = new BasicTokenManger<TestToken>(tokens);
+            assertEquals(2, manager2.numAvailableTokens());
+            assertEquals(0, manager1.numAvailableTokens());
+        } catch (Throwable t) {
+            fail("Constructors threw error: "+t.getMessage());
+        }
+    }
+
+    @Test
+    public void testAddTokenToPool() {
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        assertTrue(manager.addTokenToPool(new TestToken("a")));
+        assertEquals(1, manager.numAvailableTokens());
+        assertFalse(manager.addTokenToPool(new TestToken("a")));
+        assertEquals(1, manager.numAvailableTokens());
+        assertTrue(manager.addTokenToPool(new TestToken("b")));
+        assertEquals(2, manager.numAvailableTokens());
+    }
+
+    @Test
+    public void testAddAllTokensToPool() {
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        List<TestToken> tokens = new ArrayList<TestToken>();
+        tokens.add(new TestToken("a"));
+        tokens.add(new TestToken("b"));
+        tokens.add(new TestToken("c"));
+        assertTrue(manager.addAllTokensToPool(tokens));
+        assertEquals(3, manager.numAvailableTokens());
+        assertFalse(manager.addAllTokensToPool(tokens));
+        assertEquals(3, manager.numAvailableTokens());
+        tokens.add(new TestToken("d"));
+        assertTrue(manager.addAllTokensToPool(tokens));
+        assertEquals(4, manager.numAvailableTokens());
+    }
+
+    @Test
+    public void testGetNextAvailableToken() {
+        BasicTokenManger manager = new BasicTokenManger<TestToken>();
+        assertNull(manager.getNextAvailableToken());
+        TestToken tokenA = new TestToken("a");
+        assertTrue(manager.addTokenToPool(tokenA));
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+
+        TestToken tokenB = new TestToken("b");
+        TestToken tokenC = new TestToken("c");
+        assertTrue(manager.addTokenToPool(tokenB));
+        assertTrue(manager.addTokenToPool(tokenC));
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenB, manager.getNextAvailableToken());
+        assertEquals(tokenC, manager.getNextAvailableToken());
+        assertEquals(tokenA, manager.getNextAvailableToken());
+        assertEquals(tokenB, manager.getNextAvailableToken());
+        assertEquals(tokenC, manager.getNextAvailableToken());
+    }
+
+    @Test
+    public void testMultiThreadSafety() {
+        int numThreads = 10;
+        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+        CountDownLatch startLatch = new CountDownLatch(1);
+        CountDownLatch finishLatch = new CountDownLatch(numThreads);
+        BasicTokenManger<TestToken> manager = new BasicTokenManger<TestToken>();
+        for(int i=0; i < numThreads; ++i) {
+            assertTrue(manager.addTokenToPool(new TestToken(String.valueOf(i))));
+        }
+        for(int i=0; i < numThreads; ++i) {
+            executor.submit(new TestThread(manager, startLatch, finishLatch, numThreads));
+        }
+        try {
+            Thread.sleep(2000); //sleep for 2 seconds so other threads can initialize
+            startLatch.countDown();
+            finishLatch.await();
+            assertTrue("No errors were thrown during thead safe check", true);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+            fail("Error occured durring thread safe test : "+t.getMessage());
+        }
+    }
+
+    /**
+     * Test class for thread safe check.
+     */
+    private class TestThread implements Runnable {
+
+        private BasicTokenManger<TestToken> manager;
+        private CountDownLatch startLatch;
+        private CountDownLatch finishedLatch;
+        private int availableTokens;
+
+        public TestThread(BasicTokenManger<TestToken> manager, CountDownLatch startLatch, CountDownLatch finishedLatch, int availableTokens) {
+            this.manager = manager;
+            this.startLatch = startLatch;
+            this.finishedLatch = finishedLatch;
+            this.availableTokens = availableTokens;
+        }
+
+        @Override
+        public void run() {
+            try {
+                this.startLatch.await();
+                for(int i=0; i < 1000; ++i) {
+                    assertNotNull(this.manager.getNextAvailableToken());
+                    assertEquals(this.availableTokens, this.manager.numAvailableTokens());
+                }
+                this.finishedLatch.countDown();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            } catch (Throwable t) {
+                fail("Threw error in multithread test : "+t.getMessage());
+            }
+        }
+    }
+
+}