You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/07/21 17:44:52 UTC
[06/47] git commit: bootstrap of module
bootstrap of module
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/537b97aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/537b97aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/537b97aa
Branch: refs/heads/STREAMS-46
Commit: 537b97aa0adb1bf396b51cc12214722bf4f95c6c
Parents: 34c95a6
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Thu Jun 26 18:58:14 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Thu Jun 26 18:58:14 2014 -0500
----------------------------------------------------------------------
.../streams-provider-instagram/README.md | 17 +
.../streams-provider-instagram/pom.xml | 139 ++++
.../instagram/InstagramConfigurator.java | 78 +++
.../processor/InstagramTypeConverter.java | 190 +++++
.../provider/InstagramTimelineProvider.java | 409 +++++++++++
.../InstagramJsonActivitySerializer.java | 60 ++
.../serializer/util/InstagramActivityUtil.java | 142 ++++
.../com/instagram/InstagramConfiguration.json | 20 +
.../InstagramUserInformationConfiguration.json | 17 +
.../src/main/resources/reference.conf | 5 +
.../test/InstagramActivitySerDeTest.java | 92 +++
.../src/test/resources/testtweets.txt | 695 +++++++++++++++++++
12 files changed, 1864 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/README.md
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/README.md b/streams-contrib/streams-provider-instagram/README.md
new file mode 100644
index 0000000..3bca23b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/README.md
@@ -0,0 +1,17 @@
+streams-provider-instagram
+
+Purpose
+
+ Module connects to instagram API, collects events, converts to activity, and passes each activity downstream.
+
+Example configuration
+
+ "instagram": {
+ "version": "v1",
+ "endpoint": "media/recent",
+ "accessToken": "",
+ "info": [
+ "3",
+ "kevin"
+ ]
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml
new file mode 100644
index 0000000..e356a7a
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/pom.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-contrib</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>streams-provider-instagram</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.datatype</groupId>
+ <artifactId>jackson-datatype-joda</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-pojo</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.streams</groupId>
+ <artifactId>streams-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.jayway.jsonpath</groupId>
+ <artifactId>json-path-assert</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sachinhandiekar</groupId>
+ <artifactId>jInstagram</artifactId>
+ <version>1.0.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-source</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>target/generated-sources/jsonschema2pojo</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.jsonschema2pojo</groupId>
+ <artifactId>jsonschema2pojo-maven-plugin</artifactId>
+ <configuration>
+ <addCompileSourceRoot>true</addCompileSourceRoot>
+ <generateBuilders>true</generateBuilders>
+ <sourcePaths>
+ <sourcePath>src/main/jsonschema/com/instagram/InstagramConfiguration.json</sourcePath>
+ <sourcePath>src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json</sourcePath>
+ </sourcePaths>
+ <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
+ <targetPackage>org.apache.streams.instagram.pojo</targetPackage>
+ <useLongIntegers>true</useLongIntegers>
+ <useJodaDates>true</useJodaDates>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
new file mode 100644
index 0000000..f771856
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/InstagramConfigurator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.Validation;
+import javax.validation.Validator;
+import javax.validation.ValidatorFactory;
+import java.io.IOException;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class InstagramConfigurator {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramConfigurator.class);
+ private final static ObjectMapper mapper = new ObjectMapper();
+
+
+ public static InstagramConfiguration detectInstagramConfiguration(Config config) {
+
+ ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+ Validator validator = factory.getValidator();
+
+ InstagramConfiguration instagramConfiguration = null;
+ try {
+ instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramConfiguration.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Preconditions.checkNotNull(instagramConfiguration);
+
+ Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+
+ return instagramConfiguration;
+ }
+
+ public static InstagramUserInformationConfiguration detectInstagramUserInformationConfiguration(Config config) {
+
+ ValidatorFactory factory = Validation.buildDefaultValidatorFactory();
+ Validator validator = factory.getValidator();
+
+ InstagramUserInformationConfiguration instagramConfiguration = null;
+ try {
+ instagramConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), InstagramUserInformationConfiguration.class);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Preconditions.checkNotNull(instagramConfiguration);
+
+ Preconditions.checkState(validator.validate(instagramConfiguration).size() == 0);
+
+ return instagramConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
new file mode 100644
index 0000000..14260e3
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.processor;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class InstagramTypeConverter implements StreamsProcessor {
+
+ public final static String STREAMS_ID = "InstagramTypeConverter";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class);
+
+ private ObjectMapper mapper;
+
+ private Queue<MediaFeedData> inQueue;
+ private Queue<StreamsDatum> outQueue;
+
+ private Class inClass;
+ private Class outClass;
+
+ private InstagramJsonActivitySerializer instagramJsonActivitySerializer;
+
+ private int count = 0;
+
+ public final static String TERMINATE = new String("TERMINATE");
+
+ public InstagramTypeConverter(Class inClass, Class outClass) {
+ this.inClass = inClass;
+ this.outClass = outClass;
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+ public void setProcessorInputQueue(Queue<MediaFeedData> inputQueue) {
+ inQueue = inputQueue;
+ }
+
+ public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+ Object result = null;
+
+ if( outClass.equals( Activity.class )) {
+ LOGGER.debug("ACTIVITY");
+ result = instagramJsonActivitySerializer.deserialize(
+ mapper.writeValueAsString(event));
+ } else if( outClass.equals( ObjectNode.class )) {
+ LOGGER.debug("OBJECTNODE");
+ result = mapper.convertValue(event, ObjectNode.class);
+ } else if( outClass.equals( String.class )) {
+ LOGGER.debug("OBJECTNODE");
+ result = mapper.writeValueAsString(event);
+ }
+
+
+ // no supported conversion were applied
+ if( result != null ) {
+ count ++;
+ return result;
+ }
+
+ LOGGER.debug("CONVERT FAILED");
+
+ return null;
+
+ }
+
+ public boolean validate(Object document, Class klass) {
+
+ // TODO
+ return true;
+ }
+
+ public boolean isValidJSON(final String json) {
+ boolean valid = false;
+ try {
+ final JsonParser parser = new ObjectMapper().getJsonFactory()
+ .createJsonParser(json);
+ while (parser.nextToken() != null) {
+ }
+ valid = true;
+ } catch (JsonParseException jpe) {
+ LOGGER.warn("validate: {}", jpe);
+ } catch (IOException ioe) {
+ LOGGER.warn("validate: {}", ioe);
+ }
+
+ return valid;
+ }
+
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
+
+ StreamsDatum result = null;
+
+ try {
+
+ Object item = entry.getDocument();
+ ObjectNode node;
+
+ LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+
+ if( item instanceof String ) {
+
+ // if the target is string, just pass-through
+ if( String.class.equals(outClass)) {
+ result = entry;
+ }
+ else {
+ // first check for valid json
+ node = (ObjectNode)mapper.readTree((String)item);
+
+ Object out = convert(node, String.class, outClass);
+
+ if( out != null && validate(out, outClass))
+ result = new StreamsDatum(out);
+ }
+
+ } else if( item instanceof ObjectNode ) {
+
+ // first check for valid json
+ node = (ObjectNode)mapper.valueToTree(item);
+
+ Object out = convert(node, ObjectNode.class, outClass);
+
+ if( out != null && validate(out, outClass))
+ result = new StreamsDatum(out);
+
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if( result != null )
+ return Lists.newArrayList(result);
+ else
+ return Lists.newArrayList();
+ }
+
+ @Override
+ public void prepare(Object o) {
+ mapper = new StreamsJacksonMapper();
+ instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+ }
+
+ @Override
+ public void cleanUp() {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
new file mode 100644
index 0000000..d3e7179
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramTimelineProvider.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.provider;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.instagram.InstagramConfigurator;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class InstagramTimelineProvider implements StreamsProvider, Serializable {
+
+ public final static String STREAMS_ID = "InstagramTimelineProvider";
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
+ public static final int MAX_NUMBER_WAITING = 10000;
+
+ private static StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+ private InstagramUserInformationConfiguration config;
+
+ private Class klass;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ public InstagramUserInformationConfiguration getConfig() {
+ return config;
+ }
+
+ public void setConfig(InstagramUserInformationConfiguration config) {
+ this.config = config;
+ }
+
+ protected Iterator<Long[]> idsBatches;
+ protected Iterator<String[]> screenNameBatches;
+
+ protected volatile Queue<StreamsDatum> providerQueue;
+
+ protected int idsCount;
+ protected Instagram client;
+
+
+ protected ExecutorService executor;
+
+ protected DateTime start;
+ protected DateTime end;
+
+ protected final AtomicBoolean running = new AtomicBoolean();
+
+ private static ExecutorService getExecutor() {
+ return Executors.newSingleThreadExecutor();
+ }
+
+ public InstagramTimelineProvider() {
+ Config config = StreamsConfigurator.config.getConfig("instagram");
+ this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
+ }
+
+ public InstagramTimelineProvider(InstagramUserInformationConfiguration config) {
+ this.config = config;
+ }
+
+ public InstagramTimelineProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("instagram");
+ this.config = InstagramConfigurator.detectInstagramUserInformationConfiguration(config);
+ this.klass = klass;
+ }
+
+ public InstagramTimelineProvider(InstagramUserInformationConfiguration config, Class klass) {
+ this.config = config;
+ this.klass = klass;
+ }
+
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public void startStream() {
+ LOGGER.debug("{} startStream", STREAMS_ID);
+
+ Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
+
+ LOGGER.info("readCurrent");
+
+ while(idsBatches.hasNext())
+ loadBatch(idsBatches.next());
+
+ while(screenNameBatches.hasNext())
+ loadBatch(screenNameBatches.next());
+
+ executor.shutdown();
+ }
+
+ private void loadBatch(Long[] ids) {
+
+ // twitter4j implementation below - replace with jInstagram
+
+// Twitter client = getTwitterClient();
+// int keepTrying = 0;
+//
+// // keep trying to load, give it 5 attempts.
+// //while (keepTrying < 10)
+// while (keepTrying < 1)
+// {
+// try
+// {
+// long[] toQuery = new long[ids.length];
+// for(int i = 0; i < ids.length; i++)
+// toQuery[i] = ids[i];
+//
+// for (User tStat : client.lookupUsers(toQuery)) {
+//
+// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+// executor.submit(providerTask);
+//
+// }
+// keepTrying = 10;
+// }
+// catch(TwitterException twitterException) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+// }
+// catch(Exception e) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+// }
+// }
+ }
+
+ private void loadBatch(String[] ids) {
+
+ // twitter4j implementation below - replace with jInstagram
+//
+// Twitter client = getTwitterClient();
+// int keepTrying = 0;
+//
+// // keep trying to load, give it 5 attempts.
+// //while (keepTrying < 10)
+// while (keepTrying < 1)
+// {
+// try
+// {
+// for (User tStat : client.lookupUsers(ids)) {
+//
+// TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, tStat.getId());
+// executor.submit(providerTask);
+//
+// }
+// keepTrying = 10;
+// }
+// catch(TwitterException twitterException) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+// }
+// catch(Exception e) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+// }
+// }
+ }
+
+ public class InstagramTimelineProviderTask implements Runnable {
+
+ // twitter4j implementation below - replace with jInstagram
+
+ private final Logger LOGGER = LoggerFactory.getLogger(InstagramTimelineProvider.class);
+
+ private InstagramTimelineProvider provider;
+ private Instagram client;
+ private Long id;
+
+ public InstagramTimelineProviderTask(InstagramTimelineProvider provider, Instagram client, Long id) {
+ this.provider = provider;
+ this.client = client;
+ this.id = id;
+ }
+
+ @Override
+ public void run() {
+
+ // twitter4j implementation below - replace with jInstagram
+
+// Paging paging = new Paging(1, 200);
+// List<Status> statuses = null;
+// boolean KeepGoing = true;
+// boolean hadFailure = false;
+//
+// do
+// {
+// int keepTrying = 0;
+//
+// // keep trying to load, give it 5 attempts.
+// //This value was chosen because it seemed like a reasonable number of times
+// //to retry capturing a timeline given the sorts of errors that could potentially
+// //occur (network timeout/interruption, faulty client, etc.)
+// while (keepTrying < 5)
+// {
+//
+// try
+// {
+// statuses = client.getUserTimeline(id, paging);
+//
+// for (Status tStat : statuses)
+// {
+// String json = TwitterObjectFactory.getRawJSON(tStat);
+//
+// try {
+// provider.lock.readLock().lock();
+// ComponentUtils.offerUntilSuccess(new StreamsDatum(json), provider.providerQueue);
+// } finally {
+// provider.lock.readLock().unlock();
+// }
+// }
+//
+// paging.setPage(paging.getPage() + 1);
+//
+// keepTrying = 10;
+// }
+// catch(TwitterException twitterException) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, twitterException);
+// }
+// catch(Exception e) {
+// keepTrying += TwitterErrorHandler.handleTwitterError(client, e);
+// }
+// }
+// }
+// while (provider.shouldContinuePulling(statuses));
+
+ LOGGER.info(id + " Thread Finished");
+
+ }
+
+ }
+
+ private Map<Long, Long> userPullInfo;
+
+ protected boolean shouldContinuePulling(List<MediaFeedData> statuses) {
+ return (statuses != null) && (statuses.size() > 0);
+ }
+
+ private void sleep()
+ {
+ Thread.yield();
+ try {
+ // wait one tenth of a millisecond
+ Thread.yield();
+ Thread.sleep(1);
+ Thread.yield();
+ }
+ catch(IllegalArgumentException e) {
+ // passing in static values, this will never happen
+ }
+ catch(InterruptedException e) {
+ // noOp, there must have been an issue sleeping
+ }
+ Thread.yield();
+ }
+
+ public StreamsResultSet readCurrent() {
+
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
+ StreamsResultSet result;
+
+ try {
+ lock.writeLock().lock();
+ result = new StreamsResultSet(providerQueue);
+ result.setCounter(new DatumStatusCounter());
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ if( providerQueue.isEmpty() && executor.isTerminated()) {
+ LOGGER.info("Finished. Cleaning up...");
+
+ running.set(false);
+
+ LOGGER.info("Exiting");
+ }
+
+ return result;
+
+ }
+
+ protected Queue<StreamsDatum> constructQueue() {
+ return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(MAX_NUMBER_WAITING));
+ }
+
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+ System.err.println("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+
+ @Override
+ public void prepare(Object o) {
+
+ executor = getExecutor();
+ running.set(true);
+ try {
+ lock.writeLock().lock();
+ providerQueue = constructQueue();
+ } finally {
+ lock.writeLock().unlock();
+ }
+
+ Preconditions.checkNotNull(providerQueue);
+
+ Preconditions.checkNotNull(this.klass);
+ Preconditions.checkNotNull(config.getAccessToken());
+
+ //idsCount = config.getFollow().size();
+
+ client = getInstagramClient();
+ }
+
+ protected Instagram getInstagramClient()
+ {
+ // twitter4j -> jInstagram
+// String baseUrl = "https://api.instagram.com:443/1.1/";
+//
+// ConfigurationBuilder builder = new ConfigurationBuilder()
+// .setOAuthConsumerKey(config.getOauth().getConsumerKey())
+// .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
+// .setOAuthAccessToken(config.getOauth().getAccessToken())
+// .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
+// .setIncludeEntitiesEnabled(includeEntitiesEnabled)
+// .setJSONStoreEnabled(jsonStoreEnabled)
+// .setAsyncNumThreads(3)
+// .setRestBaseURL(baseUrl)
+// .setIncludeMyRetweetEnabled(Boolean.TRUE)
+// .setPrettyDebugEnabled(Boolean.TRUE);
+//
+// return new InstagramFactory(builder.build()).getInstance();
+ return null;
+ }
+
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
new file mode 100644
index 0000000..8d92641
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class InstagramJsonActivitySerializer implements ActivitySerializer<String>, Serializable
+{
+
+ public InstagramJsonActivitySerializer() {
+
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public String serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public Activity deserialize(String serialized) throws ActivitySerializerException {
+
+ Activity activity = null;
+
+ // implement
+
+ return activity;
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ throw new NotImplementedException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
new file mode 100644
index 0000000..e71c43e
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer.util;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.data.util.ActivityUtil.ensureExtensions;
+
+/**
+ * Provides utilities for working with Activity objects within the context of Instagram
+ */
+public class InstagramActivityUtil {
+
+ /**
+ * Updates the given Activity object with the values from the item
+ * @param item the object to use as the source
+ * @param activity the target of the updates. Will receive all values from the tweet.
+ * @throws ActivitySerializerException
+ */
+ public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException {
+
+ }
+
+ /**
+ * Builds the actor
+ * @param item the item
+ * @return a valid Actor
+ */
+ public static Actor buildActor(MediaFeedData item) {
+ Actor actor = new Actor();
+ return actor;
+ }
+
+ /**
+ * Builds the ActivityObject
+ * @param item the item
+ * @return a valid Activity Object
+ */
+ public static ActivityObject buildActivityObject(MediaFeedData item) {
+ ActivityObject actObj = new ActivityObject();
+ return actObj;
+ }
+
+
+ /**
+ * Updates the content, and associated fields, with those from the given tweet
+ * @param activity the target of the updates. Will receive all values from the tweet.
+ * @param item the object to use as the source
+ * @param verb the verb for the given activity's type
+ */
+ public static void updateActivityContent(Activity activity, MediaFeedData item, String verb) {
+
+ }
+
+ /**
+ * Gets the links from the Instagram event
+ * @param item the object to use as the source
+ * @return a list of links corresponding to the expanded URL
+ */
+ public static List<String> getLinks(MediaFeedData item) {
+ List<String> links = Lists.newArrayList();
+ return links;
+ }
+
+ /**
+ * Adds the location extension and populates with teh instagram data
+ * @param activity the Activity object to update
+ * @param item the object to use as the source
+ */
+ public static void addLocationExtension(Activity activity, MediaFeedData item) {
+ Map<String, Object> extensions = ensureExtensions(activity);
+ Map<String, Object> location = new HashMap<String, Object>();
+
+ }
+
+ /**
+ * Gets the common instagram {@link org.apache.streams.pojo.json.Provider} object
+ * @return a provider object representing Instagram
+ */
+ public static Provider getProvider() {
+ Provider provider = new Provider();
+ provider.setId("id:providers:instagram");
+ provider.setDisplayName("Instagram");
+ return provider;
+ }
+ /**
+ * Adds the given Instagram event to the activity as an extension
+ * @param activity the Activity object to update
+ * @param event the Instagram event to add as the extension
+ */
+ public static void addInstagramExtension(Activity activity, ObjectNode event) {
+ Map<String, Object> extensions = org.apache.streams.data.util.ActivityUtil.ensureExtensions(activity);
+ extensions.put("instagram", event);
+ }
+ /**
+ * Formats the ID to conform with the Apache Streams activity ID convention
+ * @param idparts the parts of the ID to join
+ * @return a valid Activity ID in format "id:instagram:part1:part2:...partN"
+ */
+ public static String formatId(String... idparts) {
+ return Joiner.on(":").join(Lists.asList("id:instagram", idparts));
+ }
+
+ /**
+ * Takes various parameters from the instagram object that are currently not part of teh
+ * activity schema and stores them in a generic extensions attribute
+ * @param activity
+ * @param item
+ */
+ public static void addInstagramExtensions(Activity activity, MediaFeedData item) {
+ Map<String, Object> extensions = ensureExtensions(activity);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
new file mode 100644
index 0000000..18a59b9
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -0,0 +1,20 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "version": {
+ "type": "string",
+ "description": "The version"
+ },
+ "endpoint": {
+ "type": "string",
+ "description": "The endpoint"
+ },
+ "accessToken": {
+ "type": "string"
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
new file mode 100644
index 0000000..4b75ee4
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramUserInformationConfiguration.json
@@ -0,0 +1,17 @@
+{
+ "type": "object",
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "id": "#",
+ "javaType" : "org.apache.streams.instagram.InstagramUserInformationConfiguration",
+ "extends": {"$ref":"InstagramConfiguration.json"},
+ "javaInterfaces": ["java.io.Serializable"],
+ "properties": {
+ "info": {
+ "type": "array",
+ "description": "A list of user IDs, indicating the users whose posts should be delivered on the stream",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
new file mode 100644
index 0000000..9a01bf6
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
@@ -0,0 +1,5 @@
+instagram {
+ version = "v1"
+ endpoint = "sample"
+ accessToken = ""
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/537b97aa/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
new file mode 100644
index 0000000..fcf5e81
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/twitter/test/InstagramActivitySerDeTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.test;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.instagram.serializer.InstagramJsonActivitySerializer;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+/**
+* Created with IntelliJ IDEA.
+* User: sblackmon
+* Date: 8/20/13
+* Time: 5:57 PM
+* To change this template use File | Settings | File Templates.
+*/
+public class InstagramActivitySerDeTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeTest.class);
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ private InstagramJsonActivitySerializer instagramJsonActivitySerializer = new InstagramJsonActivitySerializer();
+
+ // remove @Ignore after implementation
+ @Ignore
+ @Test
+ public void Tests()
+ {
+ InputStream is = InstagramActivitySerDeTest.class.getResourceAsStream("/test.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ LOGGER.info("raw: {}", line);
+
+ // convert to MediaFeedData?
+ Activity activity = instagramJsonActivitySerializer.deserialize(line);
+
+ String activitystring = mapper.writeValueAsString(activity);
+
+ LOGGER.info("activity: {}", activitystring);
+
+ assertThat(activity, is(not(nullValue())));
+
+ assertThat(activity.getId(), is(not(nullValue())));
+ assertThat(activity.getActor(), is(not(nullValue())));
+ assertThat(activity.getActor().getId(), is(not(nullValue())));
+ assertThat(activity.getVerb(), is(not(nullValue())));
+ assertThat(activity.getProvider(), is(not(nullValue())));
+
+ }
+ }
+ } catch( Exception e ) {
+ System.out.println(e);
+ e.printStackTrace();
+ Assert.fail();
+ }
+ }
+}