You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/08/18 18:10:22 UTC
[4/8] git commit: STREAMS-143 | Methods to set configuration in code
and code review feedback
STREAMS-143 | Methods to set configuration in code and code review feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/ecd80062
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/ecd80062
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/ecd80062
Branch: refs/heads/master
Commit: ecd80062969a611156bffb09defe4f0957b14fc4
Parents: 9f296be
Author: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Authored: Wed Aug 13 14:50:28 2014 -0500
Committer: Ryan Ebanks <re...@Informations-MacBook-Pro-3.local>
Committed: Wed Aug 13 14:50:28 2014 -0500
----------------------------------------------------------------------
.../instagram/provider/InstagramOauthToken.java | 21 +++---
.../provider/InstagramRecentMediaCollector.java | 44 +++++++-----
.../provider/InstagramRecentMediaProvider.java | 70 ++++++++++++++++++--
.../com/instagram/InstagramConfiguration.json | 24 ++++---
.../util/oauth/tokens/AbstractOauthToken.java | 19 ++++++
.../streams/util/oauth/tokens/OauthToken.java | 19 ------
.../tokens/tokenmanager/SimpleTokenManager.java | 4 +-
.../tokenmanager/impl/BasicTokenManger.java | 4 +-
.../tokenmanager/TestBasicTokenManager.java | 4 +-
9 files changed, 143 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
index 9773f92..d41fc64 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramOauthToken.java
@@ -1,28 +1,29 @@
package org.apache.streams.instagram.provider;
-import org.apache.streams.util.oauth.tokens.OauthToken;
+
+import org.jinstagram.auth.model.Token;
/**
- *
+ * Extends the JInstagram Token. The only difference is that it overrides the equals method and determines equality
+ * based on the token string.
*/
-public class InstagramOauthToken extends OauthToken{
+public class InstagramOauthToken extends Token {
- private String clientId;
- public InstagramOauthToken(String clientId) {
- this.clientId = clientId;
+ public InstagramOauthToken(String token) {
+ this(token, null);
}
- public String getClientId() {
- return clientId;
+ public InstagramOauthToken(String token, String secret) {
+ super(token, secret);
}
@Override
- protected boolean internalEquals(Object o) {
+ public boolean equals(Object o) {
if(!(o instanceof InstagramOauthToken)) {
return false;
}
InstagramOauthToken that = (InstagramOauthToken) o;
- return this.clientId.equals(that.clientId);
+ return this.getToken().equals(that.getToken());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
index 932828d..08b1696 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaCollector.java
@@ -16,6 +16,7 @@ package org.apache.streams.instagram.provider;
import com.google.common.annotations.VisibleForTesting;
import org.apache.streams.instagram.InstagramConfiguration;
+import org.apache.streams.instagram.User;
import org.apache.streams.instagram.UserId;
import org.apache.streams.util.api.requests.backoff.BackOffException;
import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
@@ -52,6 +53,7 @@ public class InstagramRecentMediaCollector implements Runnable {
private SimpleTokenManager<InstagramOauthToken> tokenManger;
private int consecutiveErrorCount;
private BackOffStrategy backOffStrategy;
+ private Instagram instagram;
public InstagramRecentMediaCollector(Queue<MediaFeedData> queue, InstagramConfiguration config) {
@@ -59,17 +61,27 @@ public class InstagramRecentMediaCollector implements Runnable {
this.config = config;
this.isCompleted = new AtomicBoolean(false);
this.tokenManger = new BasicTokenManger<InstagramOauthToken>();
- for (String clientId : this.config.getClientIds()) {
- this.tokenManger.addTokenToPool(new InstagramOauthToken(clientId));
+ for (String tokens : this.config.getUsersInfo().getAuthorizedTokens()) {
+ this.tokenManger.addTokenToPool(new InstagramOauthToken(tokens));
}
this.consecutiveErrorCount = 0;
this.backOffStrategy = new ExponentialBackOffStrategy(2);
+ this.instagram = new Instagram(this.config.getClientId());
}
+ /**
+ * If there are authorized tokens available, it sets a new token for the client and returns
+ * the client. If there are no available tokens, it simply returns the client that was
+ * initialized in the constructor with client id.
+ * @return the Instagram client, carrying the next available token when the token pool is not empty
+ */
@VisibleForTesting
protected Instagram getNextInstagramClient() {
- return new Instagram(this.tokenManger.getNextAvailableToken().getClientId());
+ if(this.tokenManger.numAvailableTokens() > 0) {
+ this.instagram.setAccessToken(this.tokenManger.getNextAvailableToken());
+ }
+ return this.instagram;
}
private void queueData(MediaFeed userFeed, String userId) {
@@ -92,7 +104,7 @@ public class InstagramRecentMediaCollector implements Runnable {
@Override
public void run() {
try {
- for (UserId user : this.config.getUsersInfo().getUserIds()) {
+ for (User user : this.config.getUsersInfo().getUsers()) {
collectMediaFeed(user);
}
} catch (Exception e) {
@@ -108,7 +120,7 @@ public class InstagramRecentMediaCollector implements Runnable {
* @throws Exception
*/
@VisibleForTesting
- protected void collectMediaFeed(UserId user) throws Exception {
+ protected void collectMediaFeed(User user) throws Exception {
Pagination pagination = null;
do {
int attempts = 0;
@@ -128,9 +140,16 @@ public class InstagramRecentMediaCollector implements Runnable {
feed = getNextInstagramClient().getRecentMediaNextPage(pagination);
}
} catch (Exception e) {
- handleException(e);
- if(e instanceof InstagramBadRequestException) {
+ if(e instanceof InstagramRateLimitException) {
+ LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
+ this.backOffStrategy.backOff();
+ } else if(e instanceof InstagramBadRequestException) {
+ LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
attempts = MAX_ATTEMPTS; //don't repeat bad requests.
+ ++this.consecutiveErrorCount;
+ } else {
+ LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
+ ++this.consecutiveErrorCount;
}
if(this.consecutiveErrorCount > Math.max(this.tokenManger.numAvailableTokens(), MAX_ATTEMPTS*2)) {
throw new Exception("InstagramCollector failed to successfully connect to instagram on "+this.consecutiveErrorCount+" attempts.");
@@ -156,16 +175,7 @@ public class InstagramRecentMediaCollector implements Runnable {
* @throws BackOffException
*/
protected void handleException(Exception e) throws BackOffException {
- if(e instanceof InstagramRateLimitException) {
- LOGGER.warn("Received rate limit exception from Instagram, backing off. : {}", e);
- this.backOffStrategy.backOff();
- } else if(e instanceof InstagramBadRequestException) {
- LOGGER.error("Received Bad Requests exception form Instagram: {}", e);
- ++this.consecutiveErrorCount;
- } else {
- LOGGER.error("Received Expection while attempting to poll Instagram: {}", e);
- ++this.consecutiveErrorCount;
- }
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
index e3e9399..6f975d0 100644
--- a/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
+++ b/streams-contrib/streams-provider-instagram/src/main/java/org/apache/streams/instagram/provider/InstagramRecentMediaProvider.java
@@ -16,18 +16,22 @@ package org.apache.streams.instagram.provider;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.instagram.*;
import org.apache.streams.util.SerializationUtil;
+import org.jinstagram.auth.model.Token;
import org.jinstagram.entity.users.feed.MediaFeedData;
import org.joda.time.DateTime;
import java.math.BigInteger;
+import java.util.Collection;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -45,12 +49,11 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
private AtomicBoolean isCompleted;
public InstagramRecentMediaProvider() {
- this(InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram")));
+ this.config = InstagramConfigurator.detectInstagramConfiguration(StreamsConfigurator.config.getConfig("instagram"));
}
public InstagramRecentMediaProvider(InstagramConfiguration config) {
- this.config = config;
- this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
+ this.config = (InstagramConfiguration) SerializationUtil.cloneBySerialization(config);
}
@Override
@@ -66,6 +69,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
*/
@VisibleForTesting
protected InstagramRecentMediaCollector getInstagramRecentMediaCollector() {
+ this.updateUserInfoList();
return new InstagramRecentMediaCollector(this.mediaFeedQueue, this.config);
}
@@ -101,6 +105,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
@Override
public void prepare(Object configurationObject) {
+ this.mediaFeedQueue = Queues.newConcurrentLinkedQueue();
this.isCompleted = new AtomicBoolean(false);
}
@@ -126,7 +131,7 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
}
DateTime defaultAfterDate = usersInfo.getDefaultAfterDate();
DateTime defaultBeforeDate = usersInfo.getDefaultBeforeDate();
- for(UserId user : usersInfo.getUserIds()) {
+ for(User user : usersInfo.getUsers()) {
if(defaultAfterDate != null && user.getAfterDate() == null) {
user.setAfterDate(defaultAfterDate);
}
@@ -136,5 +141,62 @@ public class InstagramRecentMediaProvider implements StreamsProvider {
}
}
+ /**
+ * Overrides the client id in the configuration.
+ * @param clientId client id to use
+ */
+ public void setInstagramClientId(String clientId) {
+ this.config.setClientId(clientId);
+ }
+
+ /**
+ * Overrides authorized user tokens in the configuration.
+ * @param tokenStrings
+ */
+ public void setAuthorizedUserTokens(Collection<String> tokenStrings) {
+ ensureUsersInfo(this.config).setAuthorizedTokens(Sets.newHashSet(tokenStrings));
+ }
+
+ /**
+ * Overrides the default before date in the configuration
+ * @param beforeDate
+ */
+ public void setDefaultBeforeDate(DateTime beforeDate) {
+ ensureUsersInfo(this.config).setDefaultBeforeDate(beforeDate);
+ }
+
+ /**
+ * Overrides the default after date in the configuration
+ * @param afterDate
+ */
+ public void setDefaultAfterDate(DateTime afterDate) {
+ ensureUsersInfo(this.config).setDefaultAfterDate(afterDate);
+ }
+
+ /**
+ * Overrides the users in the configuration and sets the after date for each user. A NULL DateTime implies
+ * pull data from as early as possible. If default before or after DateTimes are set, they will be applied to all
+ * NULL DateTimes.
+ * @param usersWithAfterDate instagram user id mapped to that user's after date
+ */
+ public void setUsersWithAfterDate(Map<String, DateTime> usersWithAfterDate) {
+ Set<User> users = Sets.newHashSet();
+ for(String userId : usersWithAfterDate.keySet()) {
+ User user = new User();
+ user.setUserId(userId);
+ user.setAfterDate(usersWithAfterDate.get(userId));
+ users.add(user);
+ }
+ ensureUsersInfo(this.config).setUsers(users);
+ }
+
+ private UsersInfo ensureUsersInfo(InstagramConfiguration config) {
+ UsersInfo usersInfo = config.getUsersInfo();
+ if(usersInfo == null) {
+ usersInfo = new UsersInfo();
+ config.setUsersInfo(usersInfo);
+ }
+ return usersInfo;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
index c662660..431efbc 100644
--- a/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
+++ b/streams-contrib/streams-provider-instagram/src/main/jsonschema/com/instagram/InstagramConfiguration.json
@@ -5,23 +5,27 @@
"javaType" : "org.apache.streams.instagram.InstagramConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
- "clientIds": {
- "type": "array",
- "uniqueItems": true,
- "items": {
- "type": "string"
- },
- "description": "Your Instagram Client Ids"
+ "clientId": {
+ "type": "string",
+ "description": "Your Instagram Client Id"
},
"usersInfo": {
"type": "object",
"properties": {
- "userIds": {
+ "authorizedTokens": {
+ "type": "array",
+ "uniqueItems": true,
+ "items": {
+ "type": "string"
+ },
+ "description": "Instagram tokens for authorized users of your client/app"
+ },
+ "users": {
"type": "array",
"uniqueItems": true,
"items": {
"type": "object",
- "$ref": "#/definitions/userInfo"
+ "$ref": "#/definitions/user"
},
"description": "List of user ids to gather data for. Type of data gathered depends on provider"
},
@@ -39,7 +43,7 @@
}
},
"definitions": {
- "userInfo": {
+ "user": {
"type": "object",
"properties": {
"userId": {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
new file mode 100644
index 0000000..fe48600
--- /dev/null
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/AbstractOauthToken.java
@@ -0,0 +1,19 @@
+package org.apache.streams.util.oauth.tokens;
+
+/**
+ *
+ */
+public abstract class AbstractOauthToken {
+
+ /**
+ * Must create equals method for all OauthTokens.
+ * @param o
+ * @return true if equal, and false otherwise
+ */
+ protected abstract boolean internalEquals(Object o);
+
+ @Override
+ public boolean equals(Object o) {
+ return this.internalEquals(o);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
deleted file mode 100644
index df264c5..0000000
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/OauthToken.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.streams.util.oauth.tokens;
-
-/**
- *
- */
-public abstract class OauthToken {
-
- /**
- * Must create equals method for all OauthTokens.
- * @param o
- * @return true if equal, and false otherwise
- */
- protected abstract boolean internalEquals(Object o);
-
- @Override
- public boolean equals(Object o) {
- return this.internalEquals(o);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
index d052da1..903e48d 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/SimpleTokenManager.java
@@ -1,13 +1,13 @@
package org.apache.streams.util.oauth.tokens.tokenmanager;
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
import java.util.Collection;
/**
* Manges access to oauth tokens. Allows a caller to add tokens to the token pool and receive an available token.
*/
-public interface SimpleTokenManager<T extends OauthToken> {
+public interface SimpleTokenManager<T> {
/**
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
----------------------------------------------------------------------
diff --git a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
index 20c8d20..34238b3 100644
--- a/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
+++ b/streams-util/src/main/java/org/apache/streams/util/oauth/tokens/tokenmanager/impl/BasicTokenManger.java
@@ -1,6 +1,6 @@
package org.apache.streams.util.oauth.tokens.tokenmanager.impl;
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
import java.util.ArrayList;
@@ -15,7 +15,7 @@ import java.util.Collection;
*
* The manager class is thread safe.
*/
-public class BasicTokenManger<T extends OauthToken> implements SimpleTokenManager<T>{
+public class BasicTokenManger<T> implements SimpleTokenManager<T>{
private ArrayList<T> availableTokens;
private int nextToken;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/ecd80062/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
----------------------------------------------------------------------
diff --git a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
index 903cc69..cd9ed18 100644
--- a/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
+++ b/streams-util/src/test/java/org/apache/streams/util/oauth/tokens/tokenmanager/TestBasicTokenManager.java
@@ -1,7 +1,7 @@
package org.apache.streams.util.oauth.tokens.tokenmanager;
-import org.apache.streams.util.oauth.tokens.OauthToken;
+import org.apache.streams.util.oauth.tokens.AbstractOauthToken;
import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
import org.junit.Test;
@@ -22,7 +22,7 @@ public class TestBasicTokenManager {
/**
* Simple token for testing purposes
*/
- private class TestToken extends OauthToken {
+ private class TestToken extends AbstractOauthToken {
private String s;