You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/09/04 19:12:06 UTC

[18/41] git commit: STREAMS-145 | Created type converter for UserInfoData

STREAMS-145 | Created type converter for UserInfoData


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b85dfa71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b85dfa71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b85dfa71

Branch: refs/heads/pp
Commit: b85dfa71f1a86fcbe16d643c51a4647c967b8a84
Parents: c2a391a
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Thu Aug 14 15:16:47 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Thu Aug 14 15:16:47 2014 -0500

----------------------------------------------------------------------
 .../processor/InstagramTypeConverter.java       |  19 ++-
 .../provider/InstagramAbstractProvider.java     |   2 +-
 .../userinfo/InstagramUserInfoCollector.java    |  91 ++++++++++++++
 .../userinfo/InstagramUserInfoProvider.java     |  58 ++-------
 .../serializer/InstagramUserInfoSerializer.java |  81 +++++++++++++
 .../InstagramUserInfoCollectorTest.java         | 120 +++++++++++++++++++
 6 files changed, 313 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
index 7fb1ec6..90ec3a8 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/processor/InstagramTypeConverter.java
@@ -21,8 +21,10 @@ package org.apache.streams.instagram.processor;
 import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.instagram.serializer.InstagramUserInfoSerializer;
 import org.apache.streams.instagram.serializer.util.InstagramActivityUtil;
 import org.apache.streams.pojo.json.Activity;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +42,7 @@ public class InstagramTypeConverter implements StreamsProcessor {
     private Queue<StreamsDatum> outQueue;
 
     private InstagramActivityUtil instagramActivityUtil;
+    private InstagramUserInfoSerializer userInfoSerializer;
 
     private int count = 0;
 
@@ -65,18 +68,21 @@ public class InstagramTypeConverter implements StreamsProcessor {
             Object item = entry.getDocument();
 
             LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
+            Activity activity = null;
             if(item instanceof MediaFeedData) {
                 //We don't need to use the mapper, since we have a process to convert between
                 //MediaFeedData objects and Activity objects already
-                Activity activity = new Activity();
+                activity = new Activity();
 
                 instagramActivityUtil.updateActivity((MediaFeedData)item, activity);
 
-                if(activity.getId() != null) {
-                    result = new StreamsDatum(activity);
-                    count++;
-                }
+
+            } else if(item instanceof UserInfoData) {
+                activity = this.userInfoSerializer.deserialize((UserInfoData) item );
+            }
+            if(activity != null && activity.getId() != null) {
+                result = new StreamsDatum(activity);
+                count++;
             }
         } catch (Exception e) {
             e.printStackTrace();
@@ -92,6 +98,7 @@ public class InstagramTypeConverter implements StreamsProcessor {
+    /**
+     * Creates the helpers used to convert incoming documents: {@link InstagramActivityUtil}
+     * fills an Activity from {@link MediaFeedData}, and {@link InstagramUserInfoSerializer}
+     * converts {@link UserInfoData}. The argument is not used.
+     */
     @Override
     public void prepare(Object o) {
         instagramActivityUtil = new InstagramActivityUtil();
+        this.userInfoSerializer = new InstagramUserInfoSerializer();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
index c34014c..9eb6e6f 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramAbstractProvider.java
@@ -41,7 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 /**
  * Instagram {@link org.apache.streams.core.StreamsProvider} that provides the recent media data for a group of users
  */
-public abstract class InstagramAbstractProvider<T> implements StreamsProvider {
+public abstract class InstagramAbstractProvider implements StreamsProvider {
 
     protected InstagramConfiguration config;
     private InstagramDataCollector dataCollector;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
new file mode 100644
index 0000000..d9be9b0
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollector.java
@@ -0,0 +1,91 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import com.google.common.collect.Lists;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
+import org.jinstagram.entity.users.basicinfo.UserInfo;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Queue;
+
+/**
+ *
+ */
+public class InstagramUserInfoCollector extends InstagramDataCollector<UserInfoData>{
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoCollector.class);
+    protected static final int MAX_ATTEMPTS = 5;
+
+    private int consecutiveErrorCount;
+
+    public InstagramUserInfoCollector(Queue<StreamsDatum> dataQueue, InstagramConfiguration config) {
+        super(dataQueue, config);
+        this.consecutiveErrorCount = 0;
+    }
+
+    @Override
+    protected void collectInstagramDataForUser(User user) throws Exception {
+        int attempt = 0;
+        boolean successful = false;
+        UserInfo userInfo = null;
+        while(!successful && attempt < MAX_ATTEMPTS) {
+            ++attempt;
+            try {
+                userInfo = getNextInstagramClient().getUserInfo(Long.valueOf(user.getUserId()));
+            } catch (Exception e) {
+                if(e instanceof InstagramRateLimitException) {
+                    LOGGER.warn("Hit rate limit exception, backing off.");
+                    super.backOffStrategy.backOff();
+                } else if(e instanceof InstagramBadRequestException) {
+                    LOGGER.error("Sent a bad request to Instagram, skipping user : {}", user.getUserId());
+                    attempt = MAX_ATTEMPTS;
+                    ++this.consecutiveErrorCount;
+                } else {
+                    LOGGER.error("Expection while polling instagram : {}", e);
+                    ++this.consecutiveErrorCount;
+                }
+                if(this.consecutiveErrorCount >= Math.max(super.numAvailableTokens(), MAX_ATTEMPTS * 2)) {
+                    LOGGER.error("Consecutive Errors above acceptable limits, ending collection of data.");
+                    throw new Exception("Consecutive Errors above acceptable limits : "+this.consecutiveErrorCount);
+                }
+            }
+            if(successful = (userInfo != null)) {
+                this.consecutiveErrorCount = 0;
+                List<UserInfoData> data = Lists.newLinkedList();
+                data.add(userInfo.getData());
+                super.queueData(data, user.getUserId());
+            }
+        }
+        if(attempt == MAX_ATTEMPTS) {
+            LOGGER.error("Failed to collect data for user : {}", user.getUserId());
+        }
+    }
+
+    @Override
+    protected StreamsDatum convertToStreamsDatum(UserInfoData item) {
+        return new StreamsDatum(item, Long.toString(item.getId()));
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
index 3bae50f..b873e9c 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoProvider.java
@@ -14,69 +14,25 @@ specific language governing permissions and limitations
 under the License. */
 package org.apache.streams.instagram.provider.userinfo;
 
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.InstagramConfiguration;
-import org.apache.streams.instagram.InstagramConfigurator;
-import org.apache.streams.util.SerializationUtil;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.streams.instagram.provider.InstagramAbstractProvider;
+import org.apache.streams.instagram.provider.InstagramDataCollector;
 
 /**
  *
  */
-public class InstagramUserInfoProvider implements StreamsProvider {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoProvider.class);
-
-    private InstagramConfiguration config;
-    private AtomicBoolean isComplete;
+public class InstagramUserInfoProvider extends InstagramAbstractProvider {
 
+    /**
+     * Creates a provider via {@link InstagramAbstractProvider}'s no-argument constructor
+     * (how that constructor obtains its configuration is not shown here).
+     */
     public InstagramUserInfoProvider() {
-        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
+        super();
     }
 
+    /**
+     * @param config Instagram configuration passed through to {@link InstagramAbstractProvider}
+     */
     public InstagramUserInfoProvider(InstagramConfiguration config) {
-        this.config = SerializationUtil.cloneBySerialization(config);
-    }
-
-    @Override
-    public void startStream() {
-
-    }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-        return null;
+        super(config);
     }
 
+    /**
+     * Supplies the collector for this provider: an {@link InstagramUserInfoCollector} that
+     * writes into the inherited {@code dataQueue} using the inherited {@code config}.
+     */
     @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return this.isComplete.get();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        this.isComplete = new AtomicBoolean(false);
-    }
-
-    @Override
-    public void cleanUp() {
-
+    protected InstagramDataCollector getInstagramDataCollector() {
+        return new InstagramUserInfoCollector(super.dataQueue, super.config);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
new file mode 100644
index 0000000..9acea20
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/serializer/InstagramUserInfoSerializer.java
@@ -0,0 +1,81 @@
+package org.apache.streams.instagram.serializer;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.instagram.UsersInfo;
+import org.apache.streams.instagram.provider.userinfo.InstagramUserInfoProvider;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Image;
+import org.apache.streams.pojo.json.Provider;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.reflect.generics.reflectiveObjects.NotImplementedException;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Converts jInstagram {@link UserInfoData} into an {@link Activity} whose {@link Actor} describes
+ * the Instagram user. Only the UserInfoData to Activity direction is supported.
+ */
+public class InstagramUserInfoSerializer implements ActivitySerializer<UserInfoData> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InstagramUserInfoSerializer.class);
+
+    private static final String STREAMS_ID_PREFIX = "id:instagram:";
+    private static final String PROVIDER_ID = "id:provider:instagram";
+    private static final String DISPLAY_NAME = "Instagram";
+
+    /**
+     * @return {@code null}; this serializer does not declare a serialization format
+     */
+    @Override
+    public String serializationFormat() {
+        return null;
+    }
+
+    /**
+     * Not supported: an {@link Activity} cannot be converted back into {@link UserInfoData}.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public UserInfoData serialize(Activity deserialized) throws ActivitySerializerException {
+        // Standard JDK exception; sun.reflect.*.NotImplementedException is internal, non-public API.
+        throw new UnsupportedOperationException("Serializing an Activity to UserInfoData is not supported");
+    }
+
+    /**
+     * Builds an {@link Activity} for an Instagram user profile.
+     *
+     * <p>The actor carries the id ({@code id:instagram:<userId>}), profile picture, bio, full name,
+     * a {@code handle} property and an {@code extensions} map. The map holds screen name and website,
+     * plus media/follower/following counts when Instagram returned them. The activity id is the
+     * actor id.
+     *
+     * @param serialized user info returned by Instagram; must not be {@code null}
+     * @return the converted activity
+     */
+    @Override
+    public Activity deserialize(UserInfoData serialized) throws ActivitySerializerException {
+        Activity activity = new Activity();
+        Provider provider = new Provider();
+        provider.setId(PROVIDER_ID);
+        provider.setDisplayName(DISPLAY_NAME);
+        activity.setProvider(provider);
+
+        Actor actor = new Actor();
+        Image image = new Image();
+        image.setUrl(serialized.getProfile_picture());
+        actor.setImage(image);
+        actor.setId(STREAMS_ID_PREFIX + serialized.getId());
+        actor.setSummary(serialized.getBio());
+        actor.setAdditionalProperty("handle", serialized.getUsername());
+        actor.setDisplayName(serialized.getFullName());
+
+        Map<String, Object> extensions = Maps.newHashMap();
+        extensions.put("screenName", serialized.getUsername());
+        // Counts may be absent from a response; skip them rather than fail the whole conversion.
+        if(serialized.getCounts() != null) {
+            extensions.put("posts", serialized.getCounts().getMedia());
+            extensions.put("followers", serialized.getCounts().getFollwed_by());
+            extensions.put("following", serialized.getCounts().getFollows());
+        }
+        extensions.put("website", serialized.getWebsite());
+        actor.setAdditionalProperty("extensions", extensions);
+
+        // Without this the activity carried none of the user data assembled above.
+        activity.setActor(actor);
+        // InstagramTypeConverter only emits activities with a non-null id; reuse the user's id so
+        // converted user info is not silently dropped.
+        activity.setId(actor.getId());
+        return activity;
+    }
+
+    /**
+     * Converts each element with {@link #deserialize(UserInfoData)}, logging and skipping those
+     * that fail.
+     *
+     * @param serializedList user info to convert; {@code null} is treated as empty
+     * @return converted activities, in input order
+     */
+    @Override
+    public List<Activity> deserializeAll(List<UserInfoData> serializedList) {
+        List<Activity> result = Lists.newArrayList();
+        if(serializedList == null) {
+            return result;
+        }
+        for(UserInfoData data : serializedList) {
+            try {
+                result.add(deserialize(data));
+            } catch (ActivitySerializerException ase) {
+                // The throwable is passed last, unmatched by a placeholder, so SLF4J logs its stack trace.
+                LOGGER.error("Caught ActivitySerializerException, dropping user info data : {}", data.getId(), ase);
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b85dfa71/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
new file mode 100644
index 0000000..7399aff
--- /dev/null
+++ b/streams-contrib/streams-provider-instagram/src/test/java/org/apache/streams/instagram/provider/userinfo/InstagramUserInfoCollectorTest.java
@@ -0,0 +1,120 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance with the License.
+You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License. */
+package org.apache.streams.instagram.provider.userinfo;
+
+import com.carrotsearch.randomizedtesting.RandomizedTest;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+import com.google.common.collect.Queues;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
+import org.apache.streams.instagram.UsersInfo;
+import org.jinstagram.Instagram;
+import org.jinstagram.entity.users.basicinfo.UserInfo;
+import org.jinstagram.entity.users.basicinfo.UserInfoData;
+import org.jinstagram.exceptions.InstagramBadRequestException;
+import org.jinstagram.exceptions.InstagramException;
+import org.jinstagram.exceptions.InstagramRateLimitException;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link org.apache.streams.instagram.provider.userinfo.InstagramUserInfoCollector}
+ */
+public class InstagramUserInfoCollectorTest extends RandomizedTest {
+
+    /** Set by the mock client when a request returns user info instead of throwing. */
+    private boolean successful;
+
+    @Test
+    public void testConvertToStreamsDatum() {
+        InstagramUserInfoCollector collector = new InstagramUserInfoCollector(new ConcurrentLinkedQueue<StreamsDatum>(), createNonNullConfiguration());
+        UserInfoData userInfoData = mock(UserInfoData.class);
+        long id = 1;
+        when(userInfoData.getId()).thenReturn(id);
+        StreamsDatum datum = collector.convertToStreamsDatum(userInfoData);
+        assertNotNull(datum);
+        assertEquals(userInfoData, datum.document);
+        assertEquals(Long.toString(userInfoData.getId()), datum.getId());
+    }
+
+    /**
+     * Runs the collector against a client that randomly throws or returns data and checks that
+     * exactly one datum is queued if and only if a request succeeded.
+     */
+    @Test
+    @Repeat(iterations = 3)
+    public void testcollectInstagramDataForUser() throws Exception {
+        successful = false;
+        Queue<StreamsDatum> datums = Queues.newConcurrentLinkedQueue();
+        final Instagram client = createMockClient();
+        InstagramUserInfoCollector collector = new InstagramUserInfoCollector(datums, createNonNullConfiguration()) {
+            @Override
+            protected Instagram getNextInstagramClient() {
+                return client;
+            }
+        };
+        User user = new User();
+        // The collector parses the id with Long.valueOf, so it must be a numeric string.
+        user.setUserId(Long.toString(randomLong()));
+        // Exercise the collector; without this call the assertions below are vacuous.
+        collector.collectInstagramDataForUser(user);
+        if(successful) {
+            assertEquals(1, datums.size());
+        } else {
+            assertEquals(0, datums.size());
+        }
+    }
+
+    /**
+     * Builds a client whose getUserInfo randomly throws a rate-limit, bad-request or generic
+     * InstagramException, or otherwise returns mocked user info and records the success.
+     */
+    private Instagram createMockClient() {
+        Instagram client = mock(Instagram.class);
+        try {
+            when(client.getUserInfo(anyLong())).thenAnswer(new Answer<UserInfo>() {
+                @Override
+                public UserInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
+                    int exception = randomInt(10);
+                    if (exception == 0) {
+                        throw mock(InstagramRateLimitException.class);
+                    } else if (exception == 1) {
+                        throw mock(InstagramBadRequestException.class);
+                    } else if (exception == 2) {
+                        throw mock(InstagramException.class);
+                    } else {
+                        UserInfo info = mock(UserInfo.class);
+                        UserInfoData data = mock(UserInfoData.class);
+                        when(data.getId()).thenReturn(randomLong());
+                        when(info.getData()).thenReturn(data);
+                        successful = true;
+                        return info;
+                    }
+                }
+            });
+        } catch (InstagramException e) {
+            fail("Never should throw exception creating mock instagram object");
+        }
+        return client;
+    }
+
+    /** Configuration with empty (non-null) user and token sets. */
+    private InstagramConfiguration createNonNullConfiguration() {
+        InstagramConfiguration configuration = new InstagramConfiguration();
+        UsersInfo info = new UsersInfo();
+        configuration.setUsersInfo(info);
+        info.setUsers(new HashSet<User>());
+        info.setAuthorizedTokens(new HashSet<String>());
+        return configuration;
+    }
+
+}