You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/11/16 01:46:35 UTC

[1/2] incubator-streams git commit: STREAMS-456: Facebook page provider does not include many fields

Repository: incubator-streams
Updated Branches:
  refs/heads/master 7d9b887ed -> f597411a0


STREAMS-456: Facebook page provider does not include many fields


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/1799fbfc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/1799fbfc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/1799fbfc

Branch: refs/heads/master
Commit: 1799fbfc7f64db653a97075e6ba8990e7c97eaa2
Parents: 7d9b887
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Mon Nov 14 23:26:18 2016 +0100
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Mon Nov 14 23:26:18 2016 +0100

----------------------------------------------------------------------
 .../streams-provider-facebook/pom.xml           |   2 +-
 .../page/FacebookPageDataCollector.java         |  91 +++++-----
 .../provider/page/FacebookPageProvider.java     | 165 +++++++++++--------
 .../FacebookPageProviderConfiguration.json      |  20 +++
 .../src/main/resources/reference.conf           | 119 ++++++++++++-
 .../facebook/test/data/FacebookPostSerDeIT.java |  90 +++++-----
 6 files changed, 328 insertions(+), 159 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml
index 7db5fde..3d21859 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -87,7 +87,7 @@
         <dependency>
             <groupId>org.facebook4j</groupId>
             <artifactId>facebook4j-core</artifactId>
-            <version>2.4.7</version>
+            <version>2.4.8</version>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index b78422b..ca68a7e 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -15,69 +15,80 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider.page;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import facebook4j.*;
-import facebook4j.json.DataObjectFactory;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.FacebookPageProviderConfiguration;
 import org.apache.streams.facebook.IdConfig;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.BlockingQueue;
 
+import facebook4j.FacebookException;
+import facebook4j.Page;
+import facebook4j.Reading;
+import facebook4j.json.DataObjectFactory;
+
 /**
  * Collects the page data from public Facebook pages
  */
 public class FacebookPageDataCollector extends FacebookDataCollector {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageDataCollector.class);
-    private static final int MAX_ATTEMPTS = 5;
-    private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageDataCollector.class);
+  private static final int MAX_ATTEMPTS = 5;
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookConfiguration configuration) {
-        super(configuration, queue);
-    }
+  private String fields;
 
-    @Override
-    protected void getData(IdConfig id) throws Exception {
-        Page responsePage = getPage(id.getId());
-        backOff.reset();
+  public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
+    super(configuration, queue);
+    fields = Joiner.on(',').join(configuration.getFields());
+  }
 
-        if(responsePage != null) {
-            super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(responsePage), org.apache.streams.facebook.Page.class), responsePage.getId());
-        }
+  @Override
+  protected void getData(IdConfig id) throws Exception {
+    Page responsePage = getPage(id.getId());
+    backOff.reset();
+
+    if (responsePage != null) {
+      super.outputData(MAPPER.readValue(DataObjectFactory.getRawJSON(responsePage), org.apache.streams.facebook.Page.class), responsePage.getId());
     }
+  }
+
+  protected Page getPage(String pageId) throws Exception {
+    int attempt = 0;
+    while (attempt < MAX_ATTEMPTS) {
+      ++attempt;
+      try {
+        Page page = getNextFacebookClient().getPage(pageId, new Reading().fields(fields));
+        return page;
+      } catch (FacebookException fe) {
+        LOGGER.error("Facebook returned an exception : {}", fe);
+        LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
 
-    protected Page getPage(String pageId) throws Exception {
-        int attempt = 0;
-        while(attempt < MAX_ATTEMPTS) {
-            ++attempt;
-            try {
-                Page page = getNextFacebookClient().getPage(pageId);
-                return page;
-            } catch (FacebookException fe) {
-                LOGGER.error("Facebook returned an exception : {}", fe);
-                LOGGER.error("Facebook returned an exception while trying to get feed for page, {} : {}", pageId, fe.getMessage());
-
-                int errorCode = fe.getErrorCode();
-
-                //Some sort of rate limiting
-                if(errorCode == 17 || errorCode == 4 || errorCode == 341) {
-                    super.backOff.backOff();
-                }
-            }
+        int errorCode = fe.getErrorCode();
+
+        //Some sort of rate limiting
+        if (errorCode == 17 || errorCode == 4 || errorCode == 341) {
+          super.backOff.backOff();
         }
-        throw new Exception("Failed to get data from facebook after "+MAX_ATTEMPTS);
+      }
     }
+    throw new Exception("Failed to get data from facebook after " + MAX_ATTEMPTS);
+  }
 
-    @VisibleForTesting
-    protected BlockingQueue<StreamsDatum> getQueue() {
-        return super.getQueue();
-    }
+  @VisibleForTesting
+  protected BlockingQueue<StreamsDatum> getQueue() {
+    return super.getQueue();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index dea17e1..cd74927 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -15,24 +15,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.facebook.provider.page;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.facebook.FacebookConfiguration;
+import org.apache.streams.facebook.FacebookPageProviderConfiguration;
 import org.apache.streams.facebook.provider.FacebookDataCollector;
 import org.apache.streams.facebook.provider.FacebookProvider;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,77 +53,91 @@ import java.util.concurrent.TimeUnit;
  * Streams Provider which collects Page Profiles in the ID List contained in the
  * FacebookConfiguration object
  *
- * To use from command line:
+ * <p/>
+ * To use from command line,
  *
- *  Supply (at least) the following required configuration in application.conf:
+ * <p/>
+ * Launch using:
  *
- *  facebook.oauth.appId
- *  facebook.oauth.appSecret
- *  facebook.oauth.userAccessToken
+ * <p/>
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.facebook.provider.page.FacebookPageProvider -Dexec.args="application.conf pages.json"
  *
- *  Launch using:
+ * <p/>
+ * Supply (at least) the following required configuration in application.conf:
  *
- *  mvn exec:java -Dexec.mainClass=org.apache.streams.facebook.provider.page.FacebookPageProvider -Dexec.args="application.conf pages.json"
-
+ * <p>
+ *   facebook.oauth.appId
+ *   facebook.oauth.appSecret
+ *   facebook.oauth.userAccessToken
+ * </p>
  */
 public class FacebookPageProvider extends FacebookProvider {
 
-    public static final String STREAMS_ID = "FacebookPageProvider";
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
-
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    public FacebookPageProvider(FacebookConfiguration facebookConfiguration) {
-        super(facebookConfiguration);
-    }
-
-    @VisibleForTesting
-    BlockingQueue<StreamsDatum> getQueue() {
-        return super.datums;
-    }
-
-    @Override
-    protected FacebookDataCollector getDataCollector() {
-        return new FacebookPageDataCollector(super.datums, super.configuration);
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        Preconditions.checkArgument(args.length >= 2);
-
-        String configfile = args[0];
-        String outfile = args[1];
-
-        Config reference = ConfigFactory.load();
-        File conf_file = new File(configfile);
-        assert(conf_file.exists());
-        Config conf = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-
-        Config typesafe  = conf.withFallback(reference).resolve();
-
-        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
-        FacebookConfiguration config = new ComponentConfigurator<>(FacebookConfiguration.class).detectConfiguration(typesafe, "facebook");
-        FacebookPageProvider provider = new FacebookPageProvider(config);
-
-        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
-        provider.prepare(config);
-        provider.startStream();
-        do {
-            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
-            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
-            while(iterator.hasNext()) {
-                StreamsDatum datum = iterator.next();
-                String json;
-                try {
-                    json = MAPPER.writeValueAsString(datum.getDocument());
-                    outStream.println(json);
-                } catch (JsonProcessingException e) {
-                    System.err.println(e.getMessage());
-                }
-            }
-        } while( provider.isRunning());
-        provider.cleanUp();
-        outStream.flush();
+  public static final String STREAMS_ID = "FacebookPageProvider";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPageProvider.class);
+
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  private FacebookPageProviderConfiguration configuration;
+
+  public FacebookPageProvider(FacebookPageProviderConfiguration facebookConfiguration) {
+    super(facebookConfiguration);
+    configuration = facebookConfiguration;
+  }
+
+  @VisibleForTesting
+  BlockingQueue<StreamsDatum> getQueue() {
+    return super.datums;
+  }
+
+  @Override
+  protected FacebookDataCollector getDataCollector() {
+    return new FacebookPageDataCollector(super.datums, configuration);
+  }
+
+  /**
+   * Run FacebookPageProvider from command line.
+   * @param args configfile outfile
+   * @throws Exception if the configuration file cannot be parsed or the output file cannot be opened
+   */
+  public static void main(String[] args) throws Exception {
+
+    Preconditions.checkArgument(args.length >= 2);
+
+    String configfile = args[0];
+    String outfile = args[1];
+
+    Config reference = ConfigFactory.load();
+    File confFile = new File(configfile);
+    assert (confFile.exists());
+    Config conf = ConfigFactory.parseFileAnySyntax(confFile, ConfigParseOptions.defaults().setAllowMissing(false));
+
+    Config typesafe  = conf.withFallback(reference).resolve();
+
+    StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+    FacebookPageProviderConfiguration config = new ComponentConfigurator<>(FacebookPageProviderConfiguration.class).detectConfiguration(typesafe, "facebook");
+    FacebookPageProvider provider = new FacebookPageProvider(config);
+
+    PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+    provider.prepare(config);
+    provider.startStream();
+    do {
+      Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+      Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+      while (iterator.hasNext()) {
+        StreamsDatum datum = iterator.next();
+        String json;
+        try {
+          json = MAPPER.writeValueAsString(datum.getDocument());
+          outStream.println(json);
+        } catch (JsonProcessingException ex) {
+          System.err.println(ex.getMessage());
+        }
+      }
     }
+    while ( provider.isRunning());
+    provider.cleanUp();
+    outStream.flush();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
new file mode 100644
index 0000000..8450cff
--- /dev/null
+++ b/streams-contrib/streams-provider-facebook/src/main/jsonschema/org/apache/streams/facebook/FacebookPageProviderConfiguration.json
@@ -0,0 +1,20 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.facebook.FacebookPageProviderConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "extends": {"$ref":"FacebookConfiguration.json"},
+    "properties": {
+        "fields": {
+            "type": "array",
+            "description": "A list of page field names to request from the Facebook Graph API for each page",
+            "items": {
+                "type": "string"
+            }
+        }
+     }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
index d57da0b..eda1404 100644
--- a/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
+++ b/streams-contrib/streams-provider-facebook/src/main/resources/reference.conf
@@ -14,4 +14,121 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-facebook.version = "v2.8"
\ No newline at end of file
+facebook.version = "v2.8"
+facebook.fields = [
+  "id"
+  "about"
+  "access_token"
+//  "ad_campaign"
+  "affiliation"
+  "app_id"
+  "app_links"
+  "artists_we_like"
+  "attire"
+  "awards"
+  "band_interests"
+  "band_members"
+  "best_page"
+  "bio"
+  "birthday"
+  "booking_agent"
+  "built"
+  "can_checkin"
+  "can_post"
+  "category"
+  "category_list"
+  "checkins"
+  "company_overview"
+  "contact_address"
+  "context"
+  "country_page_likes"
+  "cover"
+  "culinary_team"
+  "current_location"
+  "description"
+  "description_html"
+  "directed_by"
+  "display_subtext"
+  "displayed_message_response_time"
+  "emails"
+  "engagement"
+  "fan_count"
+  "featured_video"
+  "features"
+  "food_styles"
+  "founded"
+  "general_info"
+  "general_manager"
+  "genre"
+  "global_brand_page_name"
+  "global_brand_root_id"
+  "has_added_app"
+  "hometown"
+  "hours"
+  "impressum"
+  "influences"
+//  "instant_articles_review_status"
+  "is_always_open"
+  "is_community_page"
+  "is_permanently_closed"
+  "is_published"
+  "is_unclaimed"
+  "is_verified"
+  "is_webhooks_subscribed"
+  "leadgen_tos_accepted"
+  "link"
+  "location"
+  "members"
+  "mission"
+  "mpg"
+  "name"
+  "name_with_location_descriptor"
+  "network"
+  "new_like_count"
+  "offer_eligible"
+  "overall_star_rating"
+  "parent_page"
+  "parking"
+  "payment_options"
+  "personal_info"
+  "personal_interests"
+  "pharma_safety_info"
+  "phone"
+  "place_type"
+  "plot_outline"
+//  "preferred_audience"
+  "press_contact"
+  "price_range"
+  "produced_by"
+  "products"
+//  "promotion_eligible"
+//  "promotion_ineligible_reason"
+  "public_transit"
+  "publisher_space"
+  "rating_count"
+//  "recipient"
+  "record_label"
+  "release_date"
+  "restaurant_services"
+  "restaurant_specialties"
+  "schedule"
+  "screenplay_by"
+  "season"
+  "single_line_address"
+  "starring"
+//  "start_info"
+  "store_location_descriptor"
+  "store_number"
+  "studio"
+//  "supports_instant_articles"
+  "talking_about_count"
+  "unread_message_count"
+  "unread_notif_count"
+  "unseen_message_count"
+  "username"
+  "verification_status"
+  "voip_info"
+  "website"
+  "were_here_count"
+  "written_by"
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/1799fbfc/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
index f877d56..1c9a620 100644
--- a/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
+++ b/streams-contrib/streams-provider-facebook/src/test/java/org/apache/streams/facebook/test/data/FacebookPostSerDeIT.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.facebook.test.data;
 
+import org.apache.streams.facebook.Post;
+import org.apache.streams.facebook.serializer.FacebookActivityUtil;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Joiner;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.facebook.serializer.FacebookActivityUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -38,60 +40,60 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 /**
- * Tests serialization of Facebook Post inputs
+ * Tests serialization of Facebook Post inputs.
  */
 public class FacebookPostSerDeIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private static final Logger LOGGER = LoggerFactory.getLogger(FacebookPostSerDeIT.class);
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    @Test
-    public void Tests()
-    {
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+  @Test
+  public void Tests()
+  {
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.TRUE);
+    mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+    mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
 
-        InputStream is = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
-        Joiner joiner = Joiner.on(" ").skipNulls();
-        is = new BoundedInputStream(is, 10000);
-        String json;
+    InputStream is = FacebookPostSerDeIT.class.getResourceAsStream("/testpost.json");
+    Joiner joiner = Joiner.on(" ").skipNulls();
+    is = new BoundedInputStream(is, 10000);
+    String json;
 
-        try {
-            json = joiner.join(IOUtils.readLines(is));
-            LOGGER.debug(json);
+    try {
+      json = joiner.join(IOUtils.readLines(is));
+      LOGGER.debug(json);
 
-            Post ser = mapper.readValue(json, Post.class);
+      Post ser = mapper.readValue(json, Post.class);
 
-            String de = mapper.writeValueAsString(ser);
+      String de = mapper.writeValueAsString(ser);
 
-            LOGGER.debug(de);
+      LOGGER.debug(de);
 
-            Post serde = mapper.readValue(de, Post.class);
+      Post serde = mapper.readValue(de, Post.class);
 
-            Assert.assertEquals(ser, serde);
+      assertEquals(ser, serde);
 
-            LOGGER.debug(mapper.writeValueAsString(serde));
+      LOGGER.debug(mapper.writeValueAsString(serde));
 
-            Activity activity = new Activity();
-            FacebookActivityUtil.updateActivity(ser, activity);
+      Activity activity = new Activity();
+      FacebookActivityUtil.updateActivity(ser, activity);
 
-            assertNotNull(activity);
-            assertNotNull(activity.getActor().getId());
-            assertNotNull(activity.getActor().getDisplayName());
-            assertNotNull(activity.getId());
-            assert(activity.getVerb().equals("post"));
-            assertNotNull(activity.getObject());
-            assertNotNull(activity.getUpdated());
-            assertNotNull(activity.getPublished());
-            assertEquals(activity.getProvider().getId(), "id:providers:facebook");
-            assertEquals(activity.getProvider().getDisplayName(), "Facebook");
-            assertEquals(activity.getLinks().size(), 1);
-            assertNotNull(activity.getAdditionalProperties().get("facebook"));
+      assertNotNull(activity);
+      assertNotNull(activity.getActor().getId());
+      assertNotNull(activity.getActor().getDisplayName());
+      assertNotNull(activity.getId());
+      assert(activity.getVerb().equals("post"));
+      assertNotNull(activity.getObject());
+      assertNotNull(activity.getUpdated());
+      assertNotNull(activity.getPublished());
+      assertEquals(activity.getProvider().getId(), "id:providers:facebook");
+      assertEquals(activity.getProvider().getDisplayName(), "Facebook");
+      assertEquals(activity.getLinks().size(), 1);
+      assertNotNull(activity.getAdditionalProperties().get("facebook"));
 
-        } catch( Exception e ) {
-            LOGGER.error("Exception: ", e);
-            Assert.fail();
-        }
+    } catch( Exception e ) {
+      LOGGER.error("Exception: ", e);
+      Assert.fail();
     }
+  }
 }


[2/2] incubator-streams git commit: STREAMS-456: PR feedback

Posted by sb...@apache.org.
STREAMS-456: PR feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/f597411a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/f597411a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/f597411a

Branch: refs/heads/master
Commit: f597411a01458a32b3c4e9fad8311de3180b4b41
Parents: 1799fbf
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Nov 16 02:04:04 2016 +0100
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Nov 16 02:04:04 2016 +0100

----------------------------------------------------------------------
 .../facebook/provider/page/FacebookPageDataCollector.java        | 4 ++--
 .../streams/facebook/provider/page/FacebookPageProvider.java     | 1 -
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f597411a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
index ca68a7e..0e88dd4 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageDataCollector.java
@@ -27,8 +27,8 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 
+import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +52,7 @@ public class FacebookPageDataCollector extends FacebookDataCollector {
 
   public FacebookPageDataCollector(BlockingQueue<StreamsDatum> queue, FacebookPageProviderConfiguration configuration) {
     super(configuration, queue);
-    fields = Joiner.on(',').join(configuration.getFields());
+    fields = StringUtils.join(configuration.getFields(), ',');
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/f597411a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
index cd74927..d11a486 100644
--- a/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
+++ b/streams-contrib/streams-provider-facebook/src/main/java/org/apache/streams/facebook/provider/page/FacebookPageProvider.java
@@ -31,7 +31,6 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.typesafe.config.Config;