You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/06/27 01:59:43 UTC

[2/2] git commit: bootstrap of module

bootstrap of module


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/537b97aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/537b97aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/537b97aa

Branch: refs/heads/instagram
Commit: 537b97aa0adb1bf396b51cc12214722bf4f95c6c
Parents: 34c95a6
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Jun 26 18:58:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Jun 26 18:58:14 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-instagram/README.md        |  17 +
 .../streams-provider-instagram/pom.xml          | 139 ++++
 .../instagram/InstagramConfigurator.java        |  78 +++
 .../processor/InstagramTypeConverter.java       | 190 +++++
 .../provider/InstagramTimelineProvider.java     | 409 +++++++++++
 .../InstagramJsonActivitySerializer.java        |  60 ++
 .../serializer/util/InstagramActivityUtil.java  | 142 ++++
 .../com/instagram/InstagramConfiguration.json   |  20 +
 .../InstagramUserInformationConfiguration.json  |  17 +
 .../src/main/resources/reference.conf           |   5 +
 .../test/InstagramActivitySerDeTest.java        |  92 +++
 .../src/test/resources/testtweets.txt           | 695 +++++++++++++++++++
 12 files changed, 1864 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/README.md b/streams-contrib/streams-provider-instagram/README.md
new file mode 100644
index 0000000..3bca23b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/README.md
@@ -0,0 +1,17 @@
+streams-provider-instagram
+
+Purpose                  
+
+  Module connects to instagram API, collects events, converts to activity, and passes each activity downstream.
+
+Example configuration
+
+    "instagram": {
+        "version": "v1",
+        "endpoint": "media/recent",
+        "accessToken": "",
+        "info": [
+            "3",
+            "kevin"        
+        ]
+    }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml
new file mode 100644
index 0000000..e356a7a
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.streams</groupId>
+        <artifactId>streams-contrib</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>streams-provider-instagram</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-joda</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-pojo</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-config</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.streams</groupId>
+            <artifactId>streams-util</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path-assert</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sachinhandiekar</groupId>
+            <artifactId>jInstagram</artifactId>
+            <version>1.0.7</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.8</version>
+                <executions>
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>target/generated-sources/jsonschema2pojo</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.jsonschema2pojo</groupId>
+                <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+                <configuration>
+                    <addCompileSourceRoot>true</addCompileSourceRoot>
+                    <generateBuilders>true</generateBuilders>
+                    <sourcePaths>
+                        <sourcePath>src/main/jsonschema/com/instagram/InstagramConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json</sourcePath>
+                    </sourcePaths>
+                    <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+                    <targetPackage>org.apache.streams.instagram.pojo</targetPackage>
+                    <useLongIntegers>true</useLongIntegers>
+                    <useJodaDates>true</useJodaDates>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
new file mode 100644
index 0000000..f771856
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class InstagramConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramConfigurator.class);
+    private final static ObjectMapper mapper = new ObjectMapper();
+
+
+    public static InstagramConfiguration detectInstagramConfiguration(Config config) {
+
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        Validator validator = factory.getValidator();
+
+        InstagramConfiguration instagramConfiguration = null;
+        try {
+            instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Preconditions.checkNotNull(instagramConfiguration);
+
+        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+
+        return instagramConfiguration;
+    }
+
+    public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) {
+
+        ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+        Validator validator = factory.getValidator();
+
+        InstagramUserInformationConfiguration instagramConfiguration = null;
+        try {
+            instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramUserInformationConfiguration.class);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        Preconditions.checkNotNull(instagramConfiguration);
+
+        Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+
+        return instagramConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
new file mode 100644
index 0000000..14260e3
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class InstagramTypeConverter implements StreamsProcessor {
+
+    public final static String STREAMS_ID = "InstagramTypeConverter";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class);
+
+    private ObjectMapper mapper;
+
+    private Queue<MediaFeedData> inQueue;
+    private Queue<StreamsDatum> outQueue;
+
+    private Class inClass;
+    private Class outClass;
+
+    private InstagramJsonActivitySerializer instagramJsonActivitySerializer;
+
+    private int count = 0;
+
+    public final static String TERMINATE = new String("TERMINATE");
+
+    public InstagramTypeConverter(Class inClass, Class outClass) {
+        this.inClass = inClass;
+        this.outClass = outClass;
+    }
+
+    public Queue<StreamsDatum> getProcessorOutputQueue() {
+        return outQueue;
+    }
+
+    public void setProcessorInputQueue(Queue<MediaFeedData> inputQueue) {
+        inQueue = inputQueue;
+    }
+
+    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+        Object result = null;
+
+        if( outClass.equals( Activity.class )) {
+            LOGGER.debug("ACTIVITY");
+            result = instagramJsonActivitySerializer.deserialize(
+                    mapper.writeValueAsString(event));
+        } else if( outClass.equals( ObjectNode.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.convertValue(event, ObjectNode.class);
+        } else if( outClass.equals( String.class )) {
+            LOGGER.debug("OBJECTNODE");
+            result = mapper.writeValueAsString(event);
+        }
+
+
+    // no supported conversion were applied
+        if( result != null ) {
+            count ++;
+            return result;
+        }
+
+        LOGGER.debug("CONVERT FAILED");
+
+        return null;
+
+    }
+
+    public boolean validate(Object document, Class klass) {
+
+        // TODO
+        return true;
+    }
+
+    public boolean isValidJSON(final String json) {
+        boolean valid = false;
+        try {
+            final JsonParser parser = new ObjectMapper().getJsonFactory()
+                    .createJsonParser(json);
+            while (parser.nextToken() != null) {
+            }
+            valid = true;
+        } catch (JsonParseException jpe) {
+            LOGGER.warn("validate: {}", jpe);
+        } catch (IOException ioe) {
+            LOGGER.warn("validate: {}", ioe);
+        }
+
+        return valid;
+    }
+
+    @Override
+    public List<StreamsDatum> process(StreamsDatum entry) {
+
+        StreamsDatum result = null;
+
+        try {
+
+            Object item = entry.getDocument();
+            ObjectNode node;
+
+            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+            if( item instanceof String ) {
+
+                // if the target is string, just pass-through
+                if( String.class.equals(outClass)) {
+                    result = entry;
+                }
+                else {
+                    // first check for valid json
+                    node = (ObjectNode)mapper.readTree((String)item);
+
+                    Object out = convert(node, String.class, outClass);
+
+                    if( out != null && validate(out, outClass))
+                        result = new StreamsDatum(out);
+                }
+
+            } else if( item instanceof ObjectNode ) {
+
+                // first check for valid json
+                node = (ObjectNode)mapper.valueToTree(item);
+
+                Object out = convert(node, ObjectNode.class, outClass);
+
+                if( out != null && validate(out, outClass))
+                    result = new StreamsDatum(out);
+
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        if( result != null )
+            return Lists.newArrayList(result);
+        else
+            return Lists.newArrayList();
+    }
+
+    @Override
+    public void prepare(Object o) {
+        mapper = new StreamsJacksonMapper();
+        instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
new file mode 100644
index 0000000..d3e7179
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class InstagramTimelineProvider implements StreamsProvider, Serializable {
+
+    public final static String STREAMS_ID = "InstagramTimelineProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
+    public static final int MAX_NUMBER_WAITING = 10000;
+
+    private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+    private InstagramUserInformationConfiguration config;
+
+    private Class klass;
+    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    public InstagramUserInformationConfiguration getConfig() {
+        return config;
+    }
+
+    public void setConfig(InstagramUserInformationConfiguration config) {
+        this.config = config;
+    }
+
+    protected Iterator<Long[]> idsBatches;
+    protected Iterator<String[]> screenNameBatches;
+
+    protected volatile Queue<StreamsDatum> providerQueue;
+
+    protected int idsCount;
+    protected Instagram client;
+
+
+    protected ExecutorService executor;
+
+    protected DateTime start;
+    protected DateTime end;
+
+    protected final AtomicBoolean running = new AtomicBoolean();
+
+    private static ExecutorService getExecutor() {
+        return Executors.newSingleThreadExecutor();
+    }
+
+    public InstagramTimelineProvider() {
+        Config config = StreamsConfigurator.config.getConfig("instagram");
+        this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
+    }
+
+    public InstagramTimelineProvider(InstagramUserInformationConfiguration config) {
+        this.config = config;
+    }
+
+    public InstagramTimelineProvider(Class klass) {
+        Config config = StreamsConfigurator.config.getConfig("instagram");
+        this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
+        this.klass = klass;
+    }
+
+    public InstagramTimelineProvider(InstagramUserInformationConfiguration config, Class klass) {
+        this.config = config;
+        this.klass = klass;
+    }
+
+
+    public Queue<StreamsDatum> getProviderQueue() {
+        return this.providerQueue;
+    }
+
+    @Override
+    public void startStream() {
+        LOGGER.debug("{} startStream", STREAMS_ID);
+
+        Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+        LOGGER.info("readCurrent");
+
+        while(idsBatches.hasNext())
+            loadBatch(idsBatches.next());
+
+        while(screenNameBatches.hasNext())
+            loadBatch(screenNameBatches.next());
+
+        executor.shutdown();
+    }
+
+    private void loadBatch(Long[] ids) {
+
+        // twitter4j implementation below - replace with jInstagram
+
+//        Twitter client = getTwitterClient();
+//        int keepTrying = 0;
+//
+//        // keep trying to load, give it 5 attempts.
+//        //while (keepTrying < 10)
+//        while (keepTrying < 1)
+//        {
+//            try
+//            {
+//                long[] toQuery = new long[ids.length];
+//                for(int i = 0; i < ids.length; i++)
+//                    toQuery[i] = ids[i];
+//
+//                for (User tStat : client.lookupUsers(toQuery)) {
+//
+//                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+//                    executor.submit(providerTask);
+//
+//                }
+//                keepTrying = 10;
+//            }
+//            catch(TwitterException twitterException) {
+//                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+//            }
+//            catch(Exception e) {
+//                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+//            }
+//        }
+    }
+
+    private void loadBatch(String[] ids) {
+
+        // twitter4j implementation below - replace with jInstagram
+//
+//        Twitter client = getTwitterClient();
+//        int keepTrying = 0;
+//
+//        // keep trying to load, give it 5 attempts.
+//        //while (keepTrying < 10)
+//        while (keepTrying < 1)
+//        {
+//            try
+//            {
+//                for (User tStat : client.lookupUsers(ids)) {
+//
+//                    TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+//                    executor.submit(providerTask);
+//
+//                }
+//                keepTrying = 10;
+//            }
+//            catch(TwitterException twitterException) {
+//                keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+//            }
+//            catch(Exception e) {
+//                keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+//            }
+//        }
+    }
+
+    public class InstagramTimelineProviderTask implements Runnable {
+
+        // twitter4j implementation below - replace with jInstagram
+
+        private final Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
+
+        private InstagramTimelineProvider provider;
+        private Instagram client;
+        private Long id;
+
+        public InstagramTimelineProviderTask(InstagramTimelineProvider provider, Instagram client, Long id) {
+            this.provider = provider;
+            this.client = client;
+            this.id = id;
+        }
+
+        @Override
+        public void run() {
+
+            // twitter4j implementation below - replace with jInstagram
+
+//            Paging paging = new Paging(1, 200);
+//            List<Status> statuses = null;
+//            boolean KeepGoing = true;
+//            boolean hadFailure = false;
+//
+//            do
+//            {
+//                int keepTrying = 0;
+//
+//                // keep trying to load, give it 5 attempts.
+//                //This value was chosen because it seemed like a reasonable number of times
+//                //to retry capturing a timeline given the sorts of errors that could potentially
+//                //occur (network timeout/interruption, faulty client, etc.)
+//                while (keepTrying < 5)
+//                {
+//
+//                    try
+//                    {
+//                        statuses = client.getUserTimeline(id, paging);
+//
+//                        for (Status tStat : statuses)
+//                        {
+//                            String json = TwitterObjectFactory.getRawJSON(tStat);
+//
+//                            try {
+//                                provider.lock.readLock().lock();
+//                                ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
+//                            } finally {
+//                                provider.lock.readLock().unlock();
+//                            }
+//                        }
+//
+//                        paging.setPage(paging.getPage() + 1);
+//
+//                        keepTrying = 10;
+//                    }
+//                    catch(TwitterException twitterException) {
+//                        keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+//                    }
+//                    catch(Exception e) {
+//                        keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+//                    }
+//                }
+//            }
+//            while (provider.shouldContinuePulling(statuses));
+
+            LOGGER.info(id + " Thread Finished");
+
+        }
+
+    }
+
+    private Map<Long, Long> userPullInfo;
+
+    protected boolean shouldContinuePulling(List<MediaFeedData> statuses) {
+        return (statuses != null) && (statuses.size() > 0);
+    }
+
+    private void sleep()
+    {
+        Thread.yield();
+        try {
+            // wait one tenth of a millisecond
+            Thread.yield();
+            Thread.sleep(1);
+            Thread.yield();
+        }
+        catch(IllegalArgumentException e) {
+            // passing in static values, this will never happen
+        }
+        catch(InterruptedException e) {
+            // noOp, there must have been an issue sleeping
+        }
+        Thread.yield();
+    }
+
+    public StreamsResultSet readCurrent() {
+
+        LOGGER.info("Providing {} docs", providerQueue.size());
+
+        StreamsResultSet result;
+
+        try {
+            lock.writeLock().lock();
+            result = new StreamsResultSet(providerQueue);
+            result.setCounter(new DatumStatusCounter());
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if( providerQueue.isEmpty() && executor.isTerminated()) {
+            LOGGER.info("Finished.  Cleaning up...");
+
+            running.set(false);
+
+            LOGGER.info("Exiting");
+        }
+
+        return result;
+
+    }
+
+    protected Queue<StreamsDatum> constructQueue() {
+        return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+    }
+
+    public StreamsResultSet readNew(BigInteger sequence) {
+        LOGGER.debug("{} readNew", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        LOGGER.debug("{} readRange", STREAMS_ID);
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running.get();
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+
+
+    @Override
+    public void prepare(Object o) {
+
+        executor = getExecutor();
+        running.set(true);
+        try {
+            lock.writeLock().lock();
+            providerQueue = constructQueue();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        Preconditions.checkNotNull(providerQueue);
+
+        Preconditions.checkNotNull(this.klass);
+        Preconditions.checkNotNull(config.getAccessToken());
+
+        //idsCount = config.getFollow().size();
+
+        client = getInstagramClient();
+    }
+
+    protected Instagram getInstagramClient()
+    {
+        // twitter4j -> jInstagram
+//        String baseUrl = "https://api.instagram.com:443/1.1/";
+//
+//        ConfigurationBuilder builder = new ConfigurationBuilder()
+//                .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+//                .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+//                .setOAuthAccessToken(config.getOauth().getAccessToken())
+//                .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+//                .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+//                .setJSONStoreEnabled(jsonStoreEnabled)
+//                .setAsyncNumThreads(3)
+//                .setRestBaseURL(baseUrl)
+//                .setIncludeMyRetweetEnabled(Boolean.TRUE)
+//                .setPrettyDebugEnabled(Boolean.TRUE);
+//
+//        return new InstagramFactory(builder.build()).getInstance();
+        return null;
+    }
+
+    @Override
+    public void cleanUp() {
+        shutdownAndAwaitTermination(executor);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
new file mode 100644
index 0000000..8d92641
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class InstagramJsonActivitySerializer implements ActivitySerializer<String>, Serializable
+{
+
+    public InstagramJsonActivitySerializer() {
+
+    }
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    @Override
+    public String serialize(Activity deserialized) throws ActivitySerializerException {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+        Activity activity = null;
+
+        // implement
+
+        return activity;
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
new file mode 100644
index 0000000..e71c43e
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer.util;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Provides utilities for working with Activity objects within the context of Instagram
+ */
+public class InstagramActivityUtil {
+
+    /**
+     * Updates the given Activity object with the values from the item
+     * @param item the object to use as the source
+     * @param activity the target of the updates.  Will receive all values from the tweet.
+     * @throws ActivitySerializerException
+     */
+    public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException {
+
+    }
+
+    /**
+     * Builds the actor
+     * @param item the item
+     * @return a valid Actor
+     */
+    public static  Actor buildActor(MediaFeedData item) {
+        Actor actor = new Actor();
+        return actor;
+    }
+
+    /**
+     * Builds the ActivityObject
+     * @param item the item
+     * @return a valid Activity Object
+     */
+    public static ActivityObject buildActivityObject(MediaFeedData item) {
+        ActivityObject actObj = new ActivityObject();
+        return actObj;
+    }
+
+
+    /**
+     * Updates the content, and associated fields, with those from the given tweet
+     * @param activity the target of the updates.  Will receive all values from the tweet.
+     * @param item the object to use as the source
+     * @param verb the verb for the given activity's type
+     */
+    public static void updateActivityContent(Activity activity, MediaFeedData item, String verb) {
+
+    }
+
+    /**
+     * Gets the links from the Instagram event
+     * @param item the object to use as the source
+     * @return a list of links corresponding to the expanded URL
+     */
+    public static List<String> getLinks(MediaFeedData item) {
+        List<String> links = Lists.newArrayList();
+        return links;
+    }
+
+    /**
+     * Adds the location extension and populates with teh instagram data
+     * @param activity the Activity object to update
+     * @param item the object to use as the source
+     */
+    public static void addLocationExtension(Activity activity, MediaFeedData item) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+        Map<String, Object> location = new HashMap<String, Object>();
+
+    }
+
+    /**
+     * Gets the common instagram {@link org.apache.streams.pojo.json.Provider} object
+     * @return a provider object representing Instagram
+     */
+    public static Provider getProvider() {
+        Provider provider = new Provider();
+        provider.setId("id:providers:instagram");
+        provider.setDisplayName("Instagram");
+        return provider;
+    }
+    /**
+     * Adds the given Instagram event to the activity as an extension
+     * @param activity the Activity object to update
+     * @param event the Instagram event to add as the extension
+     */
+    public static void addInstagramExtension(Activity activity, ObjectNode event) {
+        Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+        extensions.put("instagram", event);
+    }
+    /**
+     * Formats the ID to conform with the Apache Streams activity ID convention
+     * @param idparts the parts of the ID to join
+     * @return a valid Activity ID in format "id:instagram:part1:part2:...partN"
+     */
+    public static String formatId(String... idparts) {
+        return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
+    }
+
+    /**
+     * Takes various parameters from the instagram object that are currently not part of teh
+     * activity schema and stores them in a generic extensions attribute
+     * @param activity
+     * @param item
+     */
+    public static void addInstagramExtensions(Activity activity, MediaFeedData item) {
+        Map<String, Object> extensions = ensureExtensions(activity);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
new file mode 100644
index 0000000..18a59b9
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -0,0 +1,20 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "version": {
+            "type": "string",
+            "description": "The version"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "accessToken": {
+            "type": "string"
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
new file mode 100644
index 0000000..4b75ee4
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
@@ -0,0 +1,17 @@
+{
+    "type": "object",
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "id": "#",
+    "javaType" : "org.apache.streams.instagram.InstagramUserInformationConfiguration",
+    "extends": {"$ref":"InstagramConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating the users whose posts should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
new file mode 100644
index 0000000..9a01bf6
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
+instagram {
+    version = "v1"
+    endpoint = "sample"
+    accessToken = ""
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
new file mode 100644
index 0000000..fcf5e81
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class InstagramActivitySerDeTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    private InstagramJsonActivitySerializer instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+
+    // remove @Ignore after implementation
+    @Ignore
+    @Test
+    public void Tests()
+    {
+        InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/test.txt");
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+
+        try {
+            while (br.ready()) {
+                String line = br.readLine();
+                if(!StringUtils.isEmpty(line))
+                {
+                    LOGGER.info("raw: {}", line);
+
+                    // convert to MediaFeedData?
+                    Activity activity = instagramJsonActivitySerializer.deserialize(line);
+
+                    String activitystring = mapper.writeValueAsString(activity);
+
+                    LOGGER.info("activity: {}", activitystring);
+
+                    assertThat(activity, is(not(nullValue())));
+
+                    assertThat(activity.getId(), is(not(nullValue())));
+                    assertThat(activity.getActor(), is(not(nullValue())));
+                    assertThat(activity.getActor().getId(), is(not(nullValue())));
+                    assertThat(activity.getVerb(), is(not(nullValue())));
+                    assertThat(activity.getProvider(), is(not(nullValue())));
+
+                }
+            }
+        } catch( Exception e ) {
+            System.out.println(e);
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+}