You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/25 20:25:13 UTC
[32/42] incubator-streams git commit: STREAMS-440: custom
checkstyle.xml, address compliance
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
index 6b4f28a..d1936d1 100644
--- a/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
+++ b/streams-contrib/streams-processor-regex/src/main/java/org/apache/streams/regex/RegexUtils.java
@@ -21,6 +21,7 @@ package org.apache.streams.regex;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,73 +31,73 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
- * Provides utilities for extracting matches from content
+ * Provides utilities for extracting matches from content.
*/
public class RegexUtils {
- private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap();
- private final static Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class);
+ private static final Map<String, Pattern> patternCache = Maps.newConcurrentMap();
+ private static final Logger LOGGER = LoggerFactory.getLogger(RegexUtils.class);
- private RegexUtils() {}
+ private RegexUtils() {}
- /**
- * Extracts matches of the given pattern in the content and returns them as a list.
- * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911.
- * @param content the complete content to find matches in.
- * @return a non-null list of matches.
- */
- public static Map<String, List<Integer>> extractMatches(String pattern, String content) {
- return getMatches(pattern, content, -1);
- }
+ /**
+ * Extracts matches of the given pattern in the content and returns them as a list.
+ * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911.
+ * @param content the complete content to find matches in.
+ * @return a non-null list of matches.
+ */
+ public static Map<String, List<Integer>> extractMatches(String pattern, String content) {
+ return getMatches(pattern, content, -1);
+ }
- /**
- * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list.
- * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911.
- * @param content the complete content to find matches in.
- * @return a non-null list of matches.
- */
- public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) {
- pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)";
- return getMatches(pattern, content, 2);
- }
+ /**
+ * Extracts matches of the given pattern that are bounded by separation characters and returns them as a list.
+ * @param pattern the pattern for the substring to match. For example, [0-9]* matches 911 in Emergency number is 911.
+ * @param content the complete content to find matches in.
+ * @return a non-null list of matches.
+ */
+ public static Map<String, List<Integer>> extractWordMatches(String pattern, String content) {
+ pattern = "(^|\\s)(" + pattern + ")([\\s!\\.;,?]|$)";
+ return getMatches(pattern, content, 2);
+ }
- protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) {
- try {
- Map<String, List<Integer>> matches = Maps.newHashMap();
- if(content == null) {
- return matches;
- }
+ protected static Map<String, List<Integer>> getMatches(String pattern, String content, int capture) {
+ try {
+ Map<String, List<Integer>> matches = Maps.newHashMap();
+ if (content == null) {
+ return matches;
+ }
- Matcher m = getPattern(pattern).matcher(content);
- while (m.find()) {
- String group = capture > 0 ? m.group(capture) : m.group();
- if (group != null && !group.equals("")) {
- List<Integer> indices;
- if (matches.containsKey(group)) {
- indices = matches.get(group);
- } else {
- indices = Lists.newArrayList();
- matches.put(group, indices);
- }
- indices.add(m.start());
- }
- }
- return matches;
- } catch (Throwable e) {
- LOGGER.error("Throwable process {}", e);
- e.printStackTrace();
- throw new RuntimeException(e);
+ Matcher matcher = getPattern(pattern).matcher(content);
+ while (matcher.find()) {
+ String group = capture > 0 ? matcher.group(capture) : matcher.group();
+ if (group != null && !group.equals("")) {
+ List<Integer> indices;
+ if (matches.containsKey(group)) {
+ indices = matches.get(group);
+ } else {
+ indices = Lists.newArrayList();
+ matches.put(group, indices);
+ }
+ indices.add(matcher.start());
}
+ }
+ return matches;
+ } catch (Throwable ex) {
+ LOGGER.error("Throwable process {}", ex);
+ ex.printStackTrace();
+ throw new RuntimeException(ex);
}
+ }
- private static Pattern getPattern(String pattern) {
- Pattern p;
- if (patternCache.containsKey(pattern)) {
- p = patternCache.get(pattern);
- } else {
- p = Pattern.compile(pattern);
- patternCache.put(pattern, p);
- }
- return p;
+ private static Pattern getPattern(String patternString) {
+ Pattern pattern;
+ if (patternCache.containsKey(patternString)) {
+ pattern = patternCache.get(patternString);
+ } else {
+ pattern = Pattern.compile(patternString);
+ patternCache.put(patternString, pattern);
}
+ return pattern;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
index 2de4aa8..6e17de8 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexHashtagExtractorTest.java
@@ -19,7 +19,6 @@
package org.apache.streams.regex;
-
import com.google.common.collect.Sets;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.extensions.ExtensionUtil;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
index c7778a8..66f7aa5 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexMentionExtractorTest.java
@@ -19,7 +19,6 @@
package org.apache.streams.regex;
-
import com.google.common.collect.Sets;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.extensions.ExtensionUtil;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
index 344bf98..d5d8d9b 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUrlExtractorTest.java
@@ -19,7 +19,6 @@
package org.apache.streams.regex;
-
import com.google.common.collect.Sets;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.pojo.json.Activity;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
index fc2b9f6..a156f3a 100644
--- a/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
+++ b/streams-contrib/streams-processor-regex/src/test/java/org/apache/streams/regex/RegexUtilsTest.java
@@ -19,7 +19,6 @@
package org.apache.streams.regex;
-
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -33,7 +32,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-
@RunWith(Parameterized.class)
public class RegexUtilsTest {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
index 7a6648a..1216c38 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPageActivitySerializer.java
@@ -18,15 +18,17 @@
package org.apache.streams.facebook.api;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.exceptions.ActivitySerializerException;
+import org.apache.streams.facebook.Page;
import org.apache.streams.facebook.serializer.FacebookActivityUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.facebook.Page;
import org.apache.streams.pojo.json.Activity;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.NotImplementedException;
+
import java.util.List;
/**
@@ -35,32 +37,32 @@ import java.util.List;
*/
public class FacebookPageActivitySerializer implements ActivitySerializer<Page> {
- public static ObjectMapper mapper;
- static {
- mapper = StreamsJacksonMapper.getInstance();
- }
+ public static ObjectMapper mapper;
+ static {
+ mapper = StreamsJacksonMapper.getInstance();
+ }
- @Override
- public String serializationFormat() {
- return "facebook_post_json_v1";
- }
+ @Override
+ public String serializationFormat() {
+ return "facebook_post_json_v1";
+ }
- @Override
- public Page serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException("Not currently supported by this deserializer");
- }
+ @Override
+ public Page serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException("Not currently supported by this deserializer");
+ }
- @Override
- public Activity deserialize(Page page) throws ActivitySerializerException {
- Activity activity = new Activity();
+ @Override
+ public Activity deserialize(Page page) throws ActivitySerializerException {
+ Activity activity = new Activity();
- FacebookActivityUtil.updateActivity(page, activity);
+ FacebookActivityUtil.updateActivity(page, activity);
- return activity;
- }
+ return activity;
+ }
- @Override
- public List<Activity> deserializeAll(List<Page> serializedList) {
- throw new NotImplementedException("Not currently supported by this deserializer");
- }
+ @Override
+ public List<Activity> deserializeAll(List<Page> serializedList) {
+ throw new NotImplementedException("Not currently supported by this deserializer");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
index 4326fb1..306fecc 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/api/FacebookPostActivitySerializer.java
@@ -15,16 +15,19 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.api;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.streams.data.ActivitySerializer;
import org.apache.streams.exceptions.ActivitySerializerException;
import org.apache.streams.facebook.Post;
import org.apache.streams.facebook.serializer.FacebookActivityUtil;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
@@ -33,37 +36,34 @@ import java.util.List;
public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> {
- public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
- public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
+ public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
+ public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
- public static final String PROVIDER_NAME = "Facebook";
+ public static final String PROVIDER_NAME = "Facebook";
- public static ObjectMapper mapper;
- static {
- mapper = StreamsJacksonMapper.getInstance();
- }
+ public static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Override
- public String serializationFormat() {
- return "facebook_post_json_v1";
- }
+ @Override
+ public String serializationFormat() {
+ return "facebook_post_json_v1";
+ }
- @Override
- public Post serialize(Activity deserialized) throws ActivitySerializerException {
- throw new NotImplementedException("Not currently supported by this deserializer");
- }
+ @Override
+ public Post serialize(Activity deserialized) throws ActivitySerializerException {
+ throw new NotImplementedException("Not currently supported by this deserializer");
+ }
- @Override
- public Activity deserialize(Post post) throws ActivitySerializerException {
- Activity activity = new Activity();
+ @Override
+ public Activity deserialize(Post post) throws ActivitySerializerException {
+ Activity activity = new Activity();
- FacebookActivityUtil.updateActivity(post, activity);
+ FacebookActivityUtil.updateActivity(post, activity);
- return activity;
- }
+ return activity;
+ }
- @Override
- public List<Activity> deserializeAll(List<Post> serializedList) {
- throw new NotImplementedException("Not currently supported by this deserializer");
- }
+ @Override
+ public List<Activity> deserializeAll(List<Post> serializedList) {
+ throw new NotImplementedException("Not currently supported by this deserializer");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
index 762b6c0..92cf333 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/processor/FacebookTypeConverter.java
@@ -18,12 +18,6 @@
package org.apache.streams.facebook.processor;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProcessor;
import org.apache.streams.exceptions.ActivitySerializerException;
@@ -34,6 +28,14 @@ import org.apache.streams.facebook.api.FacebookPostActivitySerializer;
import org.apache.streams.facebook.provider.FacebookEventClassifier;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,172 +43,187 @@ import java.io.IOException;
import java.util.List;
import java.util.Queue;
+/**
+ * FacebookTypeConverter converts facebook data to activity streams types.
+ */
public class FacebookTypeConverter implements StreamsProcessor {
- public final static String STREAMS_ID = "FacebookTypeConverter";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
-
- private ObjectMapper mapper;
-
- private Queue<StreamsDatum> inQueue;
- private Queue<StreamsDatum> outQueue;
-
- private Class inClass;
- private Class outClass;
-
- private FacebookPostActivitySerializer facebookPostActivitySerializer;
- private FacebookPageActivitySerializer facebookPageActivitySerializer;
-
- private int count = 0;
-
- public final static String TERMINATE = new String("TERMINATE");
-
- public FacebookTypeConverter(Class inClass, Class outClass) {
- this.inClass = inClass;
- this.outClass = outClass;
- }
-
- public Queue<StreamsDatum> getProcessorOutputQueue() {
- return outQueue;
- }
-
- public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
- inQueue = inputQueue;
+ public static final String STREAMS_ID = "FacebookTypeConverter";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
+
+ private ObjectMapper mapper;
+
+ private Queue<StreamsDatum> inQueue;
+ private Queue<StreamsDatum> outQueue;
+
+ private Class inClass;
+ private Class outClass;
+
+ private FacebookPostActivitySerializer facebookPostActivitySerializer;
+ private FacebookPageActivitySerializer facebookPageActivitySerializer;
+
+ private int count = 0;
+
+ public static final String TERMINATE = new String("TERMINATE");
+
+ public FacebookTypeConverter(Class inClass, Class outClass) {
+ this.inClass = inClass;
+ this.outClass = outClass;
+ }
+
+ public Queue<StreamsDatum> getProcessorOutputQueue() {
+ return outQueue;
+ }
+
+ public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
+ inQueue = inputQueue;
+ }
+
+ /**
+ * convert.
+ * @param event event
+ * @param inClass inClass
+ * @param outClass outClass
+ * @return Object
+ * @throws ActivitySerializerException ActivitySerializerException
+ * @throws JsonProcessingException JsonProcessingException
+ */
+ public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
+
+ Object result = null;
+
+ if ( outClass.equals( Activity.class )) {
+ LOGGER.debug("ACTIVITY");
+ if (inClass.equals(Post.class)) {
+ LOGGER.debug("POST");
+ result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
+ } else if (inClass.equals(Page.class)) {
+ LOGGER.debug("PAGE");
+ result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class));
+ }
+ } else if ( outClass.equals( Post.class )) {
+ LOGGER.debug("POST");
+ result = mapper.convertValue(event, Post.class);
+ } else if ( outClass.equals(Page.class)) {
+ LOGGER.debug("PAGE");
+ result = mapper.convertValue(event, Page.class);
+ } else if ( outClass.equals( ObjectNode.class )) {
+ LOGGER.debug("OBJECTNODE");
+ result = mapper.convertValue(event, ObjectNode.class);
}
- public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
- Object result = null;
-
- if( outClass.equals( Activity.class )) {
- LOGGER.debug("ACTIVITY");
- if(inClass.equals(Post.class)) {
- LOGGER.debug("POST");
- result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
- } else if(inClass.equals(Page.class)) {
- LOGGER.debug("PAGE");
- result = facebookPageActivitySerializer.deserialize(mapper.convertValue(event, Page.class));
- }
- } else if( outClass.equals( Post.class )) {
- LOGGER.debug("POST");
- result = mapper.convertValue(event, Post.class);
- } else if( outClass.equals(Page.class)) {
- LOGGER.debug("PAGE");
- result = mapper.convertValue(event, Page.class);
- } else if( outClass.equals( ObjectNode.class )) {
- LOGGER.debug("OBJECTNODE");
- result = mapper.convertValue(event, ObjectNode.class);
- }
-
- // no supported conversion were applied
- if( result != null ) {
- count ++;
- return result;
- }
-
- LOGGER.debug("CONVERT FAILED");
-
- return null;
+ // no supported conversion were applied
+ if ( result != null ) {
+ count ++;
+ return result;
}
- public boolean validate(Object document, Class klass) {
-
- // TODO
- return true;
+ LOGGER.debug("CONVERT FAILED");
+
+ return null;
+ }
+
+ // TODO: use standard validation
+ public boolean validate(Object document, Class klass) {
+ return true;
+ }
+
+ // TODO: replace with standard validation
+ public boolean isValidJSON(final String json) {
+ boolean valid = false;
+ try {
+ final JsonParser parser = new ObjectMapper().getJsonFactory()
+ .createJsonParser(json);
+ while (parser.nextToken() != null) {
+ }
+ valid = true;
+ } catch (JsonParseException jpe) {
+ LOGGER.warn("validate: {}", jpe);
+ } catch (IOException ioe) {
+ LOGGER.warn("validate: {}", ioe);
}
- public boolean isValidJSON(final String json) {
- boolean valid = false;
- try {
- final JsonParser parser = new ObjectMapper().getJsonFactory()
- .createJsonParser(json);
- while (parser.nextToken() != null) {
- }
- valid = true;
- } catch (JsonParseException jpe) {
- LOGGER.warn("validate: {}", jpe);
- } catch (IOException ioe) {
- LOGGER.warn("validate: {}", ioe);
- }
-
- return valid;
- }
+ return valid;
+ }
- @Override
- public String getId() {
- return STREAMS_ID;
- }
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
- @Override
- public List<StreamsDatum> process(StreamsDatum entry) {
+ @Override
+ public List<StreamsDatum> process(StreamsDatum entry) {
- StreamsDatum result = null;
+ StreamsDatum result = null;
- try {
- Object item = entry.getDocument();
- ObjectNode node;
+ try {
+ Object item = entry.getDocument();
+ ObjectNode node;
- LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
+ LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
- if( item instanceof String ) {
+ if ( item instanceof String ) {
- // if the target is string, just pass-through
- if( String.class.equals(outClass)) {
- result = entry;
- }
- else {
- // first check for valid json
- node = (ObjectNode)mapper.readTree((String)item);
+ // if the target is string, just pass-through
+ if ( String.class.equals(outClass)) {
+ result = entry;
+ } else {
+ // first check for valid json
+ node = (ObjectNode)mapper.readTree((String)item);
- // since data is coming from outside provider, we don't know what type the events are
- // for now we'll assume post
- Class inClass = FacebookEventClassifier.detectClass((String) item);
+ // since data is coming from outside provider, we don't know what type the events are
+ // for now we'll assume post
+ Class inClass = FacebookEventClassifier.detectClass((String) item);
- Object out = convert(node, inClass, outClass);
+ Object out = convert(node, inClass, outClass);
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
- }
+ if ( out != null && validate(out, outClass)) {
+ result = new StreamsDatum(out);
+ }
+ }
- } else if( item instanceof ObjectNode) {
+ } else if ( item instanceof ObjectNode) {
- // first check for valid json
- node = (ObjectNode)mapper.valueToTree(item);
+ // first check for valid json
+ node = (ObjectNode)mapper.valueToTree(item);
- Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item));
+ Class inClass = FacebookEventClassifier.detectClass(mapper.writeValueAsString(item));
- Object out = convert(node, inClass, outClass);
+ Object out = convert(node, inClass, outClass);
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
- } else if(item instanceof Post || item instanceof Page) {
- Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass);
+ if ( out != null && validate(out, outClass)) {
+ result = new StreamsDatum(out);
+ }
+ } else if (item instanceof Post || item instanceof Page) {
+ Object out = convert(mapper.convertValue(item, ObjectNode.class), inClass, outClass);
- if( out != null && validate(out, outClass))
- result = new StreamsDatum(out);
- }
- } catch (Exception e) {
- LOGGER.error("Exception switching types : {}", e);
- if(e instanceof InterruptedException) {
- Thread.currentThread().interrupt();
- }
+ if ( out != null && validate(out, outClass)) {
+ result = new StreamsDatum(out);
}
+ }
+ } catch (Exception ex) {
+ LOGGER.error("Exception switching types : {}", ex);
+ if (ex instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ }
- if( result != null )
- return Lists.newArrayList(result);
- else
- return Lists.newArrayList();
+ if ( result != null ) {
+ return Lists.newArrayList(result);
+ } else {
+ return Lists.newArrayList();
}
+ }
- @Override
- public void prepare(Object o) {
- mapper = StreamsJacksonMapper.getInstance();
+ @Override
+ public void prepare(Object configurationObject) {
+ mapper = StreamsJacksonMapper.getInstance();
- facebookPageActivitySerializer = new FacebookPageActivitySerializer();
- facebookPostActivitySerializer = new FacebookPostActivitySerializer();
- }
+ facebookPageActivitySerializer = new FacebookPageActivitySerializer();
+ facebookPostActivitySerializer = new FacebookPostActivitySerializer();
+ }
- @Override
- public void cleanUp() {}
+ @Override
+ public void cleanUp() {}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
index 33ee9dc..617bfab 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookDataCollector.java
@@ -15,130 +15,139 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Strings;
-import facebook4j.Facebook;
-import facebook4j.FacebookFactory;
-import facebook4j.conf.ConfigurationBuilder;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.facebook.FacebookConfiguration;
import org.apache.streams.facebook.IdConfig;
import org.apache.streams.util.api.requests.backoff.BackOffStrategy;
import org.apache.streams.util.api.requests.backoff.impl.ExponentialBackOffStrategy;
import org.apache.streams.util.oauth.tokens.tokenmanager.SimpleTokenManager;
-import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManger;
+import org.apache.streams.util.oauth.tokens.tokenmanager.impl.BasicTokenManager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import facebook4j.Facebook;
+import facebook4j.FacebookFactory;
+import facebook4j.conf.ConfigurationBuilder;
+
/**
* Abstract data collector for Facebook. Iterates over ids and queues data to be output
* by a {@link org.apache.streams.core.StreamsProvider}
*/
public abstract class FacebookDataCollector implements Runnable {
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class);
- private static final String READ_ONLY = "read_streams";
-
- @VisibleForTesting
- protected AtomicBoolean isComplete;
- protected BackOffStrategy backOff;
-
- private FacebookConfiguration config;
- private BlockingQueue<StreamsDatum> queue;
- private SimpleTokenManager<String> authTokens;
-
-
- public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
- this.config = config;
- this.queue = queue;
- this.isComplete = new AtomicBoolean(false);
- this.backOff = new ExponentialBackOffStrategy(5);
- this.authTokens = new BasicTokenManger<String>();
- if(config.getUserAccessTokens() != null) {
- for(String token : config.getUserAccessTokens()) {
- this.authTokens.addTokenToPool(token);
- }
- }
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookDataCollector.class);
+ private static final String READ_ONLY = "read_streams";
- /**
- * Returns true when the collector has finished querying facebook and has queued all data
- * for the provider
- * @return
- */
- public boolean isComplete(){
- return this.isComplete.get();
- }
+ @VisibleForTesting
+ protected AtomicBoolean isComplete;
+ protected BackOffStrategy backOff;
+
+ private FacebookConfiguration config;
+ private BlockingQueue<StreamsDatum> queue;
+ private SimpleTokenManager<String> authTokens;
- /**
- * Queues facebook data
- * @param data
- * @param id
- */
- protected void outputData(Object data, String id) {
- try {
- this.queue.put(new StreamsDatum(data, id));
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
+ /**
+ * FacebookDataCollector constructor.
+ * @param config config
+ * @param queue queue
+ */
+ public FacebookDataCollector(FacebookConfiguration config, BlockingQueue<StreamsDatum> queue) {
+ this.config = config;
+ this.queue = queue;
+ this.isComplete = new AtomicBoolean(false);
+ this.backOff = new ExponentialBackOffStrategy(5);
+ this.authTokens = new BasicTokenManager<String>();
+ if (config.getUserAccessTokens() != null) {
+ for (String token : config.getUserAccessTokens()) {
+ this.authTokens.addTokenToPool(token);
+ }
}
+ }
- /**
- * Gets a Facebook client. If multiple authenticated users for this app are available
- * it will rotate through the users oauth credentials
- * @return
- */
- protected Facebook getNextFacebookClient() {
- ConfigurationBuilder cb = new ConfigurationBuilder();
- cb.setDebugEnabled(true);
- cb.setOAuthPermissions(READ_ONLY);
- cb.setOAuthAppId(this.config.getOauth().getAppId());
- cb.setOAuthAppSecret(this.config.getOauth().getAppSecret());
- if(this.authTokens.numAvailableTokens() > 0)
- cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
- else {
- cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
- LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
- }
- cb.setJSONStoreEnabled(true);
- if(!Strings.isNullOrEmpty(config.getVersion()))
- cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/");
- LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
- LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret());
- FacebookFactory ff = new FacebookFactory(cb.build());
- return ff.getInstance();
+ /**
+ * Returns true when the collector has finished querying facebook and has queued all data
+ * for the provider.
+ * @return isComplete
+ */
+ public boolean isComplete() {
+ return this.isComplete.get();
+ }
+
+ /**
+ * Queues facebook data.
+ * @param data data
+ * @param id id
+ */
+ protected void outputData(Object data, String id) {
+ try {
+ this.queue.put(new StreamsDatum(data, id));
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
}
+ }
- /**
- * Queries facebook and queues the resulting data
- * @param id
- * @throws Exception
- */
- protected abstract void getData(IdConfig id) throws Exception;
-
-
- @Override
- public void run() {
- for( IdConfig id : this.config.getIds()) {
- try {
- getData(id);
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOGGER.error("Caught Exception while trying to poll data for page : {}", id);
- LOGGER.error("Exception while getting page feed data: {}", e);
- }
- }
- this.isComplete.set(true);
+ /**
+ * Gets a Facebook client. If multiple authenticated users for this app are available
+ * it will rotate through the users oauth credentials
+ * @return client
+ */
+ protected Facebook getNextFacebookClient() {
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.setDebugEnabled(true);
+ cb.setOAuthPermissions(READ_ONLY);
+ cb.setOAuthAppId(this.config.getOauth().getAppId());
+ cb.setOAuthAppSecret(this.config.getOauth().getAppSecret());
+ if (this.authTokens.numAvailableTokens() > 0) {
+ cb.setOAuthAccessToken(this.authTokens.getNextAvailableToken());
+ } else {
+ cb.setOAuthAccessToken(this.config.getOauth().getAppAccessToken());
+ LOGGER.debug("appAccessToken : {}", this.config.getOauth().getAppAccessToken());
+ }
+ cb.setJSONStoreEnabled(true);
+ if (!Strings.isNullOrEmpty(config.getVersion())) {
+ cb.setRestBaseURL("https://graph.facebook.com/" + config.getVersion() + "/");
}
+ LOGGER.debug("appId : {}", this.config.getOauth().getAppId());
+ LOGGER.debug("appSecret: {}", this.config.getOauth().getAppSecret());
+ FacebookFactory ff = new FacebookFactory(cb.build());
+ return ff.getInstance();
+ }
+
+ /**
+ * Queries facebook and queues the resulting data.
+ * @param id id
+ * @throws Exception Exception
+ */
+ protected abstract void getData(IdConfig id) throws Exception;
- @VisibleForTesting
- protected BlockingQueue<StreamsDatum> getQueue() {
- return queue;
+
+ @Override
+ public void run() {
+ for ( IdConfig id : this.config.getIds()) {
+ try {
+ getData(id);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ } catch (Exception ex) {
+ LOGGER.error("Caught Exception while trying to poll data for page : {}", id);
+ LOGGER.error("Exception while getting page feed data: {}", ex);
+ }
}
+ this.isComplete.set(true);
+ }
+
+ @VisibleForTesting
+ protected BlockingQueue<StreamsDatum> getQueue() {
+ return queue;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
index 16e2a25..47c2afb 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookEventClassifier.java
@@ -18,40 +18,50 @@
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-
-import java.io.IOException;
-
import org.apache.streams.facebook.Page;
import org.apache.streams.facebook.Post;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
+/**
+ * FacebookEventClassifier classifies facebook events.
+ */
public class FacebookEventClassifier {
- private final static Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookEventClassifier.class);
- public static Class detectClass( String json ) {
+ /**
+ * detectClass from json string.
+ * @param json json string
+ * @return detected Class
+ */
+ public static Class detectClass( String json ) {
- Preconditions.checkNotNull(json);
- Preconditions.checkArgument(StringUtils.isNotEmpty(json));
+ Preconditions.checkNotNull(json);
+ Preconditions.checkArgument(StringUtils.isNotEmpty(json));
- ObjectNode objectNode;
- try {
- objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json);
- } catch (IOException e) {
- LOGGER.error("Exception while trying to detect class: {}", e.getMessage());
- return null;
- }
+ ObjectNode objectNode;
+ try {
+ objectNode = (ObjectNode) StreamsJacksonMapper.getInstance().readTree(json);
+ } catch (IOException ex) {
+ LOGGER.error("Exception while trying to detect class: {}", ex.getMessage());
+ return null;
+ }
- if( objectNode.findValue("about") != null)
- return Page.class;
- else if( objectNode.findValue("statusType") != null )
- return Post.class;
- else
- return Post.class;
+ if ( objectNode.findValue("about") != null) {
+ return Page.class;
+ } else if ( objectNode.findValue("statusType") != null ) {
+ return Post.class;
+ } else {
+ return Post.class;
}
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
index 231ee4f..3253479 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendFeedProvider.java
@@ -18,15 +18,6 @@
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -36,253 +27,295 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.facebook.FacebookUserstreamConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Iterator;
import java.util.Queue;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
-{
-
- public static final String STREAMS_ID = "FacebookFriendFeedProvider";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
-
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
- private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
- private FacebookUserstreamConfiguration configuration;
+public class FacebookFriendFeedProvider implements StreamsProvider, Serializable {
- private Class klass;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+ public static final String STREAMS_ID = "FacebookFriendFeedProvider";
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
- public FacebookUserstreamConfiguration getConfig() { return configuration; }
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; }
+ private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,
user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
+ private FacebookUserstreamConfiguration configuration;
- protected Iterator<String[]> idsBatches;
+ private Class klass;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected ExecutorService executor;
+ protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
- protected DateTime start;
- protected DateTime end;
+ public FacebookUserstreamConfiguration getConfig() {
+ return configuration;
+ }
- protected final AtomicBoolean running = new AtomicBoolean();
+ public void setConfig(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
- private DatumStatusCounter countersCurrent = new DatumStatusCounter();
- private DatumStatusCounter countersTotal = new DatumStatusCounter();
+ protected Iterator<String[]> idsBatches;
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ protected ExecutorService executor;
- public FacebookFriendFeedProvider() {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration configuration;
- try {
- configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- }
+ protected DateTime start;
+ protected DateTime end;
- public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
- this.configuration = config;
- }
+ protected final AtomicBoolean running = new AtomicBoolean();
- public FacebookFriendFeedProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration configuration;
- try {
- configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- this.klass = klass;
- }
+ private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private DatumStatusCounter countersTotal = new DatumStatusCounter();
- public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
- this.configuration = config;
- this.klass = klass;
- }
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+ return new ThreadPoolExecutor(numThreads, numThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
+ /**
+ * FacebookFriendFeedProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+ */
+ public FacebookFriendFeedProvider() {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration configuration;
+ try {
+ configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- @Override
- public String getId() {
- return STREAMS_ID;
+ }
+
+ /**
+ * FacebookFriendFeedProvider constructor - uses supplied FacebookUserInformationConfiguration.
+ */
+ public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
+
+ /**
+ * FacebookFriendFeedProvider constructor - output supplied Class.
+ * @param klass Class
+ */
+ public FacebookFriendFeedProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration configuration;
+ try {
+ configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- @Override
- public void startStream() {
- shutdownAndAwaitTermination(executor);
- running.set(true);
+ this.klass = klass;
+ }
+
+ public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
+ this.configuration = config;
+ this.klass = klass;
+ }
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ shutdownAndAwaitTermination(executor);
+ running.set(true);
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ StreamsResultSet current;
+
+ synchronized (FacebookUserstreamProvider.class) {
+ current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
+ current.setCounter(new DatumStatusCounter());
+ current.getCounter().add(countersCurrent);
+ countersTotal.add(countersCurrent);
+ countersCurrent = new DatumStatusCounter();
+ providerQueue.clear();
}
- public StreamsResultSet readCurrent() {
-
- StreamsResultSet current;
-
- synchronized (FacebookUserstreamProvider.class) {
- current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
- current.setCounter(new DatumStatusCounter());
- current.getCounter().add(countersCurrent);
- countersTotal.add(countersCurrent);
- countersCurrent = new DatumStatusCounter();
- providerQueue.clear();
+ return current;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ this.start = start;
+ this.end = end;
+ readCurrent();
+ StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+ return result;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
}
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
- return current;
+ @Override
+ public void prepare(Object configurationObject) {
- }
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(this.klass);
+ Preconditions.checkNotNull(configuration.getOauth().getAppId());
+ Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+ Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- this.start = start;
- this.end = end;
- readCurrent();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
+ Facebook client = getFacebookClient();
- @Override
- public boolean isRunning() {
- return running.get();
- }
+ try {
+ ResponseList<Friend> friendResponseList = client.friends().getFriends();
+ Paging<Friend> friendPaging;
+ do {
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
+ for ( Friend friend : friendResponseList ) {
+ executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
}
+ friendPaging = friendResponseList.getPaging();
+ friendResponseList = client.fetchNext(friendPaging);
+ }
+ while ( friendPaging != null
+ &&
+ friendResponseList != null );
+ } catch (FacebookException ex) {
+ ex.printStackTrace();
}
- @Override
- public void prepare(Object o) {
+ }
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ protected Facebook getFacebookClient() {
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(this.klass);
- Preconditions.checkNotNull(configuration.getOauth().getAppId());
- Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
- Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.setDebugEnabled(true)
+ .setOAuthAppId(configuration.getOauth().getAppId())
+ .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+ .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+ .setOAuthPermissions(ALL_PERMISSIONS)
+ .setJSONStoreEnabled(true)
+ .setClientVersion("v1.0");
- Facebook client = getFacebookClient();
+ FacebookFactory ff = new FacebookFactory(cb.build());
+ Facebook facebook = ff.getInstance();
- try {
- ResponseList<Friend> friendResponseList = client.friends().getFriends();
- Paging<Friend> friendPaging;
- do {
+ return facebook;
+ }
- for( Friend friend : friendResponseList ) {
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
- executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
- }
- friendPaging = friendResponseList.getPaging();
- friendResponseList = client.fetchNext(friendPaging);
- } while( friendPaging != null &&
- friendResponseList != null );
- } catch (FacebookException e) {
- e.printStackTrace();
- }
+ private class FacebookFriendFeedTask implements Runnable {
- }
+ FacebookFriendFeedProvider provider;
+ Facebook client;
+ String id;
- protected Facebook getFacebookClient()
- {
- ConfigurationBuilder cb = new ConfigurationBuilder();
- cb.setDebugEnabled(true)
- .setOAuthAppId(configuration.getOauth().getAppId())
- .setOAuthAppSecret(configuration.getOauth().getAppSecret())
- .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
- .setOAuthPermissions(ALL_PERMISSIONS)
- .setJSONStoreEnabled(true)
- .setClientVersion("v1.0");
-
- FacebookFactory ff = new FacebookFactory(cb.build());
- Facebook facebook = ff.getInstance();
-
- return facebook;
+ public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
+ this.provider = provider;
+ this.id = id;
}
@Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
-
- private class FacebookFriendFeedTask implements Runnable {
-
- FacebookFriendFeedProvider provider;
- Facebook client;
- String id;
-
- public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
- this.provider = provider;
- this.id = id;
+ public void run() {
+ client = provider.getFacebookClient();
+ try {
+ ResponseList<Post> postResponseList = client.getFeed(id);
+ Paging<Post> postPaging;
+ do {
+
+ for (Post item : postResponseList) {
+ String json = DataObjectFactory.getRawJSON(item);
+ org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+ countersCurrent.incrementAttempt();
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+ postPaging = postResponseList.getPaging();
+ postResponseList = client.fetchNext(postPaging);
}
+ while ( postPaging != null
+ &&
+ postResponseList != null );
- @Override
- public void run() {
- client = provider.getFacebookClient();
- try {
- ResponseList<Post> postResponseList = client.getFeed(id);
- Paging<Post> postPaging;
- do {
-
- for (Post item : postResponseList) {
- String json = DataObjectFactory.getRawJSON(item);
- org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
- countersCurrent.incrementAttempt();
- } finally {
- lock.readLock().unlock();
- }
- }
- postPaging = postResponseList.getPaging();
- postResponseList = client.fetchNext(postPaging);
- } while( postPaging != null &&
- postResponseList != null );
-
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
}
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
index cda868e..50ac64a 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/FacebookFriendUpdatesProvider.java
@@ -18,15 +18,6 @@
package org.apache.streams.facebook.provider;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
@@ -36,10 +27,18 @@ import org.apache.streams.facebook.FacebookUserInformationConfiguration;
import org.apache.streams.facebook.FacebookUserstreamConfiguration;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.commons.lang.NotImplementedException;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.lang.NotImplementedException;
import java.io.IOException;
import java.io.Serializable;
@@ -47,246 +46,290 @@ import java.math.BigInteger;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
-import java.util.concurrent.*;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable
-{
-
- public static final String STREAMS_ID = "FacebookFriendPostsProvider";
+import facebook4j.Facebook;
+import facebook4j.FacebookException;
+import facebook4j.FacebookFactory;
+import facebook4j.Friend;
+import facebook4j.Paging;
+import facebook4j.Post;
+import facebook4j.ResponseList;
+import facebook4j.conf.ConfigurationBuilder;
+import facebook4j.json.DataObjectFactory;
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
+/**
+ * FacebookFriendUpdatesProvider provides updates from friend feed.
+ */
+public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable {
- private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ public static final String STREAMS_ID = "FacebookFriendPostsProvider";
- private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
- private FacebookUserstreamConfiguration configuration;
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
- private Class klass;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
+ private static final String ALL_PERMISSIONS =
+ "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activities,user_birthday,user_education_history,user_
events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
- public FacebookUserstreamConfiguration getConfig() { return configuration; }
+ private FacebookUserstreamConfiguration configuration;
- public void setConfig(FacebookUserstreamConfiguration config) { this.configuration = config; }
+ private Class klass;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- protected Iterator<String[]> idsBatches;
+ protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
- protected ExecutorService executor;
+ public FacebookUserstreamConfiguration getConfig() {
+ return configuration;
+ }
- protected DateTime start;
- protected DateTime end;
+ public void setConfig(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
- protected final AtomicBoolean running = new AtomicBoolean();
+ protected Iterator<String[]> idsBatches;
- private DatumStatusCounter countersCurrent = new DatumStatusCounter();
- private DatumStatusCounter countersTotal = new DatumStatusCounter();
+ protected ExecutorService executor;
- private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
- return new ThreadPoolExecutor(nThreads, nThreads,
- 5000L, TimeUnit.MILLISECONDS,
- new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
- }
+ protected DateTime start;
+ protected DateTime end;
- public FacebookFriendUpdatesProvider() {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration configuration;
- try {
- configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- }
+ protected final AtomicBoolean running = new AtomicBoolean();
- public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
- this.configuration = config;
- }
+ private DatumStatusCounter countersCurrent = new DatumStatusCounter();
+ private DatumStatusCounter countersTotal = new DatumStatusCounter();
- public FacebookFriendUpdatesProvider(Class klass) {
- Config config = StreamsConfigurator.config.getConfig("facebook");
- FacebookUserInformationConfiguration configuration;
- try {
- configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
- } catch (IOException e) {
- e.printStackTrace();
- return;
- }
- this.klass = klass;
- }
+ // TODO: factor this out.
+ private static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
+ return new ThreadPoolExecutor(numThreads, numThreads,
+ 5000L, TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
- public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
- this.configuration = config;
- this.klass = klass;
+ /**
+ * FacebookFriendUpdatesProvider constructor - resolves FacebookUserInformationConfiguration from JVM 'facebook'.
+ */
+ public FacebookFriendUpdatesProvider() {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration configuration;
+ try {
+ configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- public Queue<StreamsDatum> getProviderQueue() {
- return this.providerQueue;
+ }
+
+ /**
+ * FacebookFriendUpdatesProvider constructor - uses supplied FacebookUserstreamConfiguration.
+ */
+ public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
+ this.configuration = config;
+ }
+
+ /**
+ * FacebookFriendUpdatesProvider constructor.
+ * uses supplied output Class.
+ * resolves FacebookUserInformationConfiguration from JVM 'facebook.
+ */
+ public FacebookFriendUpdatesProvider(Class klass) {
+ Config config = StreamsConfigurator.config.getConfig("facebook");
+ FacebookUserInformationConfiguration configuration;
+ try {
+ configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
+ } catch (IOException ex) {
+ ex.printStackTrace();
+ return;
}
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- @Override
- public void startStream() {
- running.set(true);
+ this.klass = klass;
+ }
+
+ /**
+ * FacebookFriendUpdatesProvider constructor.
+ * uses supplied FacebookUserstreamConfiguration.
+ * uses supplied output Class.
+ */
+ public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
+ this.configuration = config;
+ this.klass = klass;
+ }
+
+ public Queue<StreamsDatum> getProviderQueue() {
+ return this.providerQueue;
+ }
+
+ @Override
+ public String getId() {
+ return STREAMS_ID;
+ }
+
+ @Override
+ public void startStream() {
+ running.set(true);
+ }
+
+ @Override
+ public StreamsResultSet readCurrent() {
+
+ Preconditions.checkArgument(idsBatches.hasNext());
+
+ LOGGER.info("readCurrent");
+
+ // return stuff
+
+ LOGGER.info("Finished. Cleaning up...");
+
+ LOGGER.info("Providing {} docs", providerQueue.size());
+
+ StreamsResultSet result = new StreamsResultSet(providerQueue);
+ running.set(false);
+
+ LOGGER.info("Exiting");
+
+ return result;
+
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ LOGGER.debug("{} readNew", STREAMS_ID);
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ LOGGER.debug("{} readRange", STREAMS_ID);
+ this.start = start;
+ this.end = end;
+ readCurrent();
+ StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
+ return result;
+ }
+
+ @Override
+ public boolean isRunning() {
+ return running.get();
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ System.err.println("Pool did not terminate");
+ }
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
}
+ }
- public StreamsResultSet readCurrent() {
-
- Preconditions.checkArgument(idsBatches.hasNext());
-
- LOGGER.info("readCurrent");
-
- // return stuff
+ @Override
+ public void prepare(Object configurationObject) {
- LOGGER.info("Finished. Cleaning up...");
+ executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
- LOGGER.info("Providing {} docs", providerQueue.size());
+ Preconditions.checkNotNull(providerQueue);
+ Preconditions.checkNotNull(this.klass);
+ Preconditions.checkNotNull(configuration.getOauth().getAppId());
+ Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
+ Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
- StreamsResultSet result = new StreamsResultSet(providerQueue);
- running.set(false);
+ Facebook client = getFacebookClient();
- LOGGER.info("Exiting");
-
- return result;
+ try {
+ ResponseList<Friend> friendResponseList = client.friends().getFriends();
+ Paging<Friend> friendPaging;
+ do {
+ for ( Friend friend : friendResponseList ) {
+ // client.rawAPI().callPostAPI();
+ // add a subscription
+ }
+ friendPaging = friendResponseList.getPaging();
+ friendResponseList = client.fetchNext(friendPaging);
+ }
+ while ( friendPaging != null
+ &&
+ friendResponseList != null );
+ } catch (FacebookException ex) {
+ ex.printStackTrace();
}
- public StreamsResultSet readNew(BigInteger sequence) {
- LOGGER.debug("{} readNew", STREAMS_ID);
- throw new NotImplementedException();
- }
+ }
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- LOGGER.debug("{} readRange", STREAMS_ID);
- this.start = start;
- this.end = end;
- readCurrent();
- StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
- return result;
- }
+ protected Facebook getFacebookClient() {
- @Override
- public boolean isRunning() {
- return running.get();
- }
+ ConfigurationBuilder cb = new ConfigurationBuilder();
+ cb.setDebugEnabled(true)
+ .setOAuthAppId(configuration.getOauth().getAppId())
+ .setOAuthAppSecret(configuration.getOauth().getAppSecret())
+ .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
+ .setOAuthPermissions(ALL_PERMISSIONS)
+ .setJSONStoreEnabled(true)
+ .setClientVersion("v1.0");
- void shutdownAndAwaitTermination(ExecutorService pool) {
- pool.shutdown(); // Disable new tasks from being submitted
- try {
- // Wait a while for existing tasks to terminate
- if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
- pool.shutdownNow(); // Cancel currently executing tasks
- // Wait a while for tasks to respond to being cancelled
- if (!pool.awaitTermination(10, TimeUnit.SECONDS))
- System.err.println("Pool did not terminate");
- }
- } catch (InterruptedException ie) {
- // (Re-)Cancel if current thread also interrupted
- pool.shutdownNow();
- // Preserve interrupt status
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public void prepare(Object o) {
+ FacebookFactory ff = new FacebookFactory(cb.build());
+ Facebook facebook = ff.getInstance();
- executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+ return facebook;
+ }
- Preconditions.checkNotNull(providerQueue);
- Preconditions.checkNotNull(this.klass);
- Preconditions.checkNotNull(configuration.getOauth().getAppId());
- Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
- Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
+ @Override
+ public void cleanUp() {
+ shutdownAndAwaitTermination(executor);
+ }
- Facebook client = getFacebookClient();
+ private class FacebookFeedPollingTask implements Runnable {
- try {
- ResponseList<Friend> friendResponseList = client.friends().getFriends();
- Paging<Friend> friendPaging;
- do {
-
- for( Friend friend : friendResponseList ) {
-
- //client.rawAPI().callPostAPI();
- // add a subscription
- }
- friendPaging = friendResponseList.getPaging();
- friendResponseList = client.fetchNext(friendPaging);
- } while( friendPaging != null &&
- friendResponseList != null );
- } catch (FacebookException e) {
- e.printStackTrace();
- }
+ FacebookUserstreamProvider provider;
+ Facebook client;
- }
+ private Set<Post> priorPollResult = Sets.newHashSet();
- protected Facebook getFacebookClient()
- {
- ConfigurationBuilder cb = new ConfigurationBuilder();
- cb.setDebugEnabled(true)
- .setOAuthAppId(configuration.getOauth().getAppId())
- .setOAuthAppSecret(configuration.getOauth().getAppSecret())
- .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
- .setOAuthPermissions(ALL_PERMISSIONS)
- .setJSONStoreEnabled(true)
- .setClientVersion("v1.0");
-
- FacebookFactory ff = new FacebookFactory(cb.build());
- Facebook facebook = ff.getInstance();
-
- return facebook;
+ public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
+ provider = facebookUserstreamProvider;
}
@Override
- public void cleanUp() {
- shutdownAndAwaitTermination(executor);
- }
-
- private class FacebookFeedPollingTask implements Runnable {
-
- FacebookUserstreamProvider provider;
- Facebook client;
-
- private Set<Post> priorPollResult = Sets.newHashSet();
-
- public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
- provider = facebookUserstreamProvider;
- }
-
- @Override
- public void run() {
- client = provider.getFacebookClient();
- while (provider.isRunning()) {
- try {
- ResponseList<Post> postResponseList = client.getHome();
- Set<Post> update = Sets.newHashSet(postResponseList);
- Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
- Set<Post> entrySet = Sets.difference(update, repeats);
- for (Post item : entrySet) {
- String json = DataObjectFactory.getRawJSON(item);
- org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
- try {
- lock.readLock().lock();
- ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
- countersCurrent.incrementAttempt();
- } finally {
- lock.readLock().unlock();
- }
- }
- priorPollResult = update;
- Thread.sleep(configuration.getPollIntervalMillis());
- } catch (Exception e) {
- e.printStackTrace();
- }
+ public void run() {
+ client = provider.getFacebookClient();
+ while (provider.isRunning()) {
+ try {
+ ResponseList<Post> postResponseList = client.getHome();
+ Set<Post> update = Sets.newHashSet(postResponseList);
+ Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
+ Set<Post> entrySet = Sets.difference(update, repeats);
+ for (Post item : entrySet) {
+ String json = DataObjectFactory.getRawJSON(item);
+ org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
+ try {
+ lock.readLock().lock();
+ ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
+ countersCurrent.incrementAttempt();
+ } finally {
+ lock.readLock().unlock();
}
+ }
+ priorPollResult = update;
+ Thread.sleep(configuration.getPollIntervalMillis());
+ } catch (Exception ex) {
+ ex.printStackTrace();
}
+ }
}
+ }
}