You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/16 01:46:35 UTC
[1/2] incubator-streams git commit: STREAMS-456: Facebook page
provider does not include many fields
Repository: incubator-streams
Updated Branches:
refs/heads/master 7d9b887ed -> f597411a0
STREAMS-456: Facebook page provider does not include many fields
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1799fbfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1799fbfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1799fbfc
Branch: refs/heads/master
Commit: 1799fbfc7f64db653a97075e6ba8990e7c97eaa2
Parents: 7d9b887
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Nov 14 23:26:18 2016 +0100
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Nov 14 23:26:18 2016 +0100
----------------------------------------------------------------------
.../streams-provider-facebook/pom.xml | 2 +-
.../page/FacebookPageDataCollector.java | 91 +++++-----
.../provider/page/FacebookPageProvider.java | 165 +++++++++++--------
.../FacebookPageProviderConfiguration.json | 20 +++
.../src/main/resources/reference.conf | 119 ++++++++++++-
.../facebook/test/data/FacebookPostSerDeIT.java | 90 +++++-----
6 files changed, 328 insertions(+), 159 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml
index 7db5fde..3d21859 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -87,7 +87,7 @@
<dependency>
<groupId>org.facebook4j</groupId>
<artifactId>facebook4j-core</artifactId>
- <version>2.4.7</version>
+ <version>2.4.8</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index b78422b..ca68a7e 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -15,69 +15,80 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider.page;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import facebook4j.*;
-import facebook4j.json.DataObjectFactory;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.FacebookPageProviderConfiguration;
import org.apache.streams.facebook.IdConfig;
import org.apache.streams.facebook.provider.FacebookDataCollector;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
+import facebook4j.FacebookException;
+import facebook4j.Page;
+import facebook4j.Reading;
+import facebook4j.json.DataObjectFactory;
+
/**
* Collects the page data from public Facebook pages
*/
public class FacebookPageDataCollector extends FacebookDataCollector {
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageDataCollector.class);
- private static final int MAX_ATTEMPTS = 5;
- private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageDataCollector.class);
+ private static final int MAX_ATTEMPTS = 5;
+ private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
- public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
- super(configuration, queue);
- }
+ private String fields;
- @Override
- protected void getData(IdConfig id) throws Exception {
- Page responsePage = getPage(id.getId());
- backOff.reset();
+ public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
+ super(configuration, queue);
+ fields = Joiner.on(',').join(configuration.getFields());
+ }
- if(responsePage != null) {
- super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(responsePage), org.apache.streams.facebook.Page.class), responsePage.getId());
- }
+ @Override
+ protected void getData(IdConfig id) throws Exception {
+ Page responsePage = getPage(id.getId());
+ backOff.reset();
+
+ if (responsePage != null) {
+ super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(responsePage), org.apache.streams.facebook.Page.class), responsePage.getId());
}
+ }
+
+ protected Page getPage(String pageId) throws Exception {
+ int attempt = 0;
+ while (attempt < MAX_ATTEMPTS) {
+ ++attempt;
+ try {
+ Page page = getNextFacebookClient().getPage(pageId, new Reading().fields(fields));
+ return page;
+ } catch (FacebookException fe) {
+ LOGGER.error("Facebook returned an exception : {}", fe);
+ LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
- protected Page getPage(String pageId) throws Exception {
- int attempt = 0;
- while(attempt < MAX_ATTEMPTS) {
- ++attempt;
- try {
- Page page = getNextFacebookClient().getPage(pageId);
- return page;
- } catch (FacebookException fe) {
- LOGGER.error("Facebook returned an exception : {}", fe);
- LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
-
- int errorCode = fe.getErrorCode();
-
- //Some sort of rate limiting
- if(errorCode == 17 || errorCode == 4 || errorCode == 341) {
- super.backOff.backOff();
- }
- }
+ int errorCode = fe.getErrorCode();
+
+ //Some sort of rate limiting
+ if (errorCode == 17 || errorCode == 4 || errorCode == 341) {
+ super.backOff.backOff();
}
- throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS);
+ }
}
+ throw new Exception("Failed to get data from facebook after " + MAX_ATTEMPTS);
+ }
- @VisibleForTesting
- protected BlockingQueue<StreamsDatum> getQueue() {
- return super.getQueue();
- }
+ @VisibleForTesting
+ protected BlockingQueue<StreamsDatum> getQueue() {
+ return super.getQueue();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index dea17e1..cd74927 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -15,24 +15,29 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.streams.facebook.provider.page;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.FacebookPageProviderConfiguration;
import org.apache.streams.facebook.provider.FacebookDataCollector;
import org.apache.streams.facebook.provider.FacebookProvider;
import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,77 +53,91 @@ import java.util.concurrent.TimeUnit;
* Streams Provider which collects Page Profiles in the ID List contained in the
* FacebookConfiguration object
*
- * To use from command line:
+ * <p/>
+ * To use from command line,
*
- * Supply (at least) the following required configuration in application.conf:
+ * <p/>
+ * Launch using:
*
- * facebook.oauth.appId
- * facebook.oauth.appSecret
- * facebook.oauth.userAccessToken
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.facebook.provider.page.FacebookPageProvider -Dexec.args="application.conf pages.json"
*
- * Launch using:
+ * <p/>
+ * Supply (at least) the following required configuration in application.conf:
*
- * mvn exec:java -Dexec.mainClass=org.apache.streams.facebook.provider.page.FacebookPageProvider -Dexec.args="application.conf pages.json"
-
+ * <p>
+ * facebook.oauth.appId
+ * facebook.oauth.appSecret
+ * facebook.oauth.userAccessToken
+ * </p>
*/
public class FacebookPageProvider extends FacebookProvider {
- public static final String STREAMS_ID = "FacebookPageProvider";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
-
- private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
- public FacebookPageProvider(FacebookConfiguration facebookConfiguration) {
- super(facebookConfiguration);
- }
-
- @VisibleForTesting
- BlockingQueue<StreamsDatum> getQueue() {
- return super.datums;
- }
-
- @Override
- protected FacebookDataCollector getDataCollector() {
- return new FacebookPageDataCollector(super.datums, super.configuration);
- }
-
- public static void main(String[] args) throws Exception {
-
- Preconditions.checkArgument(args.length >= 2);
-
- String configfile = args[0];
- String outfile = args[1];
-
- Config reference = ConfigFactory.load();
- File conf_file = new File(configfile);
- assert(conf_file.exists());
- Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
- Config typesafe = conf.withFallback(reference).resolve();
-
- StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
- FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
- FacebookPageProvider provider = new FacebookPageProvider(config);
-
- PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
- provider.prepare(config);
- provider.startStream();
- do {
- Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
- Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
- while(iterator.hasNext()) {
- StreamsDatum datum = iterator.next();
- String json;
- try {
- json = MAPPER.writeValueAsString(datum.getDocument());
- outStream.println(json);
- } catch (JsonProcessingException e) {
- System.err.println(e.getMessage());
- }
- }
- } while( provider.isRunning());
- provider.cleanUp();
- outStream.flush();
+ public static final String STREAMS_ID = "FacebookPageProvider";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
+
+ private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+ private FacebookPageProviderConfiguration configuration;
+
+ public FacebookPageProvider(FacebookPageProviderConfiguration facebookConfiguration) {
+ super(facebookConfiguration);
+ configuration = facebookConfiguration;
+ }
+
+ @VisibleForTesting
+ BlockingQueue<StreamsDatum> getQueue() {
+ return super.datums;
+ }
+
+ @Override
+ protected FacebookDataCollector getDataCollector() {
+ return new FacebookPageDataCollector(super.datums, configuration);
+ }
+
+ /**
+ * Run FacebookPageProvider from command line.
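+ * @param args at least two arguments: args[0] is the path to the configuration file, args[1] is the path to the output file
+ * @throws Exception on any failure while running the provider, e.g. if the output file cannot be opened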
+ */
+ public static void main(String[] args) throws Exception {
+
+ Preconditions.checkArgument(args.length >= 2);
+
+ String configfile = args[0];
+ String outfile = args[1];
+
+ Config reference = ConfigFactory.load();
+ File confFile = new File(configfile);
+ assert (confFile.exists());
+ Config conf = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false));
+
+ Config typesafe = conf.withFallback(reference).resolve();
+
+ StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+ FacebookPageProviderConfiguration config = new ComponentConfigurator<>(FacebookPageProviderConfiguration.class).detectConfiguration(typesafe, "facebook");
+ FacebookPageProvider provider = new FacebookPageProvider(config);
+
+ PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+ provider.prepare(config);
+ provider.startStream();
+ do {
+ Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+ Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+ while (iterator.hasNext()) {
+ StreamsDatum datum = iterator.next();
+ String json;
+ try {
+ json = MAPPER.writeValueAsString(datum.getDocument());
+ outStream.println(json);
+ } catch (JsonProcessingException ex) {
+ System.err.println(ex.getMessage());
+ }
+ }
}
+ while ( provider.isRunning());
+ provider.cleanUp();
+ outStream.flush();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
new file mode 100644
index 0000000..8450cff
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
@@ -0,0 +1,20 @@
+{
+ "$schema": "http://json-schema.org/draft-03/schema",
+ "$license": [
+ "http://www.apache.org/licenses/LICENSE-2.0"
+ ],
+ "id": "#",
+ "type": "object",
+ "javaType" : "org.apache.streams.facebook.FacebookPageProviderConfiguration",
+ "javaInterfaces": ["java.io.Serializable"],
+ "extends": {"$ref":"FacebookConfiguration.json"},
+ "properties": {
+ "fields": {
+ "type": "array",
+ "description": "A list of page field names to request when retrieving each page",
+ "items": {
+ "type": "string"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
index d57da0b..eda1404 100644
--- a/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
@@ -14,4 +14,121 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
-facebook.version = "v2.8"
\ No newline at end of file
+facebook.version = "v2.8"
+facebook.fields = [
+ "id"
+ "about"
+ "access_token"
+// "ad_campaign"
+ "affiliation"
+ "app_id"
+ "app_links"
+ "artists_we_like"
+ "attire"
+ "awards"
+ "band_interests"
+ "band_members"
+ "best_page"
+ "bio"
+ "birthday"
+ "booking_agent"
+ "built"
+ "can_checkin"
+ "can_post"
+ "category"
+ "category_list"
+ "checkins"
+ "company_overview"
+ "contact_address"
+ "context"
+ "country_page_likes"
+ "cover"
+ "culinary_team"
+ "current_location"
+ "description"
+ "description_html"
+ "directed_by"
+ "display_subtext"
+ "displayed_message_response_time"
+ "emails"
+ "engagement"
+ "fan_count"
+ "featured_video"
+ "features"
+ "food_styles"
+ "founded"
+ "general_info"
+ "general_manager"
+ "genre"
+ "global_brand_page_name"
+ "global_brand_root_id"
+ "has_added_app"
+ "hometown"
+ "hours"
+ "impressum"
+ "influences"
+// "instant_articles_review_status"
+ "is_always_open"
+ "is_community_page"
+ "is_permanently_closed"
+ "is_published"
+ "is_unclaimed"
+ "is_verified"
+ "is_webhooks_subscribed"
+ "leadgen_tos_accepted"
+ "link"
+ "location"
+ "members"
+ "mission"
+ "mpg"
+ "name"
+ "name_with_location_descriptor"
+ "network"
+ "new_like_count"
+ "offer_eligible"
+ "overall_star_rating"
+ "parent_page"
+ "parking"
+ "payment_options"
+ "personal_info"
+ "personal_interests"
+ "pharma_safety_info"
+ "phone"
+ "place_type"
+ "plot_outline"
+// "preferred_audience"
+ "press_contact"
+ "price_range"
+ "produced_by"
+ "products"
+// "promotion_eligible"
+// "promotion_ineligible_reason"
+ "public_transit"
+ "publisher_space"
+ "rating_count"
+// "recipient"
+ "record_label"
+ "release_date"
+ "restaurant_services"
+ "restaurant_specialties"
+ "schedule"
+ "screenplay_by"
+ "season"
+ "single_line_address"
+ "starring"
+// "start_info"
+ "store_location_descriptor"
+ "store_number"
+ "studio"
+// "supports_instant_articles"
+ "talking_about_count"
+ "unread_message_count"
+ "unread_notif_count"
+ "unseen_message_count"
+ "username"
+ "verification_status"
+ "voip_info"
+ "website"
+ "were_here_count"
+ "written_by"
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
index f877d56..1c9a620 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
@@ -18,15 +18,17 @@
package org.apache.streams.facebook.test.data;
+import org.apache.streams.facebook.Post;
+import org.apache.streams.facebook.serializer.FacebookActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
+
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.facebook.serializer.FacebookActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -38,60 +40,60 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
/**
- * Tests serialization of Facebook Post inputs
+ * Tests serialization of Facebook Post inputs.
*/
public class FacebookPostSerDeIT {
- private final static Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
- private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+ private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
- @Test
- public void Tests()
- {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ @Test
+ public void Tests()
+ {
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- InputStream is = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
- Joiner joiner = Joiner.on(" ").skipNulls();
- is = new BoundedInputStream(is, 10000);
- String json;
+ InputStream is = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
+ Joiner joiner = Joiner.on(" ").skipNulls();
+ is = new BoundedInputStream(is, 10000);
+ String json;
- try {
- json = joiner.join(IOUtils.readLines(is));
- LOGGER.debug(json);
+ try {
+ json = joiner.join(IOUtils.readLines(is));
+ LOGGER.debug(json);
- Post ser = mapper.readValue(json, Post.class);
+ Post ser = mapper.readValue(json, Post.class);
- String de = mapper.writeValueAsString(ser);
+ String de = mapper.writeValueAsString(ser);
- LOGGER.debug(de);
+ LOGGER.debug(de);
- Post serde = mapper.readValue(de, Post.class);
+ Post serde = mapper.readValue(de, Post.class);
- Assert.assertEquals(ser, serde);
+ assertEquals(ser, serde);
- LOGGER.debug(mapper.writeValueAsString(serde));
+ LOGGER.debug(mapper.writeValueAsString(serde));
- Activity activity = new Activity();
- FacebookActivityUtil.updateActivity(ser, activity);
+ Activity activity = new Activity();
+ FacebookActivityUtil.updateActivity(ser, activity);
- assertNotNull(activity);
- assertNotNull(activity.getActor().getId());
- assertNotNull(activity.getActor().getDisplayName());
- assertNotNull(activity.getId());
- assert(activity.getVerb().equals("post"));
- assertNotNull(activity.getObject());
- assertNotNull(activity.getUpdated());
- assertNotNull(activity.getPublished());
- assertEquals(activity.getProvider().getId(), "id:providers:facebook");
- assertEquals(activity.getProvider().getDisplayName(), "Facebook");
- assertEquals(activity.getLinks().size(), 1);
- assertNotNull(activity.getAdditionalProperties().get("facebook"));
+ assertNotNull(activity);
+ assertNotNull(activity.getActor().getId());
+ assertNotNull(activity.getActor().getDisplayName());
+ assertNotNull(activity.getId());
+ assert(activity.getVerb().equals("post"));
+ assertNotNull(activity.getObject());
+ assertNotNull(activity.getUpdated());
+ assertNotNull(activity.getPublished());
+ assertEquals(activity.getProvider().getId(), "id:providers:facebook");
+ assertEquals(activity.getProvider().getDisplayName(), "Facebook");
+ assertEquals(activity.getLinks().size(), 1);
+ assertNotNull(activity.getAdditionalProperties().get("facebook"));
- } catch( Exception e ) {
- LOGGER.error("Exception: ", e);
- Assert.fail();
- }
+ } catch( Exception e ) {
+ LOGGER.error("Exception: ", e);
+ Assert.fail();
}
+ }
}
[2/2] incubator-streams git commit: STREAMS-456: PR feedback
Posted by sb...@apache.org.
STREAMS-456: PR feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f597411a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f597411a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f597411a
Branch: refs/heads/master
Commit: f597411a01458a32b3c4e9fad8311de3180b4b41
Parents: 1799fbf
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Nov 16 02:04:04 2016 +0100
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Nov 16 02:04:04 2016 +0100
----------------------------------------------------------------------
.../facebook/provider/page/FacebookPageDataCollector.java | 4 ++--
.../streams/facebook/provider/page/FacebookPageProvider.java | 1 -
2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f597411a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index ca68a7e..0e88dd4 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -27,8 +27,8 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
super(configuration, queue);
- fields = Joiner.on(',').join(configuration.getFields());
+ fields = StringUtils.join(configuration.getFields(), ',');
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f597411a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index cd74927..d11a486 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -31,7 +31,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.typesafe.config.Config;