You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/09/04 19:12:06 UTC
[18/41] git commit: STREAMS-145 | Created type converter for
UserInfoData
STREAMS-145 | Created type converter for UserInfoData
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b85dfa71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b85dfa71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b85dfa71
Branch: refs/heads/pp
Commit: b85dfa71f1a86fcbe16d643c51a4647c967b8a84
Parents: c2a391a
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 15:16:47 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 15:16:47 2014 -0500
----------------------------------------------------------------------
.../processor/InstagramTypeConverter.java | 19 ++-
.../provider/InstagramAbstractProvider.java | 2 +-
.../userinfo/InstagramUserInfoCollector.java | 91 ++++++++++++++
.../userinfo/InstagramUserInfoProvider.java | 58 ++-------
.../serializer/InstagramUserInfoSerializer.java | 81 +++++++++++++
.../InstagramUserInfoCollectorTest.java | 120 +++++++++++++++++++
6 files changed, 313 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 7fb1ec6..90ec3a8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -21,8 +21,10 @@ package org.apache.streams.instagram.processor;
import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.instagram.serializer.InstagramUserInfoSerializer;
import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +42,7 @@ public class InstagramTypeConverter implements StreamsProcessor {
private Queue<StreamsDatum> outQueue;
private InstagramActivityUtil instagramActivityUtil;
+ private InstagramUserInfoSerializer userInfoSerializer;
private int count = 0;
@@ -65,18 +68,21 @@ public class InstagramTypeConverter implements StreamsProcessor {
Object item = entry.getDocument();
LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
+ Activity activity = null;
if(item instanceof MediaFeedData) {
//We don't need to use the mapper, since we have a process to convert between
//MediaFeedData objects and Activity objects already
- Activity activity = new Activity();
+ activity = new Activity();
instagramActivityUtil.updateActivity((MediaFeedData)item, activity);
- if(activity.getId() != null) {
- result = new StreamsDatum(activity);
- count++;
- }
+
+ } else if(item instanceof UserInfoData) {
+ activity = this.userInfoSerializer.deserialize((UserInfoData) item );
+ }
+ if(activity != null && activity.getId() != null) {
+ result = new StreamsDatum(activity);
+ count++;
}
} catch (Exception e) {
e.printStackTrace();
@@ -92,6 +98,7 @@ public class InstagramTypeConverter implements StreamsProcessor {
@Override
public void prepare(Object o) {
instagramActivityUtil = new InstagramActivityUtil();
+ this.userInfoSerializer = new InstagramUserInfoSerializer();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index c34014c..9eb6e6f 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
*/
-public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
+public abstract class InstagramAbstractProvider implements StreamsProvider {
protected InstagramConfiguration config;
private InstagramDataCollector dataCollector;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
new file mode 100644
index 0000000..d9be9b0
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
+import org.jinstagram.entity.users.basicinfo.UserInfo;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * Collects the basic profile information ({@link UserInfoData}) of Instagram users and queues
+ * it, one item per user, through the inherited {@code queueData} method.
+ */
+public class InstagramUserInfoCollector extends InstagramDataCollector<UserInfoData>{
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoCollector.class);
+
+    /** Maximum number of requests made for a single user before that user is skipped. */
+    protected static final int MAX_ATTEMPTS = 5;
+
+    /** Errors (rate-limit hits excluded) since the last successful request, across all users. */
+    private int consecutiveErrorCount;
+
+    public InstagramUserInfoCollector(Queue<StreamsDatum> dataQueue, InstagramConfiguration config) {
+        super(dataQueue, config);
+        this.consecutiveErrorCount = 0;
+    }
+
+    /**
+     * Requests the info for {@code user}, retrying up to {@link #MAX_ATTEMPTS} times, and queues
+     * the returned data on success.
+     *
+     * @param user the user to look up; its id must be a numeric Instagram user id
+     * @throws Exception when the number of consecutive errors reaches the acceptable limit
+     */
+    @Override
+    protected void collectInstagramDataForUser(User user) throws Exception {
+        int attempt = 0;
+        boolean successful = false;
+        while(!successful && attempt < MAX_ATTEMPTS) {
+            ++attempt;
+            UserInfo userInfo = null;
+            try {
+                userInfo = getNextInstagramClient().getUserInfo(Long.valueOf(user.getUserId()));
+            } catch (InstagramRateLimitException e) {
+                // Rate limiting is not counted as an error: back off and try again.
+                LOGGER.warn("Hit rate limit exception, backing off.");
+                super.backOffStrategy.backOff();
+            } catch (InstagramBadRequestException e) {
+                // Repeating a bad request cannot succeed, so give up on this user.
+                LOGGER.error("Sent a bad request to Instagram, skipping user : {}", user.getUserId());
+                attempt = MAX_ATTEMPTS;
+                recordError();
+            } catch (Exception e) {
+                // Exception as the last argument so SLF4J logs its stack trace.
+                LOGGER.error("Exception while polling instagram for user : {}", user.getUserId(), e);
+                recordError();
+            }
+            if(userInfo != null) {
+                successful = true;
+                this.consecutiveErrorCount = 0;
+                List<UserInfoData> data = Lists.newLinkedList();
+                data.add(userInfo.getData());
+                super.queueData(data, user.getUserId());
+            }
+        }
+        // Test the outcome, not the attempt counter: the last allowed attempt may have succeeded.
+        if(!successful) {
+            LOGGER.error("Failed to collect data for user : {}", user.getUserId());
+        }
+    }
+
+    /**
+     * Records one more consecutive error and aborts collection once the limit is reached.
+     *
+     * @throws Exception when the consecutive error count reaches the larger of the number of
+     *                   available tokens and {@code 2 * MAX_ATTEMPTS}
+     */
+    private void recordError() throws Exception {
+        ++this.consecutiveErrorCount;
+        if(this.consecutiveErrorCount >= Math.max(super.numAvailableTokens(), MAX_ATTEMPTS * 2)) {
+            LOGGER.error("Consecutive Errors above acceptable limits, ending collection of data.");
+            throw new Exception("Consecutive Errors above acceptable limits : "+this.consecutiveErrorCount);
+        }
+    }
+
+    /**
+     * Wraps {@code item} in a {@link StreamsDatum} whose id is the item's Instagram user id.
+     */
+    @Override
+    protected StreamsDatum convertToStreamsDatum(UserInfoData item) {
+        return new StreamsDatum(item, Long.toString(item.getId()));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
index 3bae50f..b873e9c 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
@@ -14,69 +14,25 @@ specific language governing permissions and limitations
under the License. */
package org.apache.streams.instagram.provider.userinfo;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.util.SerializationUtil;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.streams.instagram.provider.InstagramAbstractProvider;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
/**
*
*/
-public class InstagramUserInfoProvider implements StreamsProvider {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProvider.class);
-
- private InstagramConfiguration config;
- private AtomicBoolean isComplete;
+public class InstagramUserInfoProvider extends InstagramAbstractProvider {
public InstagramUserInfoProvider() {
- this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+ super();
}
public InstagramUserInfoProvider(InstagramConfiguration config) {
- this.config = SerializationUtil.cloneBySerialization(config);
- }
-
- @Override
- public void startStream() {
-
- }
-
- @Override
- public StreamsResultSet readCurrent() {
- return null;
+ super(config);
}
@Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return this.isComplete.get();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- this.isComplete = new AtomicBoolean(false);
- }
-
- @Override
- public void cleanUp() {
-
+ protected InstagramDataCollector getInstagramDataCollector() {
+ return new InstagramUserInfoCollector(super.dataQueue, super.config);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
new file mode 100644
index 0000000..9acea20
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.serializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts jInstagram {@link UserInfoData} into an {@link Activity} whose actor describes the
+ * Instagram user. Only the deserialize direction is supported.
+ */
+public class InstagramUserInfoSerializer implements ActivitySerializer<UserInfoData> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoSerializer.class);
+
+    private static final String STREAMS_ID_PREFIX = "id:instagram:";
+    private static final String PROVIDER_ID = "id:provider:instagram";
+    private static final String DISPLAY_NAME = "Instagram";
+
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not supported: user info cannot be rebuilt from an {@link Activity}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException {
+        // A standard JDK exception; sun.reflect.* is an internal, non-portable JDK package.
+        throw new UnsupportedOperationException("Serializing an Activity into UserInfoData is not supported");
+    }
+
+    /**
+     * Builds an {@link Activity} describing an Instagram user. The user's id, full name, bio,
+     * profile picture and username are stored on the activity's actor, together with an
+     * "extensions" map holding the username, website and (when present) media/follower counts.
+     *
+     * @param serialized the user info returned by Instagram
+     * @return an activity with its id, provider and actor set
+     */
+    @Override
+    public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        // InstagramTypeConverter drops activities without an id, so one must always be set.
+        // NOTE(review): id format chosen here; align with other Instagram activity ids if needed.
+        activity.setId(STREAMS_ID_PREFIX + "userinfo:" + serialized.getId());
+        Provider provider = new Provider();
+        provider.setId(PROVIDER_ID);
+        provider.setDisplayName(DISPLAY_NAME);
+        activity.setProvider(provider);
+
+        Actor actor = new Actor();
+        Image image = new Image();
+        image.setUrl(serialized.getProfile_picture());
+        actor.setImage(image);
+        actor.setId(STREAMS_ID_PREFIX+serialized.getId());
+        actor.setSummary(serialized.getBio());
+        actor.setAdditionalProperty("handle", serialized.getUsername());
+        actor.setDisplayName(serialized.getFullName());
+        Map<String, Object> extensions = Maps.newHashMap();
+        actor.setAdditionalProperty("extensions", extensions);
+        extensions.put("screenName", serialized.getUsername());
+        extensions.put("website", serialized.getWebsite());
+        // Guard against a null counts object instead of failing with a NullPointerException.
+        if(serialized.getCounts() != null) {
+            extensions.put("posts", serialized.getCounts().getMedia());
+            extensions.put("followers", serialized.getCounts().getFollwed_by());
+            extensions.put("following", serialized.getCounts().getFollows());
+        }
+        // Attach the actor; it carries all of the user's details.
+        activity.setActor(actor);
+        return activity;
+    }
+
+    /**
+     * Deserializes every item in {@code serializedList}, logging and dropping items that fail.
+     *
+     * @param serializedList the user info items to convert
+     * @return the successfully converted activities, in input order
+     */
+    @Override
+    public List<Activity> deserializeAll(List<UserInfoData> serializedList) {
+        List<Activity> result = Lists.newLinkedList();
+        for(UserInfoData data : serializedList) {
+            try {
+                result.add(deserialize(data));
+            } catch (ActivitySerializerException ase) {
+                // Exception as the last argument so SLF4J logs its stack trace.
+                LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId(), ase);
+            }
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
new file mode 100644
index 0000000..7399aff
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.basicinfo.UserInfo;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link org.apache.streams.instagram.provider.userinfo.InstagramUserInfoCollector}
+ */
+public class InstagramUserInfoCollectorTest extends RandomizedTest {
+
+    /** Set by the mock client whenever it returns user info instead of throwing. */
+    private boolean successful;
+
+    @Test
+    public void testConvertToStreamsDatum() {
+        InstagramUserInfoCollector collector = new InstagramUserInfoCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration());
+        UserInfoData userInfoData = mock(UserInfoData.class);
+        long id = 1;
+        when(userInfoData.getId()).thenReturn(id);
+        StreamsDatum datum = collector.convertToStreamsDatum(userInfoData);
+        assertNotNull(datum);
+        assertEquals(userInfoData, datum.document);
+        assertEquals(Long.toString(userInfoData.getId()), datum.getId());
+    }
+
+    /**
+     * Runs a collection against a mock client that fails at random, and checks that exactly one
+     * datum is queued if and only if one of the requests succeeded.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testcollectInstagramDataForUser() throws Exception {
+        successful = false;
+        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+        final Instagram client = createMockClient();
+        InstagramUserInfoCollector collector = new InstagramUserInfoCollector(datums, createNonNullConfiguration()) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return client;
+            }
+        };
+        User user = new User();
+        user.setUserId("1");
+        // The collector must actually run; otherwise the assertions below are vacuous.
+        collector.collectInstagramDataForUser(user);
+        if(successful) {
+            assertEquals(1, datums.size());
+        } else {
+            assertEquals(0, datums.size());
+        }
+    }
+
+    private Instagram createMockClient() {
+        Instagram client = mock(Instagram.class);
+        try {
+            when(client.getUserInfo(anyLong())).thenAnswer(new Answer<UserInfo>() {
+                @Override
+                public UserInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    int exception = randomInt(10);
+                    if (exception == 0) {
+                        throw mock(InstagramRateLimitException.class);
+                    } else if (exception == 1) {
+                        throw mock(InstagramBadRequestException.class);
+                    } else if (exception == 2) {
+                        throw mock(InstagramException.class);
+                    } else {
+                        UserInfo info = mock(UserInfo.class);
+                        UserInfoData data = mock(UserInfoData.class);
+                        when(data.getId()).thenReturn(randomLong());
+                        when(info.getData()).thenReturn(data);
+                        successful = true;
+                        return info;
+                    }
+                }
+            });
+        } catch (InstagramException e) {
+            fail("Never should throw exception creating mock instagram object");
+        }
+        return client;
+    }
+
+    private InstagramConfiguration createNonNullConfiguration() {
+        InstagramConfiguration configuration = new InstagramConfiguration();
+        UsersInfo info = new UsersInfo();
+        configuration.setUsersInfo(info);
+        info.setUsers(new HashSet<User>());
+        info.setAuthorizedTokens(new HashSet<String>());
+        return configuration;
+    }
+
+}