You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/07/15 18:48:02 UTC

[1/6] git commit: STREAMS 121 | Added instagram provider and tests

Repository: incubator-streams
Updated Branches:
  refs/heads/instagram 14f7050ba -> 2f0a20f30


STREAMS 121 | Added instagram provider and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/77603934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/77603934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/77603934

Branch: refs/heads/instagram
Commit: 77603934e41ac78ba68f73b3f80ea6852f0acb46
Parents: 815ce2a
Author: rebanks <re...@w2odigital.com>
Authored: Fri Jul 11 16:09:59 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Fri Jul 11 16:09:59 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../streams-provider-instagram/pom.xml          |   6 +
 .../instagram/InstagramConfigurator.java        |  15 +-
 .../provider/InstagramRecentMediaCollector.java | 120 ++++++
 .../provider/InstagramRecentMediaProvider.java  |  97 +++++
 .../provider/InstagramTimelineProvider.java     | 409 -------------------
 .../com/instagram/InstagramConfiguration.json   |  13 +-
 .../InstagramUserInformationConfiguration.json  |   2 +-
 .../InstagramRecentMediaCollectorTest.java      | 161 ++++++++
 .../InstagramRecentMediaProviderTest.java       | 145 +++++++
 .../test/InstagramActivitySerDeTest.java        |  92 -----
 11 files changed, 544 insertions(+), 517 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index 620f68e..b06c276 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -58,6 +58,7 @@
         <module>streams-provider-sysomos</module>
         <module>streams-provider-rss</module>
         <module>streams-processor-regex</module>
+        <module>streams-provider-instagram</module>
     </modules>
 
     <dependencyManagement>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml
index e356a7a..a96ff1c 100644
--- a/streams-contrib/streams-provider-instagram/pom.xml
+++ b/streams-contrib/streams-provider-instagram/pom.xml
@@ -76,6 +76,12 @@
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
index f771856..4d01605 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -25,9 +25,6 @@ import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.validation.Validation;
-import javax.validation.Validator;
-import javax.validation.ValidatorFactory;
 import java.io.IOException;
 
 /**
@@ -41,8 +38,8 @@ public class InstagramConfigurator {
 
     public static InstagramConfiguration detectInstagramConfiguration(Config config) {
 
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        Validator validator = factory.getValidator();
+//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+//        Validator validator = factory.getValidator();
 
         InstagramConfiguration instagramConfiguration = null;
         try {
@@ -52,15 +49,15 @@ public class InstagramConfigurator {
         }
         Preconditions.checkNotNull(instagramConfiguration);
 
-        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+//        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
 
         return instagramConfiguration;
     }
 
     public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) {
 
-        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-        Validator validator = factory.getValidator();
+//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+//        Validator validator = factory.getValidator();
 
         InstagramUserInformationConfiguration instagramConfiguration = null;
         try {
@@ -70,7 +67,7 @@ public class InstagramConfigurator {
         }
         Preconditions.checkNotNull(instagramConfiguration);
 
-        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+//        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
 
         return instagramConfiguration;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
new file mode 100644
index 0000000..7eb3fcd
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -0,0 +1,120 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+import java.util.Set;
+
+/**
+ *
+ */
+public class InstagramRecentMediaCollector implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollector.class);
+    protected static final int MAX_ATTEMPTS = 5;
+    protected static final int SLEEP_SECS = 5; //5 seconds
+
+    protected Queue dataQueue; //exposed for testing
+    private InstagramUserInformationConfiguration config;
+    private Instagram instagramClient;
+    private volatile boolean isCompleted;
+
+
+    public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
+        this.dataQueue = queue;
+        this.config = config;
+        this.instagramClient = new Instagram(this.config.getClientId());
+        this.isCompleted = false;
+    }
+
+    protected void setInstagramClient(Instagram instagramClient) {
+        this.instagramClient = instagramClient;
+    }
+
+    protected Set<Long> getUserIds() {
+        Set<Long> userIds = Sets.newHashSet();
+        for(String id : config.getUserIds()) {
+            try {
+                userIds.add(Long.parseLong(id));
+            } catch (NumberFormatException nfe) {
+                LOGGER.error("Failed to parse user id, {}, to a long : {}", id, nfe.getMessage());
+            }
+        }
+        return userIds;
+    }
+
+    protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException {
+        LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
+        if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
+            long sleepTime = Math.round(Math.pow(SLEEP_SECS, attempt)) * 1000;
+            try {
+                LOGGER.debug("Encountered rate limit exception, sleeping for {} ms", sleepTime);
+                Thread.sleep(sleepTime);
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            }
+        } else {
+            LOGGER.error("Instagram returned an excetpion to the user media request : {}", instaExec.getMessage());
+            throw instaExec;
+        }
+    }
+
+    private void getUserMedia(Long userId) {
+        MediaFeed feed = null;
+        int attempts = 0;
+        int count = 0;
+        do {
+            ++attempts;
+            try {
+                feed = this.instagramClient.getRecentMediaFeed(userId);
+                queueData(feed, userId);
+                count += feed.getData().size();
+                while(feed != null && feed.getPagination() != null && feed.getPagination().hasNextPage()) {
+                    feed = this.instagramClient.getRecentMediaNextPage(feed.getPagination());
+                    queueData(feed, userId);
+                    count += feed.getData().size();
+                }
+            } catch (InstagramException ie) {
+                try {
+                    handleInstagramException(ie, attempts);
+                } catch (InstagramException ie2) { //not a rate limit exception, ignore user
+                    attempts = MAX_ATTEMPTS;
+                }
+            }
+        } while(feed == null && attempts < MAX_ATTEMPTS);
+        LOGGER.debug("For user, {}, received {} MediaFeedData", userId, count);
+    }
+
+    private void queueData(MediaFeed userFeed, Long userId) {
+        if(userFeed == null) {
+            LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
+        } else {
+            for(MediaFeedData data : userFeed.getData()) {
+                synchronized (this.dataQueue) {
+                    while(!this.dataQueue.offer(data)) {
+                        Thread.yield();
+                    }
+                }
+            }
+        }
+    }
+
+    public boolean isCompleted() {
+        return this.isCompleted;
+    }
+
+    @Override
+    public void run() {
+        for(Long userId : getUserIds()) {
+            getUserMedia(userId);
+        }
+        this.isCompleted = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
new file mode 100644
index 0000000..3354e54
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -0,0 +1,97 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Queues;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+
+import java.math.BigInteger;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Created by rebanks on 7/9/14.
+ */
+public class InstagramRecentMediaProvider implements StreamsProvider {
+
+    private InstagramUserInformationConfiguration config;
+    private InstagramRecentMediaCollector dataCollector;
+    protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
+    private ExecutorService executorService;
+    private volatile boolean isCompleted;
+
+    public InstagramRecentMediaProvider() {
+        this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+    }
+
+    public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
+        this.config = config;
+        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+        this.isCompleted = false;
+    }
+
+    @Override
+    public void startStream() {
+        this.dataCollector = getInstagramRecentMediaCollector();
+        this.executorService = Executors.newSingleThreadExecutor();
+        this.executorService.submit(this.dataCollector);
+    }
+
+    protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+        return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
+    }
+
+
+    @Override
+    public StreamsResultSet readCurrent() {
+        Queue<StreamsDatum> batch = Queues.newConcurrentLinkedQueue();
+        MediaFeedData data = null;
+        synchronized (this.mediaFeedQueue) {
+            while(!this.mediaFeedQueue.isEmpty()) {
+                data = this.mediaFeedQueue.poll();
+                batch.add(new StreamsDatum(data, data.getId()));
+            }
+        }
+        this.isCompleted = batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted();
+        return new StreamsResultSet(batch);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !this.isCompleted;
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+
+    }
+
+    @Override
+    public void cleanUp() {
+        this.executorService.shutdown();
+        try {
+            this.executorService.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+        } finally {
+            this.executorService = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
deleted file mode 100644
index d3e7179..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
+++ /dev/null
@@ -1,409 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.provider;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.instagram.InstagramUserInformationConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class InstagramTimelineProvider implements StreamsProvider, Serializable {
-
-    public final static String STREAMS_ID = "InstagramTimelineProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
-    public static final int MAX_NUMBER_WAITING = 10000;
-
-    private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
-    private InstagramUserInformationConfiguration config;
-
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    public InstagramUserInformationConfiguration getConfig() {
-        return config;
-    }
-
-    public void setConfig(InstagramUserInformationConfiguration config) {
-        this.config = config;
-    }
-
-    protected Iterator<Long[]> idsBatches;
-    protected Iterator<String[]> screenNameBatches;
-
-    protected volatile Queue<StreamsDatum> providerQueue;
-
-    protected int idsCount;
-    protected Instagram client;
-
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    private static ExecutorService getExecutor() {
-        return Executors.newSingleThreadExecutor();
-    }
-
-    public InstagramTimelineProvider() {
-        Config config = StreamsConfigurator.config.getConfig("instagram");
-        this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
-    }
-
-    public InstagramTimelineProvider(InstagramUserInformationConfiguration config) {
-        this.config = config;
-    }
-
-    public InstagramTimelineProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("instagram");
-        this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
-        this.klass = klass;
-    }
-
-    public InstagramTimelineProvider(InstagramUserInformationConfiguration config, Class klass) {
-        this.config = config;
-        this.klass = klass;
-    }
-
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
-
-    @Override
-    public void startStream() {
-        LOGGER.debug("{} startStream", STREAMS_ID);
-
-        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        while(idsBatches.hasNext())
-            loadBatch(idsBatches.next());
-
-        while(screenNameBatches.hasNext())
-            loadBatch(screenNameBatches.next());
-
-        executor.shutdown();
-    }
-
-    private void loadBatch(Long[] ids) {
-
-        // twitter4j implementation below - replace with jInstagram
-
-//        Twitter client = getTwitterClient();
-//        int keepTrying = 0;
-//
-//        // keep trying to load, give it 5 attempts.
-//        //while (keepTrying < 10)
-//        while (keepTrying < 1)
-//        {
-//            try
-//            {
-//                long[] toQuery = new long[ids.length];
-//                for(int i = 0; i < ids.length; i++)
-//                    toQuery[i] = ids[i];
-//
-//                for (User tStat : client.lookupUsers(toQuery)) {
-//
-//                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-//                    executor.submit(providerTask);
-//
-//                }
-//                keepTrying = 10;
-//            }
-//            catch(TwitterException twitterException) {
-//                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-//            }
-//            catch(Exception e) {
-//                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-//            }
-//        }
-    }
-
-    private void loadBatch(String[] ids) {
-
-        // twitter4j implementation below - replace with jInstagram
-//
-//        Twitter client = getTwitterClient();
-//        int keepTrying = 0;
-//
-//        // keep trying to load, give it 5 attempts.
-//        //while (keepTrying < 10)
-//        while (keepTrying < 1)
-//        {
-//            try
-//            {
-//                for (User tStat : client.lookupUsers(ids)) {
-//
-//                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
-//                    executor.submit(providerTask);
-//
-//                }
-//                keepTrying = 10;
-//            }
-//            catch(TwitterException twitterException) {
-//                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-//            }
-//            catch(Exception e) {
-//                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-//            }
-//        }
-    }
-
-    public class InstagramTimelineProviderTask implements Runnable {
-
-        // twitter4j implementation below - replace with jInstagram
-
-        private final Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
-
-        private InstagramTimelineProvider provider;
-        private Instagram client;
-        private Long id;
-
-        public InstagramTimelineProviderTask(InstagramTimelineProvider provider, Instagram client, Long id) {
-            this.provider = provider;
-            this.client = client;
-            this.id = id;
-        }
-
-        @Override
-        public void run() {
-
-            // twitter4j implementation below - replace with jInstagram
-
-//            Paging paging = new Paging(1, 200);
-//            List<Status> statuses = null;
-//            boolean KeepGoing = true;
-//            boolean hadFailure = false;
-//
-//            do
-//            {
-//                int keepTrying = 0;
-//
-//                // keep trying to load, give it 5 attempts.
-//                //This value was chosen because it seemed like a reasonable number of times
-//                //to retry capturing a timeline given the sorts of errors that could potentially
-//                //occur (network timeout/interruption, faulty client, etc.)
-//                while (keepTrying < 5)
-//                {
-//
-//                    try
-//                    {
-//                        statuses = client.getUserTimeline(id, paging);
-//
-//                        for (Status tStat : statuses)
-//                        {
-//                            String json = TwitterObjectFactory.getRawJSON(tStat);
-//
-//                            try {
-//                                provider.lock.readLock().lock();
-//                                ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
-//                            } finally {
-//                                provider.lock.readLock().unlock();
-//                            }
-//                        }
-//
-//                        paging.setPage(paging.getPage() + 1);
-//
-//                        keepTrying = 10;
-//                    }
-//                    catch(TwitterException twitterException) {
-//                        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
-//                    }
-//                    catch(Exception e) {
-//                        keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
-//                    }
-//                }
-//            }
-//            while (provider.shouldContinuePulling(statuses));
-
-            LOGGER.info(id + " Thread Finished");
-
-        }
-
-    }
-
-    private Map<Long, Long> userPullInfo;
-
-    protected boolean shouldContinuePulling(List<MediaFeedData> statuses) {
-        return (statuses != null) && (statuses.size() > 0);
-    }
-
-    private void sleep()
-    {
-        Thread.yield();
-        try {
-            // wait one tenth of a millisecond
-            Thread.yield();
-            Thread.sleep(1);
-            Thread.yield();
-        }
-        catch(IllegalArgumentException e) {
-            // passing in static values, this will never happen
-        }
-        catch(InterruptedException e) {
-            // noOp, there must have been an issue sleeping
-        }
-        Thread.yield();
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
-        StreamsResultSet result;
-
-        try {
-            lock.writeLock().lock();
-            result = new StreamsResultSet(providerQueue);
-            result.setCounter(new DatumStatusCounter());
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        if( providerQueue.isEmpty() && executor.isTerminated()) {
-            LOGGER.info("Finished.  Cleaning up...");
-
-            running.set(false);
-
-            LOGGER.info("Exiting");
-        }
-
-        return result;
-
-    }
-
-    protected Queue<StreamsDatum> constructQueue() {
-        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-
-    @Override
-    public void prepare(Object o) {
-
-        executor = getExecutor();
-        running.set(true);
-        try {
-            lock.writeLock().lock();
-            providerQueue = constructQueue();
-        } finally {
-            lock.writeLock().unlock();
-        }
-
-        Preconditions.checkNotNull(providerQueue);
-
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(config.getAccessToken());
-
-        //idsCount = config.getFollow().size();
-
-        client = getInstagramClient();
-    }
-
-    protected Instagram getInstagramClient()
-    {
-        // twitter4j -> jInstagram
-//        String baseUrl = "https://api.instagram.com:443/1.1/";
-//
-//        ConfigurationBuilder builder = new ConfigurationBuilder()
-//                .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-//                .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-//                .setOAuthAccessToken(config.getOauth().getAccessToken())
-//                .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-//                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
-//                .setJSONStoreEnabled(jsonStoreEnabled)
-//                .setAsyncNumThreads(3)
-//                .setRestBaseURL(baseUrl)
-//                .setIncludeMyRetweetEnabled(Boolean.TRUE)
-//                .setPrettyDebugEnabled(Boolean.TRUE);
-//
-//        return new InstagramFactory(builder.build()).getInstance();
-        return null;
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index 18a59b9..f8f8117 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,16 +5,17 @@
     "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "version": {
+        "clientId": {
             "type": "string",
-            "description": "The version"
+            "description": "Your Instagram Client Id"
         },
-        "endpoint": {
+        "clientSecret": {
             "type": "string",
-            "description": "The endpoint"
+            "description": "Your Instagram Client secret"
         },
-        "accessToken": {
-            "type": "string"
+        "callbackUrl": {
+            "type": "string",
+            "description": "Your Instagream callback url"
         }
    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
index 4b75ee4..b6a8c6b 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
@@ -6,7 +6,7 @@
     "extends": {"$ref":"InstagramConfiguration.json"},
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "info": {
+        "userIds": {
             "type": "array",
             "description": "A list of user IDs, indicating the users whose posts should be delivered on the stream",
             "items": {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
new file mode 100644
index 0000000..39f607c
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -0,0 +1,161 @@
+package org.apache.streams.instagram.provider;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.common.Pagination;
+import org.jinstagram.entity.users.feed.MediaFeed;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.jinstagram.exceptions.InstagramException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link org.apache.streams.instagram.provider.InstagramRecentMediaCollector}
+ */
+public class InstagramRecentMediaCollectorTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
+
+    private int expectedDataCount = 0;
+    private long randomSeed = System.currentTimeMillis();
+    private Random rand = new Random(randomSeed);
+    private Map<Pagination, MediaFeed> pageMap = Maps.newHashMap();
+
+    @Test
+    public void testHandleInstagramException1() throws InstagramException {
+        InstagramException ie = mock(InstagramException.class);
+        when(ie.getRemainingLimitStatus()).thenReturn(1);
+        final String message = "Test Message";
+        when(ie.getMessage()).thenReturn(message);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+        try {
+            collector.handleInstagramException(ie, 1);
+            fail("Expected RuntimeException to be thrown");
+        } catch (InstagramException rte) {
+//            assertTrue(rte.getMessage().contains("Mock for InstagramException"));
+            assertEquals(message, rte.getMessage());
+        }
+    }
+
+    @Test
+    public void testHandleInstagramException2() throws InstagramException{
+        InstagramException ie = mock(InstagramException.class);
+        when(ie.getRemainingLimitStatus()).thenReturn(0);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration());
+        long startTime = System.currentTimeMillis();
+        collector.handleInstagramException(ie, 1);
+        long endTime = System.currentTimeMillis();
+        LOGGER.debug("Slept for {} ms", startTime - endTime);
+        assertTrue(endTime - startTime >= 4000); //allow for 1 sec of error
+        startTime = System.currentTimeMillis();
+        collector.handleInstagramException(ie, 2);
+        endTime = System.currentTimeMillis();
+        LOGGER.debug("Slept for {} ms", startTime - endTime);
+        assertTrue(endTime - startTime >= 24000); //allow for 1 sec of error
+    }
+
+    @Test
+    public void testGetUserIds() {
+        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+        List<String> userIds = Lists.newLinkedList();
+        userIds.add("1");
+        userIds.add("2");
+        userIds.add("3");
+        userIds.add("4");
+        userIds.add("abcdefg");
+        config.setUserIds(userIds);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), config);
+
+        Set<Long> expected = Sets.newHashSet();
+        expected.add(1L);
+        expected.add(2L);
+        expected.add(3L);
+        expected.add(4L);
+
+        assertEquals(expected, collector.getUserIds());
+    }
+
+    @Test
+    public void testRun() {
+        Queue<MediaFeedData> data = Queues.newConcurrentLinkedQueue();
+        InstagramUserInformationConfiguration config = new InstagramUserInformationConfiguration();
+        List<String> userIds = Lists.newLinkedList();
+        userIds.add("1");
+        userIds.add("2");
+        userIds.add("3");
+        userIds.add("4");
+        config.setUserIds(userIds);
+        InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config);
+        collector.setInstagramClient(createMockInstagramClient());
+        collector.run();
+        LOGGER.debug("Random seed == {}", randomSeed);
+        assertEquals("Random Seed == " + randomSeed, this.expectedDataCount, data.size());
+    }
+
+    private Instagram createMockInstagramClient() {
+        final Instagram instagramClient = mock(Instagram.class);
+        try {
+            final InstagramException mockException = mock(InstagramException.class);
+            when(mockException.getRemainingLimitStatus()).thenReturn(-1);
+            when(mockException.getMessage()).thenReturn("MockInstagramException message");
+            when(instagramClient.getRecentMediaFeed(any(Long.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    long param = (Long) invocationOnMock.getArguments()[0];
+                    if (param == 2L) {
+                        throw mockException;
+                    } else {
+                        return createRandomMockMediaFeed();
+                    }
+                }
+            });
+            when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
+                @Override
+                public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    return createRandomMockMediaFeed();
+                }
+            });
+        } catch (InstagramException ie) {
+            fail("Failed to create mock instagram client.");
+        }
+        return instagramClient;
+    }
+
+    private MediaFeed createRandomMockMediaFeed() throws InstagramException {
+        MediaFeed feed = mock(MediaFeed.class);
+        when(feed.getData()).thenReturn(createData(this.rand.nextInt(100)));
+        Pagination pagination = mock(Pagination.class);
+        if(this.rand.nextInt(2) == 0) {
+            when(pagination.hasNextPage()).thenReturn(true);
+        } else {
+            when(pagination.hasNextPage()).thenReturn(false);
+        }
+        when(feed.getPagination()).thenReturn(pagination);
+        return feed;
+    }
+
+    private List<MediaFeedData> createData(int size) {
+        List<MediaFeedData> data = Lists.newLinkedList();
+        for(int i=0; i < size; ++i) {
+            data.add(mock(MediaFeedData.class));
+        }
+        this.expectedDataCount += size;
+        return data;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
new file mode 100644
index 0000000..7d2a47b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -0,0 +1,145 @@
+package org.apache.streams.instagram.provider;
+
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+
+/**
+ *
+ */
+public class InstagramRecentMediaProviderTest {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
+
+    @Test
+    public void testStartStream() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(1);
+        final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<MediaFeedData>(), new InstagramUserInformationConfiguration()) {
+
+            private volatile boolean isFinished = false;
+
+            @Override
+            public void run() {
+                this.isFinished = true;
+                latch.countDown();
+            }
+
+            @Override
+            public boolean isCompleted() {
+                return this.isFinished;
+            }
+        };
+
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return collectorStub;
+            }
+        };
+
+        provider.startStream();
+
+        latch.await();
+        assertTrue(collectorStub.isCompleted());
+        StreamsResultSet result = provider.readCurrent();
+        assertNotNull(result);
+        assertEquals(0, result.size());
+        assertTrue(!provider.isRunning());
+        try {
+            provider.cleanUp();
+        } catch (Throwable throwable){
+            throwable.printStackTrace();
+            fail("Error durring clean up");
+        }
+    }
+
+    @Test
+    public void testReadCurrent() {
+        final long seed = System.nanoTime();
+        final Random rand = new Random(seed);
+        final CyclicBarrier test = new CyclicBarrier(2);
+        final CyclicBarrier produce = new CyclicBarrier(2);
+        final AtomicInteger batchCount = new AtomicInteger(0);
+        InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(new InstagramUserInformationConfiguration()) {
+            @Override
+            protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+                return new InstagramRecentMediaCollector(super.mediaFeedQueue, new InstagramUserInformationConfiguration()) {
+
+                    private volatile boolean isFinished = false;
+
+
+
+                    public int getBatchCount() {
+                        return batchCount.get();
+                    }
+
+                    @Override
+                    public boolean isCompleted() {
+                        return isFinished;
+                    }
+
+                    @Override
+                    public void run() {
+                        int randInt = rand.nextInt(5);
+                        while(randInt != 0) {
+                            int batchSize = rand.nextInt(200);
+                            for(int i=0; i < batchSize; ++i) {
+                                while(!super.dataQueue.add(mock(MediaFeedData.class))) {
+                                    Thread.yield();
+                                }
+                            }
+                            batchCount.set(batchSize);
+                            try {
+                                test.await();
+                                produce.await();
+                            } catch (InterruptedException ie ) {
+                                Thread.currentThread().interrupt();
+                            } catch (BrokenBarrierException bbe) {
+                                Thread.currentThread().interrupt();
+                            }
+                            randInt = rand.nextInt(5);
+                        }
+                        batchCount.set(0);
+                        isFinished = true;
+                        try {
+                            test.await();
+                            produce.await();
+                        } catch (InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        } catch (BrokenBarrierException bbe) {
+                            Thread.currentThread().interrupt();
+                        }
+                    }
+
+                };
+            }
+        };
+        provider.startStream();
+        while(provider.isRunning()) {
+            try {
+                test.await();
+                assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
+                produce.await();
+            } catch (InterruptedException ie) {
+                Thread.currentThread().interrupt();
+            } catch (BrokenBarrierException bbe) {
+                Thread.currentThread().interrupt();
+            }
+
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/77603934/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
deleted file mode 100644
index fcf5e81..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
-* Created with IntelliJ IDEA.
-* User: sblackmon
-* Date: 8/20/13
-* Time: 5:57 PM
-* To change this template use File | Settings | File Templates.
-*/
-public class InstagramActivitySerDeTest {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    private InstagramJsonActivitySerializer instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
-
-    // remove @Ignore after implementation
-    @Ignore
-    @Test
-    public void Tests()
-    {
-        InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/test.txt");
-        InputStreamReader isr = new InputStreamReader(is);
-        BufferedReader br = new BufferedReader(isr);
-
-        try {
-            while (br.ready()) {
-                String line = br.readLine();
-                if(!StringUtils.isEmpty(line))
-                {
-                    LOGGER.info("raw: {}", line);
-
-                    // convert to MediaFeedData?
-                    Activity activity = instagramJsonActivitySerializer.deserialize(line);
-
-                    String activitystring = mapper.writeValueAsString(activity);
-
-                    LOGGER.info("activity: {}", activitystring);
-
-                    assertThat(activity, is(not(nullValue())));
-
-                    assertThat(activity.getId(), is(not(nullValue())));
-                    assertThat(activity.getActor(), is(not(nullValue())));
-                    assertThat(activity.getActor().getId(), is(not(nullValue())));
-                    assertThat(activity.getVerb(), is(not(nullValue())));
-                    assertThat(activity.getProvider(), is(not(nullValue())));
-
-                }
-            }
-        } catch( Exception e ) {
-            System.out.println(e);
-            e.printStackTrace();
-            Assert.fail();
-        }
-    }
-}


[5/6] git commit: STREAMS-121 | Change volatile booleans to AtomicBooleans per pull request feedbask

Posted by mf...@apache.org.
STREAMS-121 | Change volatile booleans to AtomicBooleans per pull request feedbask


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/110dddbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/110dddbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/110dddbd

Branch: refs/heads/instagram
Commit: 110dddbd25eefbce5ec959103f6004cca5864f28
Parents: 45510b2
Author: rebanks <re...@w2odigital.com>
Authored: Mon Jul 14 14:05:37 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Mon Jul 14 14:05:37 2014 -0500

----------------------------------------------------------------------
 .../apache/streams/instagram/InstagramConfigurator.java   |  2 +-
 .../instagram/provider/InstagramRecentMediaCollector.java |  9 +++++----
 .../instagram/provider/InstagramRecentMediaProvider.java  | 10 +++++-----
 .../provider/InstagramRecentMediaProviderTest.java        |  3 ++-
 4 files changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/110dddbd/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
index cab072d..11e6d79 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * 
+ *
  */
 public class InstagramConfigurator {
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/110dddbd/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index e459a0a..4f27e49 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Executes on all of the Instagram requests to collect the media feed data.
@@ -43,14 +44,14 @@ public class InstagramRecentMediaCollector implements Runnable {
     protected Queue dataQueue; //exposed for testing
     private InstagramUserInformationConfiguration config;
     private Instagram instagramClient;
-    private volatile boolean isCompleted;
+    private AtomicBoolean isCompleted;
 
 
     public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramUserInformationConfiguration config) {
         this.dataQueue = queue;
         this.config = config;
         this.instagramClient = new Instagram(this.config.getClientId());
-        this.isCompleted = false;
+        this.isCompleted = new AtomicBoolean(false);
     }
 
     /**
@@ -151,7 +152,7 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @return true when the collector has queued all of available media feed data for the provided users.
      */
     public boolean isCompleted() {
-        return this.isCompleted;
+        return this.isCompleted.get();
     }
 
     @Override
@@ -159,6 +160,6 @@ public class InstagramRecentMediaCollector implements Runnable {
         for(Long userId : getUserIds()) {
             getUserMedia(userId);
         }
-        this.isCompleted = true;
+        this.isCompleted.set(true);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/110dddbd/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index 3d67a48..30ddda4 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -29,6 +29,7 @@ import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
@@ -39,7 +40,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
     private InstagramRecentMediaCollector dataCollector;
     protected Queue<MediaFeedData> mediaFeedQueue; //exposed for testing
     private ExecutorService executorService;
-    private volatile boolean isCompleted;
+    private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
         this(InstagramConfigurator.detectInstagramUserInformationConfiguration(StreamsConfigurator.config.getConfig("instagram")));
@@ -48,7 +49,6 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
     public InstagramRecentMediaProvider(InstagramUserInformationConfiguration config) {
         this.config = config;
         this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
-        this.isCompleted = false;
     }
 
     @Override
@@ -77,7 +77,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
                 batch.add(new StreamsDatum(data, data.getId()));
             }
         }
-        this.isCompleted = batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted();
+        this.isCompleted.set(batch.size() == 0 && this.mediaFeedQueue.isEmpty() && this.dataCollector.isCompleted());
         return new StreamsResultSet(batch);
     }
 
@@ -93,12 +93,12 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
 
     @Override
     public boolean isRunning() {
-        return !this.isCompleted;
+        return !this.isCompleted.get();
     }
 
     @Override
     public void prepare(Object configurationObject) {
-
+        this.isCompleted = new AtomicBoolean(false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/110dddbd/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
index 59e90b4..d81f85a 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -63,7 +63,7 @@ public class InstagramRecentMediaProviderTest {
                 return collectorStub;
             }
         };
-
+        provider.prepare(null);
         provider.startStream();
 
         latch.await();
@@ -141,6 +141,7 @@ public class InstagramRecentMediaProviderTest {
                 };
             }
         };
+        provider.prepare(null);
         provider.startStream();
         while(provider.isRunning()) {
             try {


[6/6] git commit: Fixed merge issue for rbnks/STREAMS-121

Posted by mf...@apache.org.
Fixed merge issue for rbnks/STREAMS-121


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/2f0a20f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/2f0a20f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/2f0a20f3

Branch: refs/heads/instagram
Commit: 2f0a20f30eb8878b74126190435819a02d714452
Parents: 110dddb
Author: mfranklin <mf...@apache.org>
Authored: Tue Jul 15 12:46:15 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Tue Jul 15 12:46:15 2014 -0400

----------------------------------------------------------------------
 streams-contrib/pom.xml | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/2f0a20f3/streams-contrib/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/pom.xml b/streams-contrib/pom.xml
index e225af1..699274e 100644
--- a/streams-contrib/pom.xml
+++ b/streams-contrib/pom.xml
@@ -59,7 +59,6 @@
         <module>streams-provider-sysomos</module>
         <module>streams-provider-rss</module>
         <module>streams-processor-regex</module>
-        <module>streams-provider-instagram</module>
     </modules>
 
     <dependencyManagement>


[3/6] git commit: STREAMS-121 | Merged apache instagram branch

Posted by mf...@apache.org.
STREAMS-121 | Merged apache instagram branch


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1e233007
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1e233007
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1e233007

Branch: refs/heads/instagram
Commit: 1e233007d0adcff9459bda9bad8dea00e2f27be4
Parents: 95f6d5d 14f7050
Author: rebanks <re...@w2odigital.com>
Authored: Fri Jul 11 16:41:12 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Fri Jul 11 16:41:12 2014 -0500

----------------------------------------------------------------------
 streams-contrib/pom.xml                         |   1 +
 .../processor/InstagramTypeConverter.java       | 116 ++----------
 .../InstagramJsonActivitySerializer.java        |  22 ++-
 .../serializer/util/InstagramActivityUtil.java  | 187 ++++++++++++++++---
 .../serializer/util/InstagramDeserializer.java  |  33 ++++
 .../test/InstagramActivitySerDeTest.java        |  88 +++++++++
 .../src/test/resources/testMediaFeedObjects.txt |   2 +
 7 files changed, 323 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1e233007/streams-contrib/pom.xml
----------------------------------------------------------------------


[2/6] git commit: STREAMS 121 | Added javadoc comments

Posted by mf...@apache.org.
STREAMS 121 | Added javadoc comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/95f6d5d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/95f6d5d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/95f6d5d9

Branch: refs/heads/instagram
Commit: 95f6d5d9bc9069258717a9880e491ce55ae343e2
Parents: 7760393
Author: rebanks <re...@w2odigital.com>
Authored: Fri Jul 11 16:29:55 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Fri Jul 11 16:29:55 2014 -0500

----------------------------------------------------------------------
 .../provider/InstagramRecentMediaCollector.java | 32 +++++++++++++++++++-
 .../provider/InstagramRecentMediaProvider.java  |  6 +++-
 2 files changed, 36 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95f6d5d9/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 7eb3fcd..928ff9e 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -13,6 +13,11 @@ import java.util.Queue;
 import java.util.Set;
 
 /**
+ * Executes on all of the Instagram requests to collect the media feed data.
+ *
+ * If errors/exceptions occur when trying to gather data for a particular user, that user is skipped and the collector
+ * move on to the next user.  If a rate limit exception occurs it employs an exponential back off strategy for up to
+ * 5 attempts.
  *
  */
 public class InstagramRecentMediaCollector implements Runnable {
@@ -34,10 +39,19 @@ public class InstagramRecentMediaCollector implements Runnable {
         this.isCompleted = false;
     }
 
+    /**
+     * Set instagram client
+     * @param instagramClient
+     */
     protected void setInstagramClient(Instagram instagramClient) {
         this.instagramClient = instagramClient;
     }
 
+    /**
+     * Gets the user ids from the {@link org.apache.streams.instagram.InstagramUserInformationConfiguration} and
+     * converts them to {@link java.lang.Long}
+     * @return
+     */
     protected Set<Long> getUserIds() {
         Set<Long> userIds = Sets.newHashSet();
         for(String id : config.getUserIds()) {
@@ -50,6 +64,14 @@ public class InstagramRecentMediaCollector implements Runnable {
         return userIds;
     }
 
+    /**
+     * Determins the course of action to take when Instagram returns an exception to a request.  If it is a rate limit
+     * exception, it implements an exponentional back off strategy.  If it is anyother exception, it is logged and
+     * rethrown.
+     * @param instaExec exception to handle
+     * @param attempt number of attempts that have occured to pull this users information
+     * @throws InstagramException
+     */
     protected void handleInstagramException(InstagramException instaExec, int attempt) throws InstagramException {
         LOGGER.debug("RemainingApiLimitStatus: {}", instaExec.getRemainingLimitStatus());
         if(instaExec.getRemainingLimitStatus() == 0) { //rate limit exception
@@ -66,6 +88,10 @@ public class InstagramRecentMediaCollector implements Runnable {
         }
     }
 
+    /**
+     * Gets the MediaFeedData for this particular user and adds it to the share queued.
+     * @param userId
+     */
     private void getUserMedia(Long userId) {
         MediaFeed feed = null;
         int attempts = 0;
@@ -97,7 +123,7 @@ public class InstagramRecentMediaCollector implements Runnable {
             LOGGER.error("User id, {}, returned a NULL media feed from instagram.", userId);
         } else {
             for(MediaFeedData data : userFeed.getData()) {
-                synchronized (this.dataQueue) {
+                synchronized (this.dataQueue) { //unnecessary
                     while(!this.dataQueue.offer(data)) {
                         Thread.yield();
                     }
@@ -106,6 +132,10 @@ public class InstagramRecentMediaCollector implements Runnable {
         }
     }
 
+    /**
+     *
+     * @return true when the collector has queued all of available media feed data for the provided users.
+     */
     public boolean isCompleted() {
         return this.isCompleted;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/95f6d5d9/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index 3354e54..e5fa464 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -17,7 +17,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Created by rebanks on 7/9/14.
+ * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
  */
 public class InstagramRecentMediaProvider implements StreamsProvider {
 
@@ -44,6 +44,10 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
         this.executorService.submit(this.dataCollector);
     }
 
+    /**
+     * EXPOSED FOR TESTING
+     * @return
+     */
     protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
         return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
     }


[4/6] git commit: Added license header and remvoed comments

Posted by mf...@apache.org.
Added license header and remvoed comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/45510b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/45510b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/45510b29

Branch: refs/heads/instagram
Commit: 45510b292b8f1dc5de5611c87abf70b856be4125
Parents: 1e23300
Author: rebanks <re...@w2odigital.com>
Authored: Mon Jul 14 10:22:47 2014 -0500
Committer: rebanks <re...@w2odigital.com>
Committed: Mon Jul 14 10:22:47 2014 -0500

----------------------------------------------------------------------
 .../streams/instagram/InstagramConfigurator.java      | 12 +-----------
 .../provider/InstagramRecentMediaCollector.java       | 14 ++++++++++++++
 .../provider/InstagramRecentMediaProvider.java        | 14 ++++++++++++++
 .../provider/InstagramRecentMediaCollectorTest.java   | 14 ++++++++++++++
 .../provider/InstagramRecentMediaProviderTest.java    | 14 ++++++++++++++
 5 files changed, 57 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45510b29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
index 4d01605..cab072d 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -28,7 +28,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 
 /**
- * Created by sblackmon on 12/10/13.
+ * 
  */
 public class InstagramConfigurator {
 
@@ -38,9 +38,6 @@ public class InstagramConfigurator {
 
     public static InstagramConfiguration detectInstagramConfiguration(Config config) {
 
-//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-//        Validator validator = factory.getValidator();
-
         InstagramConfiguration instagramConfiguration = null;
         try {
             instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramConfiguration.class);
@@ -49,16 +46,11 @@ public class InstagramConfigurator {
         }
         Preconditions.checkNotNull(instagramConfiguration);
 
-//        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
-
         return instagramConfiguration;
     }
 
     public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) {
 
-//        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
-//        Validator validator = factory.getValidator();
-
         InstagramUserInformationConfiguration instagramConfiguration = null;
         try {
             instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramUserInformationConfiguration.class);
@@ -67,8 +59,6 @@ public class InstagramConfigurator {
         }
         Preconditions.checkNotNull(instagramConfiguration);
 
-//        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
-
         return instagramConfiguration;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45510b29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 928ff9e..e459a0a 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.instagram.provider;
 
 import com.google.common.collect.Sets;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45510b29/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index e5fa464..3d67a48 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.instagram.provider;
 
 import com.google.common.collect.Queues;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45510b29/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
index 39f607c..0225b40 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollectorTest.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.instagram.provider;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/45510b29/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
index 7d2a47b..59e90b4 100644
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/InstagramRecentMediaProviderTest.java
@@ -1,3 +1,17 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance *
+http://www.apache.org/licenses/LICENSE-2.0 *
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
 package org.apache.streams.instagram.provider;
 
 import org.apache.streams.core.StreamsResultSet;