You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/19 20:15:18 UTC
[2/6] incubator-streams git commit: level up instagram provider
level up instagram provider
update jInstagram version (STREAMS-413)
adopt Converter interfaces (STREAMS-276)
add real integration tests (STREAMS-415)
output ActivityObject (page) from UserInfo provider (STREAMS-422)
set objectType (STREAMS-233)
add main methods to each Provider (STREAMS-412)
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/73768bac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/73768bac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/73768bac
Branch: refs/heads/master
Commit: 73768bace304de08b98e85249b3c560499bc2310
Parents: 3234cdb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Oct 12 17:19:12 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Oct 12 17:19:29 2016 -0500
----------------------------------------------------------------------
.../streams-provider-instagram/pom.xml | 22 ++-
.../processor/InstagramTypeConverter.java | 58 +++----
.../instagram/provider/InstagramOauthToken.java | 2 +-
.../InstagramRecentMediaCollector.java | 2 +-
.../InstagramRecentMediaProvider.java | 62 +++++++
.../userinfo/InstagramUserInfoCollector.java | 4 +-
.../userinfo/InstagramUserInfoProvider.java | 68 ++++++++
.../InstagramJsonActivitySerializer.java | 78 ---------
.../InstagramMediaFeedDataConverter.java | 83 +++++++++
.../InstagramUserInfoDataConverter.java | 95 ++++++++++
.../serializer/InstagramUserInfoSerializer.java | 101 -----------
.../serializer/util/InstagramActivityUtil.java | 7 +-
.../serializer/util/InstagramDeserializer.java | 33 ----
.../src/main/resources/reference.conf | 2 -
.../InstagramRecentMediaCollectorTest.java | 156 -----------------
.../InstagramRecentMediaProviderTest.java | 174 -------------------
.../InstagramUserInfoCollectorTest.java | 120 -------------
.../test/data/InstagramActivitySerDeIT.java | 131 --------------
.../data/InstagramMediaFeedDataConverterIT.java | 109 ++++++++++++
.../data/InstagramUserInfoDataConverterIT.java | 119 +++++++++++++
.../InstagramRecentMediaProviderIT.java | 42 +++++
.../providers/InstagramUserInfoProviderIT.java | 42 +++++
.../InstagramRecentMediaProviderIT.conf | 14 ++
.../resources/InstagramUserInfoProviderIT.conf | 14 ++
24 files changed, 699 insertions(+), 839 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/pom.xml b/streams-contrib/streams-provider-instagram/pom.xml
index a6e78f5..d27e161 100644
--- a/streams-contrib/streams-provider-instagram/pom.xml
+++ b/streams-contrib/streams-provider-instagram/pom.xml
@@ -95,7 +95,7 @@
<dependency>
<groupId>com.sachinhandiekar</groupId>
<artifactId>jInstagram</artifactId>
- <version>1.0.7</version>
+ <version>1.1.8</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
@@ -198,12 +198,28 @@
<artifactId>maven-failsafe-plugin</artifactId>
<configuration>
<skipTests>${skipITs}</skipTests>
+ <!-- Run integration test suite rather than individual tests. -->
+ <excludes>
+ <exclude>**/*Test.java</exclude>
+ <exclude>**/*Tests.java</exclude>
+ </excludes>
+ <includes>
+ <exclude>**/*IT.java</exclude>
+ <include>**/*ITs.java</include>
+ </includes>
</configuration>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.surefire</groupId>
+ <artifactId>surefire-junit47</artifactId>
+ <version>${failsafe.plugin.version}</version>
+ </dependency>
+ </dependencies>
</plugin>
-<plugin>
+ <plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
-</plugin>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 6595e8e..17af5f6 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -21,9 +21,11 @@ package org.apache.streams.instagram.processor;
import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.instagram.serializer.InstagramUserInfoSerializer;
+import org.apache.streams.instagram.serializer.InstagramMediaFeedDataConverter;
+import org.apache.streams.instagram.serializer.InstagramUserInfoDataConverter;
import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
import org.jinstagram.entity.users.basicinfo.UserInfoData;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.slf4j.Logger;
@@ -32,33 +34,21 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Queue;
+/**
+ * This is deprecated - use ActivityConverterProcessor or ActivityObjectConverterProcessor
+ */
+@Deprecated
public class InstagramTypeConverter implements StreamsProcessor {
public final static String STREAMS_ID = "InstagramTypeConverter";
private final static Logger LOGGER = LoggerFactory.getLogger(InstagramTypeConverter.class);
- private Queue<MediaFeedData> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private InstagramActivityUtil instagramActivityUtil;
- private InstagramUserInfoSerializer userInfoSerializer;
-
- private int count = 0;
+ private InstagramMediaFeedDataConverter mediaFeedDataConverter;
+ private InstagramUserInfoDataConverter userInfoDataConverter;
public final static String TERMINATE = new String("TERMINATE");
- public InstagramTypeConverter() {
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- public void setProcessorInputQueue(Queue<MediaFeedData> inputQueue) {
- inQueue = inputQueue;
- }
-
@Override
public String getId() {
return STREAMS_ID;
@@ -73,38 +63,38 @@ public class InstagramTypeConverter implements StreamsProcessor {
Object item = entry.getDocument();
LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
- Activity activity = null;
if(item instanceof MediaFeedData) {
+
//We don't need to use the mapper, since we have a process to convert between
//MediaFeedData objects and Activity objects already
- activity = new Activity();
-
- instagramActivityUtil.updateActivity((MediaFeedData)item, activity);
+ List<Activity> activity = mediaFeedDataConverter.toActivityList((MediaFeedData)item);
+ if( activity.size() > 0 ) result = new StreamsDatum(activity);
} else if(item instanceof UserInfoData) {
- activity = new Activity();
- instagramActivityUtil.updateActivity((UserInfoData) item, activity);
- }
- if(activity != null) {
- result = new StreamsDatum(activity);
- count++;
+
+ ActivityObject activityObject = userInfoDataConverter.toActivityObject((UserInfoData)item);
+
+ if( activityObject != null ) result = new StreamsDatum(activityObject);
+
}
+
} catch (Exception e) {
e.printStackTrace();
- LOGGER.error("Exception while converting MediaFeedData to Activity: {}", e.getMessage());
+ LOGGER.error("Exception while converting item: {}", e.getMessage());
}
- if( result != null )
+ if( result != null ) {
return Lists.newArrayList(result);
- else
+ } else
return Lists.newArrayList();
+
}
@Override
public void prepare(Object o) {
- instagramActivityUtil = new InstagramActivityUtil();
- this.userInfoSerializer = new InstagramUserInfoSerializer();
+ mediaFeedDataConverter = new InstagramMediaFeedDataConverter();
+ userInfoDataConverter = new InstagramUserInfoDataConverter();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index d395607..4531cfe 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -23,7 +23,6 @@ import org.jinstagram.auth.model.Token;
*/
public class InstagramOauthToken extends Token {
-
public InstagramOauthToken(String token) {
this(token, null);
}
@@ -40,4 +39,5 @@ public class InstagramOauthToken extends Token {
InstagramOauthToken that = (InstagramOauthToken) o;
return this.getToken().equals(that.getToken());
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
index 6bf71c3..e946e6b 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollector.java
@@ -69,7 +69,7 @@ public class InstagramRecentMediaCollector extends InstagramDataCollector<MediaF
MediaFeed feed = null;
try {
if (pagination == null) {
- feed = getNextInstagramClient().getRecentMediaFeed(Long.valueOf(user.getUserId()),
+ feed = getNextInstagramClient().getRecentMediaFeed(user.getUserId(),
0,
null,
null,
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
index ad039a8..4855ae8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProvider.java
@@ -14,9 +14,18 @@ specific language governing permissions and limitations
under the License. */
package org.apache.streams.instagram.provider.recentmedia;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
@@ -25,13 +34,22 @@ import org.apache.streams.instagram.*;
import org.apache.streams.instagram.provider.InstagramAbstractProvider;
import org.apache.streams.instagram.provider.InstagramDataCollector;
import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector;
+import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
+import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
import org.apache.streams.util.SerializationUtil;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.math.BigInteger;
import java.util.Collection;
+import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
@@ -45,6 +63,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class InstagramRecentMediaProvider extends InstagramAbstractProvider {
+ public static final String STREAMS_ID = "InstagramRecentMediaProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProvider.class);
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
public InstagramRecentMediaProvider() {
}
@@ -57,4 +80,43 @@ public class InstagramRecentMediaProvider extends InstagramAbstractProvider {
protected InstagramDataCollector getInstagramDataCollector() {
return new InstagramRecentMediaCollector(super.dataQueue, super.config);
}
+
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = conf.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ InstagramConfiguration config = new ComponentConfigurator<>(InstagramConfiguration.class).detectConfiguration(typesafe, "instagram");
+ InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(config);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
index dcdcb6d..0985ae8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
@@ -53,7 +53,7 @@ public class InstagramUserInfoCollector extends InstagramDataCollector<UserInfoD
while(!successful && attempt < MAX_ATTEMPTS) {
++attempt;
try {
- userInfo = getNextInstagramClient().getUserInfo(Long.valueOf(user.getUserId()));
+ userInfo = getNextInstagramClient().getUserInfo(user.getUserId());
} catch (Exception e) {
if(e instanceof InstagramRateLimitException) {
LOGGER.warn("Hit rate limit exception, backing off.");
@@ -85,7 +85,7 @@ public class InstagramUserInfoCollector extends InstagramDataCollector<UserInfoD
@Override
protected StreamsDatum convertToStreamsDatum(UserInfoData item) {
- return new StreamsDatum(item, Long.toString(item.getId()));
+ return new StreamsDatum(item, item.getId());
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
index 39c84a9..391b31c 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
@@ -14,9 +14,32 @@ specific language governing permissions and limitations
under the License. */
package org.apache.streams.instagram.provider.userinfo;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.InstagramUserInformationConfiguration;
import org.apache.streams.instagram.provider.InstagramAbstractProvider;
import org.apache.streams.instagram.provider.InstagramDataCollector;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
/**
* Instagram provider that pulls UserInfoData from Instagram
@@ -24,6 +47,12 @@ import org.apache.streams.instagram.provider.InstagramDataCollector;
*/
public class InstagramUserInfoProvider extends InstagramAbstractProvider {
+ public static final String STREAMS_ID = "InstagramUserInfoProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProvider.class);
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
public InstagramUserInfoProvider() {
super();
}
@@ -36,4 +65,43 @@ public class InstagramUserInfoProvider extends InstagramAbstractProvider {
protected InstagramDataCollector getInstagramDataCollector() {
return new InstagramUserInfoCollector(super.dataQueue, super.config);
}
+
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File conf_file = new File(configfile);
+ assert(conf_file.exists());
+ Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = conf.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ InstagramUserInformationConfiguration config = new ComponentConfigurator<>(InstagramUserInformationConfiguration.class).detectConfiguration(typesafe, "instagram");
+ InstagramUserInfoProvider provider = new InstagramUserInfoProvider(config);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while(iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException e) {
+ System.err.println(e.getMessage());
+ }
+ }
+ } while( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
deleted file mode 100644
index c5bbdf1..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramJsonActivitySerializer.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.serializer;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.List;
-
-import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
-
-public class InstagramJsonActivitySerializer implements ActivitySerializer<String>, Serializable
-{
-
- public InstagramJsonActivitySerializer() {
-
- }
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public String serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException();
- }
-
- @Override
- public Activity deserialize(String serialized) throws ActivitySerializerException {
-
- ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- MediaFeedData mediaFeedData = null;
-
- try {
- mediaFeedData = mapper.readValue(serialized, MediaFeedData.class);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- Activity activity = new Activity();
-
- updateActivity(mediaFeedData, activity);
-
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- throw new NotImplementedException();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
new file mode 100644
index 0000000..0122248
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramMediaFeedDataConverter.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
+
+public class InstagramMediaFeedDataConverter implements ActivityConverter<MediaFeedData>, Serializable
+{
+
+ public static Class requiredClass = MediaFeedData.class;
+
+ public InstagramMediaFeedDataConverter() {
+
+ }
+
+ @Override
+ public Class requiredClass() {
+ return requiredClass;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public MediaFeedData fromActivity(Activity deserialized) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<Activity> toActivityList(MediaFeedData item) throws ActivityConversionException {
+
+ Activity activity = new Activity();
+
+ updateActivity(item, activity);
+
+ return Lists.newArrayList(activity);
+ }
+
+ @Override
+ public List<MediaFeedData> fromActivityList(List<Activity> list) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public List<Activity> toActivityList(List<MediaFeedData> list) throws ActivityConversionException {
+ throw new NotImplementedException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoDataConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoDataConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoDataConverter.java
new file mode 100644
index 0000000..eee5674
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoDataConverter.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.serializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.ActivityObjectConverter;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivityConversionException;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.commons.lang.NotImplementedException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public class InstagramUserInfoDataConverter implements ActivityObjectConverter<UserInfoData> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoDataConverter.class);
+
+ private static final String STREAMS_ID_PREFIX = "id:instagram:";
+ private static final String PROVIDER_ID = "id:provider:instagram";
+ private static final String DISPLAY_NAME = "Instagram";
+
+ @Override
+ public Class requiredClass() {
+ return UserInfoData.class;
+ }
+
+ @Override
+ public String serializationFormat() {
+ return null;
+ }
+
+ @Override
+ public UserInfoData fromActivityObject(ActivityObject deserialized) throws ActivityConversionException {
+ return null;
+ }
+
+ @Override
+ public ActivityObject toActivityObject(UserInfoData serialized) throws ActivityConversionException {
+ ActivityObject activityObject = new ActivityObject();
+ activityObject.setObjectType("page");
+ Provider provider = new Provider();
+ provider.setId(PROVIDER_ID);
+ provider.setDisplayName(DISPLAY_NAME);
+ activityObject.getAdditionalProperties().put("provider", provider);
+ activityObject.setPublished(DateTime.now().withZone(DateTimeZone.UTC));
+ Image image = new Image();
+ image.setUrl(serialized.getProfilePicture());
+ activityObject.setImage(image);
+ activityObject.setId(STREAMS_ID_PREFIX+serialized.getId());
+ activityObject.setSummary(serialized.getBio());
+ activityObject.setAdditionalProperty("handle", serialized.getUsername());
+ activityObject.setDisplayName(serialized.getFullName());
+ activityObject.setUrl(serialized.getWebsite());
+ Map<String, Object> extensions = Maps.newHashMap();
+ activityObject.setAdditionalProperty("extensions", extensions);
+ extensions.put("screenName", serialized.getUsername());
+ extensions.put("posts", serialized.getCounts().getMedia());
+ extensions.put("followers", serialized.getCounts().getFollowedBy());
+ extensions.put("website", serialized.getWebsite());
+ extensions.put("following", serialized.getCounts().getFollows());
+ return activityObject;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
deleted file mode 100644
index f8d6ee1..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.serializer;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.instagram.UsersInfo;
-import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Image;
-import org.apache.streams.pojo.json.Provider;
-import org.jinstagram.entity.users.basicinfo.UserInfoData;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- *
- */
-public class InstagramUserInfoSerializer implements ActivitySerializer<UserInfoData> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoSerializer.class);
-
- private static final String STREAMS_ID_PREFIX = "id:instagram:";
- private static final String PROVIDER_ID = "id:provider:instagram";
- private static final String DISPLAY_NAME = "Instagram";
-
- @Override
- public String serializationFormat() {
- return null;
- }
-
- @Override
- public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException();
- }
-
- @Override
- public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException {
- Activity activity = new Activity();
- Provider provider = new Provider();
- provider.setId(PROVIDER_ID);
- provider.setDisplayName(DISPLAY_NAME);
- activity.setProvider(provider);
- activity.setPublished(DateTime.now().withZone(DateTimeZone.UTC));
- Actor actor = new Actor();
- Image image = new Image();
- image.setUrl(serialized.getProfile_picture());
- actor.setImage(image);
- actor.setId(STREAMS_ID_PREFIX+serialized.getId());
- actor.setSummary(serialized.getBio());
- actor.setAdditionalProperty("handle", serialized.getUsername());
- actor.setDisplayName(serialized.getFullName());
- Map<String, Object> extensions = Maps.newHashMap();
- actor.setAdditionalProperty("extensions", extensions);
- extensions.put("screenName", serialized.getUsername());
- extensions.put("posts", serialized.getCounts().getMedia());
- extensions.put("followers", serialized.getCounts().getFollwed_by());
- extensions.put("website", serialized.getWebsite());
- extensions.put("following", serialized.getCounts().getFollows());
- return activity;
- }
-
- @Override
- public List<Activity> deserializeAll(List<UserInfoData> serializedList) {
- List<Activity> result = Lists.newLinkedList();
- for(UserInfoData data : serializedList) {
- try {
- result.add(deserialize(data));
- } catch (ActivitySerializerException ase) {
- LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId());
- LOGGER.error("Exception : {}", ase);
- }
- }
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
index 9d48e3c..62aac93 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramActivityUtil.java
@@ -22,6 +22,7 @@ package org.apache.streams.instagram.serializer.util;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import org.apache.streams.exceptions.ActivityConversionException;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.pojo.extensions.ExtensionUtil;
import org.apache.streams.pojo.json.Activity;
@@ -60,7 +61,7 @@ public class InstagramActivityUtil {
* @param activity the target of the updates. Will receive all values from the tweet.
* @throws ActivitySerializerException
*/
- public static void updateActivity(MediaFeedData item, Activity activity) throws ActivitySerializerException {
+ public static void updateActivity(MediaFeedData item, Activity activity) throws ActivityConversionException {
activity.setActor(buildActor(item));
activity.setVerb("post");
@@ -104,13 +105,13 @@ public class InstagramActivityUtil {
try {
Image image = new Image();
- image.setUrl(item.getProfile_picture());
+ image.setUrl(item.getProfilePicture());
Counts counts = item.getCounts();
Map<String, Object> extensions = new HashMap<String, Object>();
- extensions.put("followers", counts.getFollwed_by());
+ extensions.put("followers", counts.getFollowedBy());
extensions.put("follows", counts.getFollows());
extensions.put("screenName", item.getUsername());
extensions.put("posts", counts.getMedia());
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramDeserializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramDeserializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramDeserializer.java
deleted file mode 100644
index d4280bf..0000000
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/util/InstagramDeserializer.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.instagram.serializer.util;
-
-import org.jinstagram.Instagram;
-import org.jinstagram.exceptions.InstagramException;
-
-public class InstagramDeserializer extends Instagram{
- public InstagramDeserializer(String test) {
- super(test);
- }
-
- @Override
- public <T> T createObjectFromResponse(Class<T> clazz, String response) throws InstagramException {
- return super.createObjectFromResponse(clazz, response);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
index 2e1f4b2..fe95199 100644
--- a/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-instagram/src/main/resources/reference.conf
@@ -2,6 +2,4 @@
# Licensed under Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0
instagram {
version = "v1"
- endpoint = "sample"
- accessToken = ""
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
deleted file mode 100644
index ce31ca7..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaCollectorTest.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider.recentmedia;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.common.Pagination;
-import org.jinstagram.entity.users.feed.MediaFeed;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.joda.time.DateTime;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for {@link org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaCollector}
- */
-public class InstagramRecentMediaCollectorTest extends RandomizedTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaCollectorTest.class);
-
- private int expectedDataCount = 0;
-
-
-
-
- @Test
- @Repeat(iterations = 3)
- public void testRun() {
- this.expectedDataCount = 0;
- Queue<StreamsDatum> data = Queues.newConcurrentLinkedQueue();
- InstagramConfiguration config = new InstagramConfiguration();
- UsersInfo usersInfo = new UsersInfo();
- config.setUsersInfo(usersInfo);
- Set<User> users = creatUsers(randomIntBetween(0, 100));
- usersInfo.setUsers(users);
-
- final Instagram mockInstagram = createMockInstagramClient();
- InstagramRecentMediaCollector collector = new InstagramRecentMediaCollector(data, config) {
- @Override
- protected Instagram getNextInstagramClient() {
- return mockInstagram;
- }
- };
- assertFalse(collector.isCompleted());
- collector.run();
- assertTrue(collector.isCompleted());
- assertEquals(this.expectedDataCount, data.size());
- }
-
- private Instagram createMockInstagramClient() {
- final Instagram instagramClient = mock(Instagram.class);
- try {
- final InstagramException mockException = mock(InstagramException.class);
- when(mockException.getRemainingLimitStatus()).thenReturn(-1);
- when(mockException.getMessage()).thenReturn("MockInstagramException message");
- when(instagramClient.getRecentMediaFeed(anyLong(), anyInt(), anyString(), anyString(), any(Date.class), any(Date.class))).thenAnswer(new Answer<MediaFeed>() {
- @Override
- public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
- if (randomInt(20) == 0) { //5% throw exceptions
- int type = randomInt(4);
- if (type == 0)
- throw mock(InstagramRateLimitException.class);
- else if (type == 1)
- throw mock(InstagramBadRequestException.class);
- else if (type == 2)
- throw mock(InstagramException.class);
- else
- throw new Exception();
- } else {
- return createRandomMockMediaFeed();
- }
- }
- });
- when(instagramClient.getRecentMediaNextPage(any(Pagination.class))).thenAnswer(new Answer<MediaFeed>() {
- @Override
- public MediaFeed answer(InvocationOnMock invocationOnMock) throws Throwable {
- return createRandomMockMediaFeed();
- }
- });
- } catch (InstagramException ie) {
- fail("Failed to create mock instagram client.");
- }
- return instagramClient;
- }
-
- private Set<User> creatUsers(int numUsers) {
- Set<User> users = Sets.newHashSet();
- for(int i=0; i < numUsers; ++i) {
- User user = new User();
- user.setUserId(Integer.toString(randomInt()));
- if(randomInt(2) == 0) {
- user.setAfterDate(DateTime.now().minusSeconds(randomIntBetween(0, 1000)));
- }
- if(randomInt(2) == 0) {
- user.setBeforeDate(DateTime.now());
- }
- users.add(user);
- }
- return users;
- }
-
- private MediaFeed createRandomMockMediaFeed() throws InstagramException {
- MediaFeed feed = mock(MediaFeed.class);
- when(feed.getData()).thenReturn(createData(randomInt(100)));
- Pagination pagination = mock(Pagination.class);
- if(randomInt(2) == 0) {
- when(pagination.hasNextPage()).thenReturn(true);
- } else {
- when(pagination.hasNextPage()).thenReturn(false);
- }
- when(feed.getPagination()).thenReturn(pagination);
- return feed;
- }
-
- private List<MediaFeedData> createData(int size) {
- List<MediaFeedData> data = Lists.newLinkedList();
- for(int i=0; i < size; ++i) {
- data.add(mock(MediaFeedData.class));
- }
- this.expectedDataCount += size;
- return data;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
deleted file mode 100644
index 371936b..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/recentmedia/InstagramRecentMediaProviderTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider.recentmedia;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.apache.streams.instagram.provider.InstagramDataCollector;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Random;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.mock;
-
-/**
- *
- */
-public class InstagramRecentMediaProviderTest {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramRecentMediaProviderTest.class);
-
- @Test
- public void testStartStream() throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- final InstagramRecentMediaCollector collectorStub = new InstagramRecentMediaCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration()) {
-
- private volatile boolean isFinished = false;
-
- @Override
- public void run() {
- this.isFinished = true;
- latch.countDown();
- }
-
- @Override
- public boolean isCompleted() {
- return this.isFinished;
- }
- };
-
- InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(null) {
- @Override
- protected InstagramDataCollector getInstagramDataCollector() {
- return collectorStub;
- }
- };
- provider.prepare(null);
- provider.startStream();
-
- latch.await();
- assertTrue(collectorStub.isCompleted());
- StreamsResultSet result = provider.readCurrent();
- assertNotNull(result);
- assertEquals(0, result.size());
- assertTrue(!provider.isRunning());
- try {
- provider.cleanUp();
- } catch (Throwable throwable){
- throwable.printStackTrace();
- fail("Error durring clean up");
- }
- }
-
- @Test
- public void testReadCurrent() {
- final long seed = System.nanoTime();
- final Random rand = new Random(seed);
- final CyclicBarrier test = new CyclicBarrier(2);
- final CyclicBarrier produce = new CyclicBarrier(2);
- final AtomicInteger batchCount = new AtomicInteger(0);
- final InstagramRecentMediaProvider provider = new InstagramRecentMediaProvider(createNonNullConfiguration()) {
- @Override
- protected InstagramDataCollector getInstagramDataCollector() {
- return new InstagramRecentMediaCollector(this.dataQueue, createNonNullConfiguration()) {
-
- private volatile boolean isFinished = false;
-
-
-
- public int getBatchCount() {
- return batchCount.get();
- }
-
- @Override
- public boolean isCompleted() {
- return isFinished;
- }
-
- @Override
- public void run() {
- int randInt = rand.nextInt(5);
- while(randInt != 0) {
- int batchSize = rand.nextInt(200);
- for(int i=0; i < batchSize; ++i) {
- while(!super.dataQueue.add(new StreamsDatum(null))) {
- Thread.yield();
- }
- }
- batchCount.set(batchSize);
- try {
- test.await();
- produce.await();
- } catch (InterruptedException ie ) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
- randInt = rand.nextInt(5);
- }
- batchCount.set(0);
- isFinished = true;
- try {
- test.await();
- produce.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
- }
-
- };
- }
- };
- provider.prepare(null);
- provider.startStream();
- while(provider.isRunning()) {
- try {
- test.await();
- assertEquals("Seed == "+seed, batchCount.get(), provider.readCurrent().size());
- produce.await();
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (BrokenBarrierException bbe) {
- Thread.currentThread().interrupt();
- }
-
- }
- }
-
- private InstagramConfiguration createNonNullConfiguration() {
- InstagramConfiguration configuration = new InstagramConfiguration();
- UsersInfo info = new UsersInfo();
- configuration.setUsersInfo(info);
- info.setUsers(new HashSet<User>());
- info.setAuthorizedTokens(new HashSet<String>());
- return configuration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
deleted file mode 100644
index 7399aff..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance *
-http://www.apache.org/licenses/LICENSE-2.0 *
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License. */
-package org.apache.streams.instagram.provider.userinfo;
-
-import com.carrotsearch.randomizedtesting.RandomizedTest;
-import com.carrotsearch.randomizedtesting.annotations.Repeat;
-import com.google.common.collect.Queues;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.User;
-import org.apache.streams.instagram.UsersInfo;
-import org.jinstagram.Instagram;
-import org.jinstagram.entity.users.basicinfo.UserInfo;
-import org.jinstagram.entity.users.basicinfo.UserInfoData;
-import org.jinstagram.exceptions.InstagramBadRequestException;
-import org.jinstagram.exceptions.InstagramException;
-import org.jinstagram.exceptions.InstagramRateLimitException;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Unit tests for {@link org.apache.streams.instagram.provider.userinfo.InstagramUserInfoCollector}
- */
-public class InstagramUserInfoCollectorTest extends RandomizedTest {
-
- private boolean succesful;
-
- @Test
- public void testConvertToStreamsDatum() {
- InstagramUserInfoCollector collector = new InstagramUserInfoCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration());
- UserInfoData userInfoData = mock(UserInfoData.class);
- long id = 1;
- when(userInfoData.getId()).thenReturn(id);
- StreamsDatum datum = collector.convertToStreamsDatum(userInfoData);
- assertNotNull(datum);
- assertEquals(userInfoData, datum.document);
- assertEquals(Long.toString(userInfoData.getId()), datum.getId());
- }
-
- @Test
- @Repeat(iterations = 3)
- public void testcollectInstagramDataForUser() {
- succesful = false;
- Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
- final Instagram client = createMockClient();
- InstagramUserInfoCollector collector = new InstagramUserInfoCollector(datums, createNonNullConfiguration()) {
- @Override
- protected Instagram getNextInstagramClient() {
- return client;
- }
- };
- if(succesful) {
- assertEquals(1, datums.size());
- } else {
- assertEquals(0, datums.size());
- }
- }
-
- private Instagram createMockClient() {
- Instagram client = mock(Instagram.class);
- try {
- when(client.getUserInfo(anyLong())).thenAnswer(new Answer<UserInfo>() {
- @Override
- public UserInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
- int exception = randomInt(10);
- if (exception == 0) {
- throw mock(InstagramRateLimitException.class);
- } else if (exception == 1) {
- throw mock(InstagramBadRequestException.class);
- } else if (exception == 2) {
- throw mock(InstagramException.class);
- } else {
- UserInfo info = mock(UserInfo.class);
- UserInfoData data = mock(UserInfoData.class);
- when(data.getId()).thenReturn(randomLong());
- when(info.getData()).thenReturn(data);
- succesful = true;
- return info;
- }
- }
- });
- } catch (InstagramException e) {
- fail("Never should throw exception creating mock instagram object");
- }
- return client;
- }
-
- private InstagramConfiguration createNonNullConfiguration() {
- InstagramConfiguration configuration = new InstagramConfiguration();
- UsersInfo info = new UsersInfo();
- configuration.setUsersInfo(info);
- info.setUsers(new HashSet<User>());
- info.setAuthorizedTokens(new HashSet<String>());
- return configuration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramActivitySerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramActivitySerDeIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramActivitySerDeIT.java
deleted file mode 100644
index 6d1bebd..0000000
--- a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramActivitySerDeIT.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.test;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.streams.instagram.serializer.util.InstagramDeserializer;
-import org.apache.streams.pojo.json.Activity;
-import org.jinstagram.entity.users.basicinfo.UserInfoData;
-import org.jinstagram.entity.users.feed.MediaFeedData;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Map;
-
-import static org.apache.streams.instagram.serializer.util.InstagramActivityUtil.updateActivity;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-/**
- * Tests conversion of instagram inputs to Activity
- */
-public class InstagramActivitySerDeIT {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(InstagramActivitySerDeIT.class);
-
- @Test
- public void TestMediaFeedObjects() {
- InstagramDeserializer instagramDeserializer = new InstagramDeserializer("");
- InputStream is = InstagramActivitySerDeIT.class.getResourceAsStream("/testMediaFeedObjects.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- MediaFeedData mediaFeedData = instagramDeserializer.createObjectFromResponse(MediaFeedData.class, line);
-
- Activity activity = new Activity();
-
- LOGGER.info("activity: {}", activity.toString());
-
- updateActivity(mediaFeedData, activity);
- assertThat(activity, is(not(nullValue())));
-
- assertThat(activity.getId(), is(not(nullValue())));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getProvider(), is(not(nullValue())));
- }
- }
- } catch( Exception e ) {
- LOGGER.error("Exception: ", e);
- Assert.fail();
- }
- }
-
- @Test
- public void TestUserInfoData() {
- InstagramDeserializer instagramDeserializer = new InstagramDeserializer("");
- InputStream is = InstagramActivitySerDeIT.class.getResourceAsStream("/testUserInfoData.txt");
- InputStreamReader isr = new InputStreamReader(is);
- BufferedReader br = new BufferedReader(isr);
-
- try {
- while (br.ready()) {
- String line = br.readLine();
- if(!StringUtils.isEmpty(line))
- {
- LOGGER.info("raw: {}", line);
-
- UserInfoData userInfoData = instagramDeserializer.createObjectFromResponse(UserInfoData.class, line);
-
- Activity activity = new Activity();
-
- LOGGER.info("activity: {}", activity.toString());
-
- updateActivity(userInfoData, activity);
- assertThat(activity, is(not(nullValue())));
-
- assertThat(activity.getId(), is(nullValue()));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getActor().getImage(), is(not(nullValue())));
- assertThat(activity.getActor().getDisplayName(), is(not(nullValue())));
- assertThat(activity.getActor().getSummary(), is(not(nullValue())));
-
- Map<String, Object> extensions = (Map<String, Object>)activity.getActor().getAdditionalProperties().get("extensions");
- assertThat(extensions, is(not(nullValue())));
- assertThat(extensions.get("follows"), is(not(nullValue())));
- assertThat(extensions.get("followers"), is(not(nullValue())));
- assertThat(extensions.get("screenName"), is(not(nullValue())));
- assertThat(extensions.get("posts"), is(not(nullValue())));
-
- assertThat(activity.getActor().getAdditionalProperties().get("handle"), is(not(nullValue())));
- assertThat(activity.getActor().getId(), is(not(nullValue())));
- assertThat(activity.getActor().getUrl(), is(not(nullValue())));
- assertThat(activity.getVerb(), is(not(nullValue())));
- assertThat(activity.getProvider(), is(not(nullValue())));
- }
- }
- } catch( Exception e ) {
- LOGGER.error("Exception: ", e);
- Assert.fail();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramMediaFeedDataConverterIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramMediaFeedDataConverterIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramMediaFeedDataConverterIT.java
new file mode 100644
index 0000000..a54cfdb
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramMediaFeedDataConverterIT.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.test.data;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityObjectConverter;
+import org.apache.streams.instagram.serializer.InstagramMediaFeedDataConverter;
+import org.apache.streams.instagram.serializer.InstagramUserInfoDataConverter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests conversion of instagram inputs to Activity
+ */
+public class InstagramMediaFeedDataConverterIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramMediaFeedDataConverterIT.class);
+
+ // use gson because jInstagram's pojos do
+ private Gson gson = new Gson();
+
+ // use jackson to write to file output
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ @Test
+ public void InstagramMediaFeedDataConverterIT() throws Exception {
+ InputStream is = InstagramMediaFeedDataConverterIT.class.getResourceAsStream("/testMediaFeedObjects.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream("target/test-classes/InstagramMediaFeedDataConverterIT.txt")));
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ LOGGER.info("raw: {}", line);
+
+ MediaFeedData mediaFeedData = gson.fromJson(line, MediaFeedData.class);
+
+ ActivityConverter<MediaFeedData> converter = new InstagramMediaFeedDataConverter();
+
+ Activity activity = converter.toActivityList(mediaFeedData).get(0);
+
+ LOGGER.info("activity: {}", activity.toString());
+
+ assertThat(activity, is(not(nullValue())));
+
+ assertThat(activity.getId(), is(not(nullValue())));
+ assertThat(activity.getActor(), is(not(nullValue())));
+ assertThat(activity.getActor().getId(), is(not(nullValue())));
+ assertThat(activity.getVerb(), is(not(nullValue())));
+ assertThat(activity.getProvider(), is(not(nullValue())));
+
+ outStream.println(mapper.writeValueAsString(activity));
+
+ }
+
+ }
+ outStream.flush();
+
+ } catch( Exception e ) {
+ LOGGER.error("Exception: ", e);
+ outStream.flush();
+ Assert.fail();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramUserInfoDataConverterIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramUserInfoDataConverterIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramUserInfoDataConverterIT.java
new file mode 100644
index 0000000..dcc5eb1
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/data/InstagramUserInfoDataConverterIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.instagram.test.data;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import org.apache.commons.lang.StringUtils;
+import org.apache.streams.data.ActivityConverter;
+import org.apache.streams.data.ActivityObjectConverter;
+import org.apache.streams.instagram.serializer.InstagramMediaFeedDataConverter;
+import org.apache.streams.instagram.serializer.InstagramUserInfoDataConverter;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.entity.users.feed.MediaFeedData;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests conversion of instagram inputs to Activity
+ */
+public class InstagramUserInfoDataConverterIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoDataConverterIT.class);
+
+ // use gson because jInstagram's pojos do
+ private Gson gson = new Gson();
+
+ // use jackson to write to file output
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ @Test
+ public void InstagramUserInfoDataConverterIT() throws Exception {
+ InputStream is = InstagramUserInfoDataConverterIT.class.getResourceAsStream("/testUserInfoData.txt");
+ InputStreamReader isr = new InputStreamReader(is);
+ BufferedReader br = new BufferedReader(isr);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream("target/test-classes/InstagramUserInfoDataConverterIT.txt")));
+
+ try {
+ while (br.ready()) {
+ String line = br.readLine();
+ if(!StringUtils.isEmpty(line))
+ {
+ LOGGER.info("raw: {}", line);
+
+ UserInfoData userInfoData = gson.fromJson(line, UserInfoData.class);
+
+ ActivityObjectConverter<UserInfoData> converter = new InstagramUserInfoDataConverter();
+
+ ActivityObject activityObject = converter.toActivityObject(userInfoData);
+
+ LOGGER.info("activityObject: {}", activityObject.toString());
+
+ assertThat(activityObject, is(not(nullValue())));
+
+ assertThat(activityObject.getId(), is(not(nullValue())));
+ assertThat(activityObject.getImage(), is(not(nullValue())));
+ assertThat(activityObject.getDisplayName(), is(not(nullValue())));
+ assertThat(activityObject.getSummary(), is(not(nullValue())));
+
+ Map<String, Object> extensions = (Map<String, Object>)activityObject.getAdditionalProperties().get("extensions");
+ assertThat(extensions, is(not(nullValue())));
+ assertThat(extensions.get("following"), is(not(nullValue())));
+ assertThat(extensions.get("followers"), is(not(nullValue())));
+ assertThat(extensions.get("screenName"), is(not(nullValue())));
+ assertThat(extensions.get("posts"), is(not(nullValue())));
+
+ assertThat(activityObject.getAdditionalProperties().get("handle"), is(not(nullValue())));
+ assertThat(activityObject.getId(), is(not(nullValue())));
+ assertThat(activityObject.getUrl(), is(not(nullValue())));
+
+ assertThat(activityObject.getAdditionalProperties().get("provider"), is(not(nullValue())));
+
+ outStream.println(mapper.writeValueAsString(activityObject));
+
+ }
+ }
+ outStream.flush();
+
+ } catch( Exception e ) {
+ LOGGER.error("Exception: ", e);
+ outStream.flush();
+ Assert.fail();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
new file mode 100644
index 0000000..8f2b19b
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramRecentMediaProviderIT.java
@@ -0,0 +1,42 @@
+package org.apache.streams.instagram.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaProvider;
+import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Created by sblackmon on 10/12/16.
+ */
+public class InstagramRecentMediaProviderIT {
+
+ @Test
+ public void testInstagramRecentMediaProvider() throws Exception {
+
+ String configfile = "./target/test-classes/InstagramRecentMediaProviderIT.conf";
+ String outfile = "./target/test-classes/InstagramRecentMediaProviderIT.stdout.txt";
+
+ String[] args = new String[2];
+ args[0] = configfile;
+ args[1] = outfile;
+
+ InstagramRecentMediaProvider.main(args);
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while (outCounter.readLine() != null) {
+ }
+
+ assert (outCounter.getLineNumber() >= 1);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
new file mode 100644
index 0000000..04e7c89
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/test/providers/InstagramUserInfoProviderIT.java
@@ -0,0 +1,42 @@
+package org.apache.streams.instagram.test.providers;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.instagram.provider.recentmedia.InstagramRecentMediaProvider;
+import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Created by sblackmon on 10/12/16.
+ */
+public class InstagramUserInfoProviderIT {
+
+ @Test
+ public void testInstagramUserInfoProvider() throws Exception {
+
+ String configfile = "./target/test-classes/InstagramUserInfoProviderIT.conf";
+ String outfile = "./target/test-classes/InstagramUserInfoProviderIT.stdout.txt";
+
+ String[] args = new String[2];
+ args[0] = configfile;
+ args[1] = outfile;
+
+ InstagramUserInfoProvider.main(args);
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() >= 1);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/73768bac/streams-contrib/streams-provider-instagram/src/test/resources/InstagramRecentMediaProviderIT.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/resources/InstagramRecentMediaProviderIT.conf b/streams-contrib/streams-provider-instagram/src/test/resources/InstagramRecentMediaProviderIT.conf
new file mode 100644
index 0000000..420deb5
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/resources/InstagramRecentMediaProviderIT.conf
@@ -0,0 +1,14 @@
+instagram {
+ clientId = "b389fcbeca2a40a89afa591a8468e4dc"
+ usersInfo = {
+ authorizedTokens = [
+ "1646021441.b739937.58d5b84abce74241b640d8c1f7e91222"
+ ]
+ users = [
+ {
+ #userId = "blackmonsteve"
+ userId = "1646021441"
+ }
+ ]
+ }
+}
\ No newline at end of file