You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/18 18:10:22 UTC

[4/8] git commit: STREAMS-143 | Methods to set configuration in code and code review feedback

STREAMS-143 | Methods to set configuration in code and code review feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ecd80062
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ecd80062
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ecd80062

Branch: refs/heads/master
Commit: ecd80062969a611156bffb09defe4f0957b14fc4
Parents: 9f296be
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 13 14:50:28 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 13 14:50:28 2014 -0500

----------------------------------------------------------------------
 .../instagram/provider/InstagramOauthToken.java | 21 +++---
 .../provider/InstagramRecentMediaCollector.java | 44 +++++++-----
 .../provider/InstagramRecentMediaProvider.java  | 70 ++++++++++++++++++--
 .../com/instagram/InstagramConfiguration.json   | 24 ++++---
 .../util/oauth/tokens/AbstractOauthToken.java   | 19 ++++++
 .../streams/util/oauth/tokens/OauthToken.java   | 19 ------
 .../tokens/tokenmanager/SimpleTokenManager.java |  4 +-
 .../tokenmanager/impl/BasicTokenManger.java     |  4 +-
 .../tokenmanager/TestBasicTokenManager.java     |  4 +-
 9 files changed, 143 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index 9773f92..d41fc64 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -1,28 +1,29 @@
 package org.apache.streams.instagram.provider;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+
+import org.jinstagram.auth.model.Token;
 
 /**
- *
+ * Extends the JInstagram Token. The only difference is that it overrides the equals method and determines equality
+ * based on the token string.
  */
-public class InstagramOauthToken extends OauthToken{
+public class InstagramOauthToken extends Token {
 
-    private String clientId;
 
-    public InstagramOauthToken(String clientId) {
-        this.clientId = clientId;
+    public InstagramOauthToken(String token) {
+        this(token, null);
     }
 
-    public String getClientId() {
-        return clientId;
+    public InstagramOauthToken(String token, String secret) {
+        super(token, secret);
     }
 
     @Override
-    protected boolean internalEquals(Object o) {
+    public boolean equals(Object o) {
         if(!(o instanceof InstagramOauthToken)) {
             return false;
         }
         InstagramOauthToken that = (InstagramOauthToken) o;
-        return this.clientId.equals(that.clientId);
+        return this.getToken().equals(that.getToken());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 932828d..08b1696 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -16,6 +16,7 @@ package org.apache.streams.instagram.provider;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
 import org.apache.streams.instagram.UserId;
 import org.apache.streams.util.api.requests.backoff.BackOffException;
 import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
@@ -52,6 +53,7 @@ public class InstagramRecentMediaCollector implements Runnable {
     private SimpleTokenManager<InstagramOauthToken> tokenManger;
     private int consecutiveErrorCount;
     private BackOffStrategy backOffStrategy;
+    private Instagram instagram;
 
 
     public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
@@ -59,17 +61,27 @@ public class InstagramRecentMediaCollector implements Runnable {
         this.config = config;
         this.isCompleted = new AtomicBoolean(false);
         this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
-        for (String clientId : this.config.getClientIds()) {
-            this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+        for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+            this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
         }
         this.consecutiveErrorCount = 0;
         this.backOffStrategy = new ExponentialBackOffStrategy(2);
+        this.instagram = new Instagram(this.config.getClientId());
     }
 
 
+    /**
+     * If there are authorized tokens available, it sets a new token for the client and returns
+     * the client.  If there are no available tokens, it simply returns the client that was
+     * initialized in the constructor with client id.
+     * @return the shared Instagram client, with the next available token set on it when the pool is not empty
+     */
     @VisibleForTesting
     protected Instagram getNextInstagramClient() {
-        return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
+        if(this.tokenManger.numAvailableTokens() > 0) {
+            this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+        }
+        return this.instagram;
     }
 
     private void queueData(MediaFeed userFeed, String userId) {
@@ -92,7 +104,7 @@ public class InstagramRecentMediaCollector implements Runnable {
     @Override
     public void run() {
         try {
-            for (UserId user : this.config.getUsersInfo().getUserIds()) {
+            for (User user : this.config.getUsersInfo().getUsers()) {
                 collectMediaFeed(user);
             }
         } catch (Exception e) {
@@ -108,7 +120,7 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @throws Exception
      */
     @VisibleForTesting
-    protected void collectMediaFeed(UserId user) throws Exception {
+    protected void collectMediaFeed(User user) throws Exception {
         Pagination pagination = null;
         do {
             int attempts = 0;
@@ -128,9 +140,16 @@ public class InstagramRecentMediaCollector implements Runnable {
                         feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
                     }
                 } catch (Exception e) {
-                    handleException(e);
-                    if(e instanceof InstagramBadRequestException) {
+                    if(e instanceof InstagramRateLimitException) {
+                        LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+                        this.backOffStrategy.backOff();
+                    } else if(e instanceof InstagramBadRequestException) {
+                        LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
                         attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+                        ++this.consecutiveErrorCount;
+                    } else {
+                        LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+                        ++this.consecutiveErrorCount;
                     }
                     if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
                         throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
@@ -156,16 +175,7 @@ public class InstagramRecentMediaCollector implements Runnable {
      * @throws BackOffException
      */
     protected void handleException(Exception e) throws BackOffException {
-        if(e instanceof InstagramRateLimitException) {
-            LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
-            this.backOffStrategy.backOff();
-        } else if(e instanceof InstagramBadRequestException) {
-            LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
-            ++this.consecutiveErrorCount;
-        } else {
-            LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
-            ++this.consecutiveErrorCount;
-        }
+
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index e3e9399..6f975d0 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -16,18 +16,22 @@ package org.apache.streams.instagram.provider;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.instagram.*;
 import org.apache.streams.util.SerializationUtil;
+import org.jinstagram.auth.model.Token;
 import org.jinstagram.entity.users.feed.MediaFeedData;
 import org.joda.time.DateTime;
 
 import java.math.BigInteger;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -45,12 +49,11 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
     private AtomicBoolean isCompleted;
 
     public InstagramRecentMediaProvider() {
-        this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+        this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
     }
 
     public InstagramRecentMediaProvider(InstagramConfiguration config) {
-        this.config = config;
-        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+        this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
     }
 
     @Override
@@ -66,6 +69,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
      */
     @VisibleForTesting
     protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+        this.updateUserInfoList();
         return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
     }
 
@@ -101,6 +105,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
 
     @Override
     public void prepare(Object configurationObject) {
+        this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
         this.isCompleted = new AtomicBoolean(false);
     }
 
@@ -126,7 +131,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
         }
         DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
         DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
-        for(UserId user : usersInfo.getUserIds()) {
+        for(User user : usersInfo.getUsers()) {
             if(defaultAfterDate != null && user.getAfterDate() == null) {
                 user.setAfterDate(defaultAfterDate);
             }
@@ -136,5 +141,62 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
         }
     }
 
+    /**
+     * Overrides the client id in the configuration.
+     * @param clientId client id to use
+     */
+    public void setInstagramClientId(String clientId) {
+        this.config.setClientId(clientId);
+    }
+
+    /**
+     * Overrides authorized user tokens in the configuration.
+     * @param tokenStrings
+     */
+    public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+        ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+    }
+
+    /**
+     * Overrides the default before date in the configuration
+     * @param beforeDate
+     */
+    public void setDefaultBeforeDate(DateTime beforeDate) {
+        ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+    }
+
+    /**
+     * Overrides the default after date in the configuration
+     * @param afterDate
+     */
+    public void setDefaultAfterDate(DateTime afterDate) {
+        ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+    }
+
+    /**
+     * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+     * pull data from as early as possible.  If default before or after DateTimes are set, they will be applied to all
+     * NULL DateTimes.
+     * @param usersWithAfterDate instagram user id mapped to that user's AfterDate time
+     */
+    public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+        Set<User> users = Sets.newHashSet();
+        for(String userId : usersWithAfterDate.keySet()) {
+            User user = new User();
+            user.setUserId(userId);
+            user.setAfterDate(usersWithAfterDate.get(userId));
+            users.add(user);
+        }
+        ensureUsersInfo(this.config).setUsers(users);
+    }
+
+    private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+        UsersInfo usersInfo = config.getUsersInfo();
+        if(usersInfo == null) {
+            usersInfo = new UsersInfo();
+            config.setUsersInfo(usersInfo);
+        }
+        return usersInfo;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index c662660..431efbc 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,23 +5,27 @@
     "javaType" : "org.apache.streams.instagram.InstagramConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
-        "clientIds": {
-            "type": "array",
-            "uniqueItems": true,
-            "items": {
-                "type": "string"
-            },
-            "description": "Your Instagram Client Ids"
+        "clientId": {
+            "type": "string",
+            "description": "Your Instagram Client Id"
         },
         "usersInfo": {
             "type": "object",
             "properties": {
-                "userIds": {
+                "authorizedTokens": {
+                    "type": "array",
+                    "uniqueItems": true,
+                    "items": {
+                        "type": "string"
+                    },
+                    "description": "Instagram tokens for authorized users of your client/app"
+                },
+                "users": {
                     "type": "array",
                     "uniqueItems": true,
                     "items": {
                         "type": "object",
-                        "$ref": "#/definitions/userInfo"
+                        "$ref": "#/definitions/user"
                     },
                     "description": "List of user ids to gather data for. Type of data gathered depends on provider"
                 },
@@ -39,7 +43,7 @@
         }
    },
     "definitions": {
-        "userInfo": {
+        "user": {
             "type": "object",
             "properties": {
                 "userId": {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
new file mode 100644
index 0000000..fe48600
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -0,0 +1,19 @@
+package org.apache.streams.util.oauth.tokens;
+
+/**
+ *
+ */
+public abstract class AbstractOauthToken {
+
+    /**
+     * Must create equals method for all OauthTokens.
+     * @param o
+     * @return true if equal, and false otherwise
+     */
+    protected abstract boolean internalEquals(Object o);
+
+    @Override
+    public boolean equals(Object o) {
+        return this.internalEquals(o);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
deleted file mode 100644
index df264c5..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.streams.util.oauth.tokens;
-
-/**
- *
- */
-public abstract class OauthToken {
-
-    /**
-     * Must create equals method for all OauthTokens.
-     * @param o
-     * @return true if equal, and false otherwise
-     */
-    protected abstract boolean internalEquals(Object o);
-
-    @Override
-    public boolean equals(Object o) {
-        return this.internalEquals(o);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index d052da1..903e48d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -1,13 +1,13 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 
 import java.util.Collection;
 
 /**
  * Manges access to oauth tokens.  Allows a caller to add tokens to the token pool and receive an available token.
  */
-public interface SimpleTokenManager<T extends OauthToken> {
+public interface SimpleTokenManager<T> {
 
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
index 20c8d20..34238b3 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -1,6 +1,6 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
 
 import java.util.ArrayList;
@@ -15,7 +15,7 @@ import java.util.Collection;
  *
  * The manager class is thread safe.
  */
-public class BasicTokenManger<T extends OauthToken> implements SimpleTokenManager<T>{
+public class BasicTokenManger<T> implements SimpleTokenManager<T>{
 
     private ArrayList<T> availableTokens;
     private int nextToken;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
index 903cc69..cd9ed18 100644
--- a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
+++ b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -1,7 +1,7 @@
 package org.apache.streams.util.oauth.tokens.tokenmanager;
 
 
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
 import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
 import org.junit.Test;
 
@@ -22,7 +22,7 @@ public class TestBasicTokenManager {
     /**
      * Simple token for testing purposes
      */
-    private class TestToken extends OauthToken {
+    private class TestToken extends AbstractOauthToken {
 
         private String s;