You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/08/12 00:04:19 UTC

[7/7] git commit: updated packages

updated packages


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc432af2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc432af2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc432af2

Branch: refs/heads/STREAMS-46
Commit: dc432af2c70e9ac443af3cbe57a7772fdd1e146d
Parents: e0cb5ec
Author: sblackmon <sb...@apache.org>
Authored: Mon Aug 11 17:03:50 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Mon Aug 11 17:03:50 2014 -0500

----------------------------------------------------------------------
 .../streams-provider-facebook/pom.xml           |   8 +-
 .../api/FacebookPostActivitySerializer.java     | 300 -----------------
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 --
 .../processor/FacebookTypeConverter.java        | 194 -----------
 .../provider/FacebookFriendFeedProvider.java    | 285 -----------------
 .../provider/FacebookFriendUpdatesProvider.java | 285 -----------------
 .../FacebookUserInformationProvider.java        | 298 -----------------
 .../provider/FacebookUserstreamProvider.java    | 319 ------------------
 .../api/FacebookPostActivitySerializer.java     | 286 +++++++++++++++++
 ...FacebookPublicFeedXmlActivitySerializer.java |  29 ++
 .../processor/FacebookTypeConverter.java        | 194 +++++++++++
 .../provider/FacebookFriendFeedProvider.java    | 282 ++++++++++++++++
 .../provider/FacebookFriendUpdatesProvider.java | 286 +++++++++++++++++
 .../FacebookUserInformationProvider.java        | 299 +++++++++++++++++
 .../provider/FacebookUserstreamProvider.java    | 320 +++++++++++++++++++
 .../com/facebook/FacebookConfiguration.json     |  49 ---
 .../FacebookUserInformationConfiguration.json   |  23 --
 .../FacebookUserstreamConfiguration.json        |  22 --
 .../jsonschema/com/facebook/graph/Post.json     | 203 ------------
 .../streams/facebook/FacebookConfiguration.json |  49 +++
 .../FacebookUserInformationConfiguration.json   |  23 ++
 .../FacebookUserstreamConfiguration.json        |  22 ++
 .../org/apache/streams/facebook/graph/Post.json | 203 ++++++++++++
 .../test/FacebookActivitySerDeTest.java         |   4 +-
 .../gnip-edc-facebook/pom.xml                   |   2 +-
 .../src/test/resources/FlickrEDC.xml            | 128 ++++----
 .../src/test/resources/RedditEDC.xml            | 200 ++++++------
 .../src/test/resources/RedditEDCFlattened.xml   | 200 ++++++------
 .../src/test/resources/redditTest.xml           |   2 +-
 29 files changed, 2264 insertions(+), 2280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/pom.xml b/streams-contrib/streams-provider-facebook/pom.xml
index 34d53a9..d688a00 100644
--- a/streams-contrib/streams-provider-facebook/pom.xml
+++ b/streams-contrib/streams-provider-facebook/pom.xml
@@ -101,10 +101,10 @@
                     <addCompileSourceRoot>true</addCompileSourceRoot>
                     <generateBuilders>true</generateBuilders>
                     <sourcePaths>
-                        <sourcePath>src/main/jsonschema/com/facebook/FacebookConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/com/facebook/FacebookUserInformationConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/com/facebook/FacebookUserstreamConfiguration.json</sourcePath>
-                        <sourcePath>src/main/jsonschema/com/facebook/graph/Post.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookUserInformationConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/facebook/FacebookUserstreamConfiguration.json</sourcePath>
+                        <sourcePath>src/main/jsonschema/org/apache/streams/facebook/graph/Post.json</sourcePath>
                     </sourcePaths>
                     <outputDirectory>target/generated-sources/jsonschema2pojo</outputDirectory>
                     <targetPackage>com.facebook</targetPackage>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
deleted file mode 100644
index 71bc5c9..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/api/FacebookPostActivitySerializer.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.api;
-
-
-import com.fasterxml.jackson.core.JsonGenerationException;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.databind.ser.std.StdSerializer;
-import com.fasterxml.jackson.datatype.joda.JodaModule;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.ActivitySerializer;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.pojo.json.*;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.List;
-
-import static org.apache.streams.data.util.ActivityUtil.*;
-import static org.apache.streams.data.util.JsonUtil.jsonToJsonNode;
-
-/**
- * Serializes activity posts
- *   sblackmon: This class needs a rewrite
- */
-public class FacebookPostActivitySerializer implements ActivitySerializer<org.apache.streams.facebook.Post> {
-
-    public static final DateTimeFormatter FACEBOOK_FORMAT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ssZ");
-    public static final DateTimeFormatter ACTIVITY_FORMAT = ISODateTimeFormat.basicDateTime();
-
-    public static final String PROVIDER_NAME = "facebook";
-
-    public static ObjectMapper mapper;
-    static {
-        mapper = StreamsJacksonMapper.getInstance();
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "facebook_post_json_v1";
-    }
-
-    @Override
-    public Post serialize(Activity deserialized) throws ActivitySerializerException {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
-
-    @Override
-    public Activity deserialize(Post post) throws ActivitySerializerException {
-        Activity activity = new Activity();
-        activity.setPublished(post.getCreatedTime());
-        activity.setUpdated(post.getUpdatedTime());
-        addActor(activity, mapper.convertValue(post.getFrom(), ObjectNode.class));
-        setProvider(activity);
-        setObjectType(post.getType(), activity);
-        parseObject(activity, mapper.convertValue(post, ObjectNode.class));
-        fixObjectId(activity);
-        fixContentFromSummary(activity);
-        return activity;
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<Post> serializedList) {
-        throw new NotImplementedException("Not currently supported by this deserializer");
-    }
-
-    private void fixContentFromSummary(Activity activity) {
-        //we MUST have a content field set, so choose the best option
-        if(activity.getContent() == null) {
-            activity.setContent(activity.getAdditionalProperties().containsKey("summary") ?
-                    (String) activity.getAdditionalProperties().get("summary") :
-                    activity.getObject().getSummary());
-        }
-    }
-
-    private void fixObjectId(Activity activity) {
-        //An artifact of schema generation, the default value is {link}
-        if(activity.getObject().getId().equals("{link}")) {
-            activity.getObject().setId(null);
-        }
-    }
-
-    private void setObjectType(String type, Activity activity) {
-        ActivityObject object = new ActivityObject();
-        activity.setObject(object);
-        object.setObjectType(type);
-    }
-
-    private void setProvider(Activity activity) {
-        Provider provider = new Provider();
-        provider.setId(getProviderId(PROVIDER_NAME));
-        provider.setDisplayName(PROVIDER_NAME);
-        activity.setProvider(provider);
-    }
-
-    private String getObjectType(JsonNode node) {
-        Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
-        ensureMoreFields(fields);
-        Map.Entry<String, JsonNode> field = fields.next();
-        //ensureNoMoreFields(fields);
-        return node.asText();
-    }
-
-    private void parseObject(Activity activity, JsonNode jsonNode) throws ActivitySerializerException {
-        for(Iterator<Map.Entry<String, JsonNode>> fields = jsonNode.fields(); fields.hasNext();) {
-            Map.Entry<String, JsonNode> field = fields.next();
-            String key = field.getKey();
-            JsonNode value = field.getValue();
-            mapField(activity, key, value);
-        }
-    }
-
-    private void mapField(Activity activity, String name, JsonNode value) throws ActivitySerializerException {
-        if("application".equals(name)) {
-            addGenerator(activity, value);
-        } else if ("caption".equals(name)) {
-            addSummary(activity, value);
-        } else if ("comments".equals(name)) {
-            addAttachments(activity, value);
-        } else if ("description".equals(name)) {
-            addObjectSummary(activity, value);
-        } else if ("from".equals(name)) {
-            addActor(activity, value);
-        } else if ("icon".equals(name)) {
-            addIcon(activity, value);
-        } else if ("id".equals(name)) {
-            addId(activity, value);
-        } else if ("is_hidden".equals(name)) {
-            addObjectHiddenExtension(activity, value);
-        } else if ("like_count".equals(name)) {
-            addLikeExtension(activity, value);
-        } else if ("link".equals(name)) {
-            addObjectLink(activity, value);
-        } else if ("message".equals(name)) {
-            activity.setContent(value.asText());
-        } else if ("name".equals(name)) {
-            addObjectName(activity, value);
-        } else if ("object_id".equals(name)) {
-            addObjectId(activity, value);
-        } else if ("picture".equals(name)) {
-            addObjectImage(activity, value);
-        } else if ("place".equals(name)) {
-            addLocationExtension(activity, value);
-        } else if ("shares".equals(name)) {
-            addRebroadcastExtension(activity, value);
-        } else if ("source".equals(name)) {
-            addObjectLink(activity, value);
-        } else if ("story".equals(name)) {
-            addTitle(activity, value);
-        }
-    }
-
-    private void addSummary(Activity activity, JsonNode value) {
-        activity.setAdditionalProperty("summary", value.asText());
-    }
-
-    private void addTitle(Activity activity, JsonNode value) {
-        activity.setTitle(value.asText());
-    }
-
-    private void addLikeExtension(Activity activity, JsonNode value) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        extensions.put(LIKES_EXTENSION, value.asInt());
-    }
-
-    private void addLocationExtension(Activity activity, JsonNode value) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        if(value.has("location")) {
-            Map<String, Object> location = new HashMap<String, Object>();
-            JsonNode fbLocation = value.get("location");
-            if(fbLocation.has("country")) {
-                location.put(LOCATION_EXTENSION_COUNTRY, fbLocation.get("country"));
-            }
-            if(fbLocation.has("latitude") && fbLocation.has("longitude")) {
-                location.put(LOCATION_EXTENSION_COORDINATES, String.format("%s,%s", fbLocation.get("longitude"), fbLocation.get("latitude")));
-            }
-            extensions.put(LOCATION_EXTENSION, location);
-        }
-    }
-
-    private void addObjectImage(Activity activity, JsonNode value) {
-        Image image = new Image();
-        image.setUrl(value.asText());
-        activity.getObject().setImage(image);
-    }
-
-    private void addObjectId(Activity activity, JsonNode value) {
-        activity.getObject().setId(getObjectId("facebook", activity.getObject().getObjectType(), value.asText()));
-    }
-
-    private void addObjectName(Activity activity, JsonNode value) {
-        activity.getObject().setDisplayName(value.asText());
-    }
-
-    private void addId(Activity activity, JsonNode value) {
-        activity.setId(getActivityId(PROVIDER_NAME, value.asText()));
-    }
-
-    private void addObjectLink(Activity activity, JsonNode value) {
-        activity.getObject().setUrl(value.asText());
-    }
-
-    private void addRebroadcastExtension(Activity activity, JsonNode value) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        if(value.has("count")) {
-            extensions.put(REBROADCAST_EXTENSION, value.get("count").asInt());
-        }
-    }
-
-    private void addObjectHiddenExtension(Activity activity, JsonNode value) {
-        Map<String, Object> extensions = ensureExtensions(activity);
-        extensions.put("hidden", value.asBoolean());
-    }
-
-    private void addIcon(Activity activity, JsonNode value) {
-        Icon icon = new Icon();
-        //Apparently the Icon didn't map from the schema very well
-        icon.setAdditionalProperty("url", value.asText());
-        activity.setIcon(icon);
-    }
-
-    private void addActor(Activity activity, JsonNode value) {
-        Actor actor = new Actor();
-        if(value.has("name")) {
-            actor.setDisplayName(value.get("name").asText());
-        }
-        if(value.has("id")) {
-            actor.setId(getPersonId(PROVIDER_NAME, value.get("id").asText()));
-        }
-        activity.setActor(actor);
-    }
-
-    private void addObjectSummary(Activity activity, JsonNode value) {
-        activity.getObject().setSummary(value.asText());
-    }
-
-    private void addGenerator(Activity activity, JsonNode value) {
-        Generator generator = new Generator();
-        if(value.has("id")) {
-            generator.setId(getObjectId(PROVIDER_NAME, "generator", value.get("id").asText()));
-        }
-        if(value.has("name")) {
-            generator.setDisplayName(value.get("name").asText());
-        }
-        if(value.has("namespace")) {
-            generator.setSummary(value.get("namespace").asText());
-        }
-        activity.setGenerator(generator);
-    }
-
-    private void addAttachments(Activity activity, JsonNode value) {
-        //No direct mapping at this time
-    }
-
-    private static void ensureMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
-        if(!fields.hasNext()) {
-            throw new IllegalStateException("Facebook activity must have one and only one root element");
-        }
-    }
-    private static void ensureNoMoreFields(Iterator<Map.Entry<String, JsonNode>> fields) {
-        if(fields.hasNext()) {
-            throw new IllegalStateException("Facebook activity must have one and only one root element");
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
deleted file mode 100644
index f126d88..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/feed/FacebookPublicFeedXmlActivitySerializer.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.feed;
-
-/**
- * Created with IntelliJ IDEA.
- * User: sblackmon
- * Date: 10/2/13
- * Time: 6:32 PM
- * To change this template use File | Settings | File Templates.
- */
-public class FacebookPublicFeedXmlActivitySerializer {
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
deleted file mode 100644
index 6ddb673..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/processor/FacebookTypeConverter.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.processor;
-
-import com.facebook.api.FacebookPostActivitySerializer;
-import com.fasterxml.jackson.core.JsonParseException;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.exceptions.ActivitySerializerException;
-import org.apache.streams.facebook.Post;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class FacebookTypeConverter implements StreamsProcessor {
-
-    public final static String STREAMS_ID = "TwitterTypeConverter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(FacebookTypeConverter.class);
-
-    private ObjectMapper mapper;
-
-    private Queue<StreamsDatum> inQueue;
-    private Queue<StreamsDatum> outQueue;
-
-    private Class inClass;
-    private Class outClass;
-
-    private FacebookPostActivitySerializer facebookPostActivitySerializer;
-
-    private int count = 0;
-
-    public final static String TERMINATE = new String("TERMINATE");
-
-    public FacebookTypeConverter(Class inClass, Class outClass) {
-        this.inClass = inClass;
-        this.outClass = outClass;
-    }
-
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
-
-    public void setProcessorInputQueue(Queue<StreamsDatum> inputQueue) {
-        inQueue = inputQueue;
-    }
-
-    public Object convert(ObjectNode event, Class inClass, Class outClass) throws ActivitySerializerException, JsonProcessingException {
-
-        Object result = null;
-
-        if( outClass.equals( Activity.class )) {
-            LOGGER.debug("ACTIVITY");
-            result = facebookPostActivitySerializer.deserialize(mapper.convertValue(event, Post.class));
-        } else if( outClass.equals( Post.class )) {
-            LOGGER.debug("POST");
-            result = mapper.convertValue(event, Post.class);
-        } else if( outClass.equals( ObjectNode.class )) {
-            LOGGER.debug("OBJECTNODE");
-            result = mapper.convertValue(event, ObjectNode.class);
-        }
-
-        // no supported conversion were applied
-        if( result != null ) {
-            count ++;
-            return result;
-        }
-
-        LOGGER.debug("CONVERT FAILED");
-
-        return null;
-
-    }
-
-    public boolean validate(Object document, Class klass) {
-
-        // TODO
-        return true;
-    }
-
-    public boolean isValidJSON(final String json) {
-        boolean valid = false;
-        try {
-            final JsonParser parser = new ObjectMapper().getJsonFactory()
-                    .createJsonParser(json);
-            while (parser.nextToken() != null) {
-            }
-            valid = true;
-        } catch (JsonParseException jpe) {
-            LOGGER.warn("validate: {}", jpe);
-        } catch (IOException ioe) {
-            LOGGER.warn("validate: {}", ioe);
-        }
-
-        return valid;
-    }
-
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        StreamsDatum result = null;
-
-        try {
-
-            Object item = entry.getDocument();
-            ObjectNode node;
-
-            LOGGER.debug("{} processing {}", STREAMS_ID, item.getClass());
-
-            if( item instanceof String ) {
-
-                // if the target is string, just pass-through
-                if( String.class.equals(outClass)) {
-                    result = entry;
-                }
-                else {
-                    // first check for valid json
-                    node = (ObjectNode)mapper.readTree((String)item);
-
-                    // since data is coming from outside provider, we don't know what type the events are
-                    // for now we'll assume post
-
-                    Object out = convert(node, Post.class, outClass);
-
-                    if( out != null && validate(out, outClass))
-                        result = new StreamsDatum(out);
-                }
-
-            } else if( item instanceof ObjectNode || item instanceof Post) {
-
-                // first check for valid json
-                node = (ObjectNode)mapper.valueToTree(item);
-
-                // since data is coming from outside provider, we don't know what type the events are
-                // for now we'll assume post
-
-                Object out = convert(node, Post.class, outClass);
-
-                if( out != null && validate(out, outClass))
-                    result = new StreamsDatum(out);
-
-            }
-
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-
-        if( result != null )
-            return Lists.newArrayList(result);
-        else
-            return Lists.newArrayList();
-    }
-
-    @Override
-    public void prepare(Object o) {
-        mapper = new StreamsJacksonMapper();
-        facebookPostActivitySerializer = new FacebookPostActivitySerializer();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java
deleted file mode 100644
index a66d213..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendFeedProvider.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.provider;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.facebook.FacebookUserInformationConfiguration;
-import org.apache.streams.facebook.FacebookUserstreamConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class FacebookFriendFeedProvider implements StreamsProvider, Serializable
-{
-
-    public static final String STREAMS_ID = "FacebookFriendFeedProvider";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendFeedProvider.class);
-
-    private static final ObjectMapper mapper = new StreamsJacksonMapper();
-
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserstreamConfiguration configuration;
-
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
-
-    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
-
-    protected Iterator<String[]> idsBatches;
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    public FacebookFriendFeedProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
-
-    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
-
-    public FacebookFriendFeedProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
-
-    public FacebookFriendFeedProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
-    }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
-
-    @Override
-    public void startStream() {
-        shutdownAndAwaitTermination(executor);
-        running.set(true);
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        synchronized (FacebookUserstreamProvider.class) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
-        }
-
-        return current;
-
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
-
-        Facebook client = getFacebookClient();
-
-        try {
-            ResponseList<Friend> friendResponseList = client.friends().getFriends();
-            Paging<Friend> friendPaging;
-            do {
-
-                for( Friend friend : friendResponseList ) {
-
-                    executor.submit(new FacebookFriendFeedTask(this, friend.getId()));
-                }
-                friendPaging = friendResponseList.getPaging();
-                friendResponseList = client.fetchNext(friendPaging);
-            } while( friendPaging != null &&
-                    friendResponseList != null );
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(configuration.getOauth().getAppId())
-            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFriendFeedTask implements Runnable {
-
-        FacebookFriendFeedProvider provider;
-        Facebook client;
-        String id;
-
-        public FacebookFriendFeedTask(FacebookFriendFeedProvider provider, String id) {
-            this.provider = provider;
-            this.id = id;
-        }
-
-        @Override
-        public void run() {
-            client = provider.getFacebookClient();
-                try {
-                    ResponseList<Post> postResponseList = client.getFeed(id);
-                    Paging<Post> postPaging;
-                    do {
-
-                        for (Post item : postResponseList) {
-                            String json = DataObjectFactory.getRawJSON(item);
-                            org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                            try {
-                                lock.readLock().lock();
-                                ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                                countersCurrent.incrementAttempt();
-                            } finally {
-                                lock.readLock().unlock();
-                            }
-                        }
-                        postPaging = postResponseList.getPaging();
-                        postResponseList = client.fetchNext(postPaging);
-                    } while( postPaging != null &&
-                            postResponseList != null );
-
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java
deleted file mode 100644
index a111823..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookFriendUpdatesProvider.java
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.provider;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.facebook.FacebookUserInformationConfiguration;
-import org.apache.streams.facebook.FacebookUserstreamConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class FacebookFriendUpdatesProvider implements StreamsProvider, Serializable
-{
-
-    public static final String STREAMS_ID = "FacebookFriendPostsProvider";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookFriendUpdatesProvider.class);
-
-    private static final ObjectMapper mapper = new StreamsJacksonMapper();
-
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserstreamConfiguration configuration;
-
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    public FacebookUserstreamConfiguration getConfig()              { return configuration; }
-
-    public void setConfig(FacebookUserstreamConfiguration config)   { this.configuration = config; }
-
-    protected Iterator<String[]> idsBatches;
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    public FacebookFriendUpdatesProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
-
-    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
-
-    public FacebookFriendUpdatesProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration configuration;
-        try {
-            configuration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
-
-    public FacebookFriendUpdatesProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
-    }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
-
-    @Override
-    public void startStream() {
-        running.set(true);
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        Preconditions.checkArgument(idsBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        // return stuff
-
-        LOGGER.info("Finished.  Cleaning up...");
-
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
-
-        LOGGER.info("Exiting");
-
-        return result;
-
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
-
-        Facebook client = getFacebookClient();
-
-        try {
-            ResponseList<Friend> friendResponseList = client.friends().getFriends();
-            Paging<Friend> friendPaging;
-            do {
-
-                for( Friend friend : friendResponseList ) {
-
-                    //client.rawAPI().callPostAPI();
-                    // add a subscription
-                }
-                friendPaging = friendResponseList.getPaging();
-                friendResponseList = client.fetchNext(friendPaging);
-            } while( friendPaging != null &&
-                    friendResponseList != null );
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
-
-    }
-
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(configuration.getOauth().getAppId())
-            .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-            .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFeedPollingTask implements Runnable {
-
-        FacebookUserstreamProvider provider;
-        Facebook client;
-
-        private Set<Post> priorPollResult = Sets.newHashSet();
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
-            provider = facebookUserstreamProvider;
-        }
-
-        @Override
-        public void run() {
-            client = provider.getFacebookClient();
-            while (provider.isRunning()) {
-                try {
-                    ResponseList<Post> postResponseList = client.getHome();
-                    Set<Post> update = Sets.newHashSet(postResponseList);
-                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
-                    Set<Post> entrySet = Sets.difference(update, repeats);
-                    for (Post item : entrySet) {
-                        String json = DataObjectFactory.getRawJSON(item);
-                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                        try {
-                            lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                            countersCurrent.incrementAttempt();
-                        } finally {
-                            lock.readLock().unlock();
-                        }
-                    }
-                    priorPollResult = update;
-                    Thread.sleep(configuration.getPollIntervalMillis());
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
deleted file mode 100644
index 8640f5d..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserInformationProvider.java
+++ /dev/null
@@ -1,298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.provider;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.facebook.FacebookUserInformationConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class FacebookUserInformationProvider implements StreamsProvider, Serializable
-{
-
-    public static final String STREAMS_ID = "FacebookUserInformationProvider";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserInformationProvider.class);
-
-    private static final ObjectMapper mapper = new StreamsJacksonMapper();
-
-    private static final String ALL_PERMISSIONS = "ads_management,ads_read,create_event,create_note,email,export_stream,friends_about_me,friends_actions.books,friends_actions.music,friends_actions.news,friends_actions.video,friends_activities,friends_birthday,friends_education_history,friends_events,friends_games_activity,friends_groups,friends_hometown,friends_interests,friends_likes,friends_location,friends_notes,friends_online_presence,friends_photo_video_tags,friends_photos,friends_questions,friends_relationship_details,friends_relationships,friends_religion_politics,friends_status,friends_subscriptions,friends_videos,friends_website,friends_work_history,manage_friendlists,manage_notifications,manage_pages,photo_upload,publish_actions,publish_stream,read_friendlists,read_insights,read_mailbox,read_page_mailboxes,read_requests,read_stream,rsvp_event,share_item,sms,status_update,user_about_me,user_actions.books,user_actions.music,user_actions.news,user_actions.video,user_activitie
 s,user_birthday,user_education_history,user_events,user_friends,user_games_activity,user_groups,user_hometown,user_interests,user_likes,user_location,user_notes,user_online_presence,user_photo_video_tags,user_photos,user_questions,user_relationship_details,user_relationships,user_religion_politics,user_status,user_subscriptions,user_videos,user_website,user_work_history,video_upload,xmpp_login";
-    private FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-
-    private Class klass;
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    public FacebookUserInformationConfiguration getConfig()              { return facebookUserInformationConfiguration; }
-
-    public void setConfig(FacebookUserInformationConfiguration config)   { this.facebookUserInformationConfiguration = config; }
-
-    protected Iterator<String[]> idsBatches;
-
-    protected ExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    public FacebookUserInformationProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
-
-    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config) {
-        this.facebookUserInformationConfiguration = config;
-    }
-
-    public FacebookUserInformationProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
-
-    public FacebookUserInformationProvider(FacebookUserInformationConfiguration config, Class klass) {
-        this.facebookUserInformationConfiguration = config;
-        this.klass = klass;
-    }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
-
-    @Override
-    public void startStream() {
-        running.set(true);
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        Preconditions.checkArgument(idsBatches.hasNext());
-
-        LOGGER.info("readCurrent");
-
-        Facebook client = getFacebookClient();
-
-        try {
-            User me = client.users().getMe();
-            String json = mapper.writeValueAsString(me);
-            providerQueue.add(
-                new StreamsDatum(json, DateTime.now())
-            );
-        } catch (JsonProcessingException e) {
-            e.printStackTrace();
-        } catch (FacebookException e) {
-            e.printStackTrace();
-        }
-
-        if( idsBatches.hasNext()) {
-            while (idsBatches.hasNext()) {
-                try {
-                    List<User> userList = client.users().getUsers(idsBatches.next());
-                    for (User user : userList) {
-
-                        try {
-                            String json = mapper.writeValueAsString(user);
-                            providerQueue.add(
-                                    new StreamsDatum(json, DateTime.now())
-                            );
-                        } catch (JsonProcessingException e) {
-                            //                        e.printStackTrace();
-                        }
-                    }
-
-                } catch (FacebookException e) {
-                    e.printStackTrace();
-                }
-            }
-        } else {
-            try {
-                ResponseList<Friend> friendResponseList = client.friends().getFriends();
-                Paging<Friend> friendPaging;
-                do {
-
-                    for( Friend friend : friendResponseList ) {
-
-                        String json;
-                        try {
-                            json = mapper.writeValueAsString(friend);
-                            providerQueue.add(
-                                    new StreamsDatum(json)
-                            );
-                        } catch (JsonProcessingException e) {
-//                        e.printStackTrace();
-                        }
-                    }
-                    friendPaging = friendResponseList.getPaging();
-                    friendResponseList = client.fetchNext(friendPaging);
-                } while( friendPaging != null &&
-                        friendResponseList != null );
-            } catch (FacebookException e) {
-                e.printStackTrace();
-            }
-
-        }
-
-        LOGGER.info("Finished.  Cleaning up...");
-
-        LOGGER.info("Providing {} docs", providerQueue.size());
-
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
-        running.set(false);
-
-        LOGGER.info("Exiting");
-
-        return result;
-
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet)providerQueue.iterator();
-        return result;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppId());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getOauth().getUserAccessToken());
-        Preconditions.checkNotNull(facebookUserInformationConfiguration.getInfo());
-
-        List<String> ids = new ArrayList<String>();
-        List<String[]> idsBatches = new ArrayList<String[]>();
-
-        for(String s : facebookUserInformationConfiguration.getInfo()) {
-            if(s != null)
-            {
-                ids.add(s);
-
-                if(ids.size() >= 100) {
-                    // add the batch
-                    idsBatches.add(ids.toArray(new String[ids.size()]));
-                    // reset the Ids
-                    ids = new ArrayList<String>();
-                }
-
-            }
-        }
-
-        if(ids.size() > 0)
-            idsBatches.add(ids.toArray(new String[ids.size()]));
-
-        this.idsBatches = idsBatches.iterator();
-    }
-
-    protected Facebook getFacebookClient()
-    {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-            .setOAuthAppId(facebookUserInformationConfiguration.getOauth().getAppId())
-            .setOAuthAppSecret(facebookUserInformationConfiguration.getOauth().getAppSecret())
-            .setOAuthAccessToken(facebookUserInformationConfiguration.getOauth().getUserAccessToken())
-            .setOAuthPermissions(ALL_PERMISSIONS)
-            .setJSONStoreEnabled(true)
-            .setClientVersion("v1.0");
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc432af2/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java b/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
deleted file mode 100644
index b0bf082..0000000
--- a/streams-contrib/streams-provider-facebook/src/main/java/com/facebook/provider/FacebookUserstreamProvider.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.facebook.provider;
-
-import com.facebook.*;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import facebook4j.*;
-import facebook4j.Post;
-import facebook4j.conf.ConfigurationBuilder;
-import facebook4j.json.DataObjectFactory;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.facebook.FacebookUserInformationConfiguration;
-import org.apache.streams.facebook.FacebookUserstreamConfiguration;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.util.ComponentUtils;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import sun.reflect.generics.reflectiveObjects.NotImplementedException;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-public class FacebookUserstreamProvider implements StreamsProvider, Serializable {
-
-    public static final String STREAMS_ID = "FacebookUserstreamProvider";
-    private static final Logger LOGGER = LoggerFactory.getLogger(FacebookUserstreamProvider.class);
-
-    private static final ObjectMapper mapper = new StreamsJacksonMapper();
-
-    private static final String ALL_PERMISSIONS = "read_stream";
-    private FacebookUserstreamConfiguration configuration;
-
-    private Class klass;
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-    protected volatile Queue<StreamsDatum> providerQueue = new LinkedBlockingQueue<StreamsDatum>();
-
-    public FacebookUserstreamConfiguration getConfig() {
-        return configuration;
-    }
-
-    public void setConfig(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
-
-    protected ListeningExecutorService executor;
-
-    protected DateTime start;
-    protected DateTime end;
-
-    protected final AtomicBoolean running = new AtomicBoolean();
-
-    private DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private DatumStatusCounter countersTotal = new DatumStatusCounter();
-
-    protected Facebook client;
-
-    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
-        return new ThreadPoolExecutor(nThreads, nThreads,
-                5000L, TimeUnit.MILLISECONDS,
-                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
-    }
-
-    public FacebookUserstreamProvider() {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-    }
-
-    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config) {
-        this.configuration = config;
-    }
-
-    public FacebookUserstreamProvider(Class klass) {
-        Config config = StreamsConfigurator.config.getConfig("facebook");
-        FacebookUserInformationConfiguration facebookUserInformationConfiguration;
-        try {
-            facebookUserInformationConfiguration = mapper.readValue(config.root().render(ConfigRenderOptions.concise()), FacebookUserInformationConfiguration.class);
-        } catch (IOException e) {
-            e.printStackTrace();
-            return;
-        }
-        this.klass = klass;
-    }
-
-    public FacebookUserstreamProvider(FacebookUserstreamConfiguration config, Class klass) {
-        this.configuration = config;
-        this.klass = klass;
-    }
-
-    public Queue<StreamsDatum> getProviderQueue() {
-        return this.providerQueue;
-    }
-
-    @Override
-    public void startStream() {
-
-        client = getFacebookClient();
-
-        if( configuration.getInfo() != null &&
-            configuration.getInfo().size() > 0 ) {
-            for( String id : configuration.getInfo()) {
-                executor.submit(new FacebookFeedPollingTask(this, id));
-            }
-            running.set(true);
-        } else {
-            try {
-                String id = client.getMe().getId();
-                executor.submit(new FacebookFeedPollingTask(this, id));
-                running.set(true);
-            } catch (FacebookException e) {
-                LOGGER.error(e.getMessage());
-                running.set(false);
-            }
-        }
-    }
-
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        synchronized (FacebookUserstreamProvider.class) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(providerQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            providerQueue.clear();
-        }
-
-        return current;
-
-    }
-
-    public StreamsResultSet readNew(BigInteger sequence) {
-        LOGGER.debug("{} readNew", STREAMS_ID);
-        throw new NotImplementedException();
-    }
-
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        LOGGER.debug("{} readRange", STREAMS_ID);
-        this.start = start;
-        this.end = end;
-        readCurrent();
-        StreamsResultSet result = (StreamsResultSet) providerQueue.iterator();
-        return result;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return running.get();
-    }
-
-    void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    System.err.println("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    @Override
-    public void prepare(Object o) {
-
-        executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
-
-        Preconditions.checkNotNull(providerQueue);
-        Preconditions.checkNotNull(this.klass);
-        Preconditions.checkNotNull(configuration.getOauth().getAppId());
-        Preconditions.checkNotNull(configuration.getOauth().getAppSecret());
-        Preconditions.checkNotNull(configuration.getOauth().getUserAccessToken());
-
-        client = getFacebookClient();
-
-        if( configuration.getInfo() != null &&
-            configuration.getInfo().size() > 0 ) {
-
-            List<String> ids = new ArrayList<String>();
-            List<String[]> idsBatches = new ArrayList<String[]>();
-
-            for (String s : configuration.getInfo()) {
-                if (s != null) {
-                    ids.add(s);
-
-                    if (ids.size() >= 100) {
-                        // add the batch
-                        idsBatches.add(ids.toArray(new String[ids.size()]));
-                        // reset the Ids
-                        ids = new ArrayList<String>();
-                    }
-
-                }
-            }
-        }
-    }
-
-    protected Facebook getFacebookClient() {
-        ConfigurationBuilder cb = new ConfigurationBuilder();
-        cb.setDebugEnabled(true)
-                .setOAuthAppId(configuration.getOauth().getAppId())
-                .setOAuthAppSecret(configuration.getOauth().getAppSecret())
-                .setOAuthAccessToken(configuration.getOauth().getUserAccessToken())
-                .setOAuthPermissions(ALL_PERMISSIONS)
-                .setJSONStoreEnabled(true);
-
-        FacebookFactory ff = new FacebookFactory(cb.build());
-        Facebook facebook = ff.getInstance();
-
-        return facebook;
-    }
-
-    @Override
-    public void cleanUp() {
-        shutdownAndAwaitTermination(executor);
-    }
-
-    private class FacebookFeedPollingTask implements Runnable {
-
-        FacebookUserstreamProvider provider;
-        Facebook client;
-        String id;
-
-        private Set<Post> priorPollResult = Sets.newHashSet();
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider) {
-            this.provider = facebookUserstreamProvider;
-        }
-
-        public FacebookFeedPollingTask(FacebookUserstreamProvider facebookUserstreamProvider, String id) {
-            this.provider = facebookUserstreamProvider;
-            this.client = provider.client;
-            this.id = id;
-        }
-        @Override
-        public void run() {
-            while (provider.isRunning()) {
-                ResponseList<Post> postResponseList;
-                try {
-                    postResponseList = client.getFeed(id);
-
-                    Set<Post> update = Sets.newHashSet(postResponseList);
-                    Set<Post> repeats = Sets.intersection(priorPollResult, Sets.newHashSet(update));
-                    Set<Post> entrySet = Sets.difference(update, repeats);
-                    LOGGER.debug(this.id + " response: " + update.size() + " previous: " + repeats.size() + " new: " + entrySet.size());
-                    for (Post item : entrySet) {
-                        String json = DataObjectFactory.getRawJSON(item);
-                        org.apache.streams.facebook.Post post = mapper.readValue(json, org.apache.streams.facebook.Post.class);
-                        try {
-                            lock.readLock().lock();
-                            ComponentUtils.offerUntilSuccess(new StreamsDatum(post), providerQueue);
-                            countersCurrent.incrementAttempt();
-                        } finally {
-                            lock.readLock().unlock();
-                        }
-                    }
-                    priorPollResult = update;
-                } catch (Exception e) {
-                    e.printStackTrace();
-                } finally {
-                    try {
-                        Thread.sleep(configuration.getPollIntervalMillis());
-                    } catch (InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    }
-                }
-            }
-        }
-    }
-}