You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2017/04/19 19:49:48 UTC

[1/3] incubator-streams git commit: STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

Repository: incubator-streams
Updated Branches:
  refs/heads/master 19caf33ac -> 67497a488


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListRequest.json
new file mode 100644
index 0000000..75bba94
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FriendsListRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/list",
+  "extends": { "$ref": "FollowingListRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListResponse.json
new file mode 100644
index 0000000..ac357aa
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsListResponse.json
@@ -0,0 +1,33 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FriendsListResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/list",
+  "properties": {
+    "users": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "$ref": "../pojo/User.json"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesLookupRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesLookupRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesLookupRequest.json
new file mode 100644
index 0000000..4306b6b
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesLookupRequest.json
@@ -0,0 +1,35 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.StatusesLookupRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/statuses/lookup",
+  "properties": {
+    "id": {
+      "description": "A comma separated list of Tweet IDs, up to 100 are allowed in a single request.",
+      "required": true,
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "trim_user": {
+      "description": "When set to either true , t or 1 , each Tweet returned in a timeline will include a user object including only the status authors numerical ID. Omit this parameter to receive the complete user object.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_entities": {
+      "description": "The entities node will not be included when set to false.",
+      "required": false,
+      "type": "boolean"
+    },
+    "map": {
+      "description": "When using the map parameter, Tweets that do not exist or cannot be viewed by the current user will still have their key represented but with an explicitly null value paired with it.",
+      "required": false,
+      "type": "boolean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesShowRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesShowRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesShowRequest.json
new file mode 100644
index 0000000..8794973
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesShowRequest.json
@@ -0,0 +1,32 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.StatusesShowRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/statuses/show/id",
+  "properties": {
+    "id": {
+      "description": "The numerical ID of the desired Tweet.",
+      "required": true,
+      "type": "integer"
+    },
+    "trim_user": {
+      "description": "When set to either true , t or 1 , each Tweet returned in a timeline will include a user object including only the status authors numerical ID. Omit this parameter to receive the complete user object.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_my_retweet": {
+      "description": "When set to either true , t or 1 , any Tweets returned that have been retweeted by the authenticating user will include an additional current_user_retweet node, containing the ID of the source status for the retweet.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_entities": {
+      "description": "The entities node will not be included when set to false.",
+      "required": false,
+      "type": "boolean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesUserTimelineRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesUserTimelineRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesUserTimelineRequest.json
new file mode 100644
index 0000000..bdb41df
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/StatusesUserTimelineRequest.json
@@ -0,0 +1,57 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.StatusesUserTimelineRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://api.twitter.com/1.1/statuses/user_timeline.json",
+  "properties": {
+    "user_id": {
+      "description": "The ID of the user for whom to return results for.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results for.",
+      "required": false,
+      "type": "string"
+    },
+    "count": {
+      "description": "Specifies the number of Tweets to try and retrieve, up to a maximum of 200 per distinct request. The value of count is best thought of as a limit to the number of Tweets to return because suspended or deleted content is removed after the count has been applied. We include retweets in the count, even if include_rts is not supplied. It is recommended you always send include_rts=1 when using this API method.",
+      "required": false,
+      "type": "integer"
+    },
+    "since_id": {
+      "description": "Returns results with an ID greater than (that is, more recent than) the specified ID. There are limits to the number of Tweets which can be accessed through the API. If the limit of Tweets has occured since the since_id, the since_id will be forced to the oldest ID available.",
+      "required": false,
+      "type": "integer"
+    },
+    "max_id": {
+      "description": "Returns results with an ID less than (that is, older than) or equal to the specified ID.",
+      "required": false,
+      "type": "integer"
+    },
+    "trim_user": {
+      "description": "When set to either true , t or 1 , each Tweet returned in a timeline will include a user object including only the status authors numerical ID. Omit this parameter to receive the complete user object.",
+      "required": false,
+      "type": "boolean"
+    },
+    "exclude_replies": {
+      "description": "This parameter will prevent replies from appearing in the returned timeline. Using exclude_replies with the count parameter will mean you will receive up-to count tweets \u2014 this is because the count parameter retrieves that many Tweets before filtering out retweets and replies. This parameter is only supported for JSON and XML responses.",
+      "required": false,
+      "type": "boolean"
+    },
+    "contributor_details": {
+      "description": "This parameter enhances the contributors element of the status response to include the screen_name of the contributor. By default only the user_id of the contributor is included.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_rts": {
+      "description": "When set to false , the timeline will strip any native retweets (though they will still count toward both the maximal length of the timeline and the slice selected by the count parameter). Note: If you\u2019re using the trim_user parameter in conjunction with include_rts, the retweets will still contain a full user object.",
+      "required": false,
+      "type": "boolean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersLookupRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersLookupRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersLookupRequest.json
new file mode 100644
index 0000000..0d75a9e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersLookupRequest.json
@@ -0,0 +1,33 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.UsersLookupRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/users/lookup",
+  "properties": {
+    "user_id": {
+      "description": "A comma separated list of user IDs, up to 100 are allowed in a single request. You are strongly encouraged to use a POST for larger requests.",
+      "required": true,
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "screen_name": {
+      "description": "A comma separated list of screen names, up to 100 are allowed in a single request. You are strongly encouraged to use a POST for larger (up to 100 screen names) requests.",
+      "required": true,
+      "type": "array",
+      "items": {
+        "type": "string"
+      }
+    },
+    "include_entities": {
+      "description": "The entities node will not be included when set to false.",
+      "required": false,
+      "type": "boolean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersShowRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersShowRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersShowRequest.json
new file mode 100644
index 0000000..c7d04ce
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/UsersShowRequest.json
@@ -0,0 +1,27 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.UsersShowRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/users/show/id",
+  "properties": {
+    "user_id": {
+      "description": "The ID of the user for whom to return results. Either an id or screen_name is required for this method.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results. Either a id or screen_name is required for this method.",
+      "required": false,
+      "type": "string"
+    },
+    "include_entities": {
+      "description": "The entities node will not be included when set to false.",
+      "required": false,
+      "type": "boolean"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterConfiguration.json
new file mode 100644
index 0000000..878bbb4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterConfiguration.json
@@ -0,0 +1,92 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "protocol": {
+            "type": "string",
+            "description": "The protocol",
+            "default": "https"
+        },
+        "host": {
+            "type": "string",
+            "description": "The host",
+            "default": "api.twitter.com"
+        },
+        "port": {
+            "type": "integer",
+            "description": "The port",
+            "default": 443
+        },
+        "version": {
+            "type": "string",
+            "description": "The version",
+            "default": "1.1"
+        },
+        "endpoint": {
+            "type": "string",
+            "description": "The endpoint"
+        },
+        "jsonStoreEnabled": {
+            "default" : true,
+            "type": "string"
+        },
+        "oauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "appName": {
+                    "type": "string"
+                },
+                "consumerKey": {
+                    "type": "string"
+                },
+                "consumerSecret": {
+                    "type": "string"
+                },
+                "accessToken": {
+                    "type": "string"
+                },
+                "accessTokenSecret": {
+                    "type": "string"
+                }
+            }
+        },
+        "basicauth": {
+            "type": "object",
+            "dynamic": "true",
+            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
+            "javaInterfaces": ["java.io.Serializable"],
+            "properties": {
+                "username": {
+                    "type": "string"
+                },
+                "password": {
+                    "type": "string"
+                }
+            }
+        },
+        "threadsPerProvider": {
+            "type": "integer",
+            "description": "number of threads per provider",
+            "default": 10
+        },
+        "retrySleepMs": {
+             "type": "integer",
+             "description": "ms to sleep when hitting a rate limit",
+             "default": 100000
+        },
+        "retryMax": {
+             "type": "integer",
+             "description": "max times to retry",
+             "default": 10
+        }
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterFollowingConfiguration.json
new file mode 100644
index 0000000..89fc7af
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterFollowingConfiguration.json
@@ -0,0 +1,33 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterFollowingConfiguration",
+    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "ids_only": {
+            "type": "boolean",
+            "description": "Whether to collect ids only, or full profiles",
+            "default": "true"
+        },
+        "max_items": {
+            "type": "integer",
+            "description": "Max items per user to collect",
+            "default": 50000
+        },
+        "max_pages": {
+            "type": "integer",
+            "description": "Max pages per user to request",
+            "default": 10
+        },
+        "page_size": {
+            "type": "integer",
+            "description": "Max items per page to request",
+            "default": 5000
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterStreamConfiguration.json
new file mode 100644
index 0000000..6fa2a73
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterStreamConfiguration.json
@@ -0,0 +1,45 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "includeEntities": {
+            "type": "string"
+        },
+        "truncated": {
+            "type": "boolean"
+        },
+        "filter-level": {
+            "type": "string",
+            "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
+        },
+        "with": {
+            "type": "string",
+            "description": "Typically following or user"
+        },
+        "replies": {
+            "type": "string",
+            "description": "Set to all, to see all @replies"
+        },
+        "follow": {
+            "type": "array",
+            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
+            "items": {
+                "type": "integer"
+            }
+        },
+        "track": {
+            "type": "array",
+            "description": "A list of phrases which will be used to determine what Tweets will be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
new file mode 100644
index 0000000..37ed60e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterTimelineProviderConfiguration.json
@@ -0,0 +1,23 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
+    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "max_items": {
+            "type": "integer",
+            "description": "Max items per user to collect",
+            "default": 3200
+        },
+        "max_pages": {
+            "type": "integer",
+            "description": "Max items per page to request",
+            "default": 16
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterUserInformationConfiguration.json
new file mode 100644
index 0000000..405c87a
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/config/TwitterUserInformationConfiguration.json
@@ -0,0 +1,25 @@
+{
+    "$schema": "http://json-schema.org/draft-03/schema",
+    "$license": [
+        "http://www.apache.org/licenses/LICENSE-2.0"
+    ],
+    "id": "#",
+    "type": "object",
+    "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
+    "extends": {"$ref":"TwitterConfiguration.json"},
+    "javaInterfaces": ["java.io.Serializable"],
+    "properties": {
+        "info": {
+            "type": "array",
+            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
+            "items": {
+                "type": "string"
+            }
+        },
+        "page_size": {
+            "type": "integer",
+            "description": "Max items per page to request",
+            "default": 200
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/api/TwitterOAuthRequestInterceptorTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/api/TwitterOAuthRequestInterceptorTest.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/api/TwitterOAuthRequestInterceptorTest.java
new file mode 100644
index 0000000..9634c9d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/api/TwitterOAuthRequestInterceptorTest.java
@@ -0,0 +1,124 @@
+package org.apache.streams.twitter.test.api;
+
+import org.apache.streams.twitter.TwitterOAuthConfiguration;
+import org.apache.streams.twitter.api.TwitterOAuthRequestInterceptor;
+import org.apache.streams.twitter.test.utils.TwitterActivityConvertersTest;
+
+import org.apache.http.HttpHost;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.protocol.HttpCoreContext;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.streams.twitter.api.TwitterOAuthRequestInterceptor.encode;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by sblackmon on 3/26/17.
+ */
+public class TwitterOAuthRequestInterceptorTest {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterActivityConvertersTest.class);
+
+  @Test
+  public void testEncode() {
+    assertEquals( "Ladies%20%2B%20Gentlemen", encode("Ladies + Gentlemen"));
+    assertEquals( "An%20encoded%20string%21", encode("An encoded string!"));
+    assertEquals( "Dogs%2C%20Cats%20%26%20Mice", encode("Dogs, Cats & Mice"));
+    assertEquals( "%E2%98%83", encode("\u2603"));
+  }
+
+  @Test
+  public void testGenerateSignatureBaseString() {
+    Map<String,String> testParamMap = new HashMap<>();
+    testParamMap.put("status", encode("Hello Ladies + Gentlemen, a signed OAuth request!"));
+    testParamMap.put("include_entities", "true");
+    testParamMap.put("oauth_consumer_key", "xvz1evFS4wEEPTGEFPHBog");
+    testParamMap.put("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg");
+    testParamMap.put("oauth_signature_method", "HMAC-SHA1");
+    testParamMap.put("oauth_timestamp", "1318622958");
+    testParamMap.put("oauth_token", "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb");
+    testParamMap.put("oauth_version", "1.0");
+    String signature_parameter_string = TwitterOAuthRequestInterceptor.generateSignatureParameterString(testParamMap);
+    String signature_base_string = TwitterOAuthRequestInterceptor.generateSignatureBaseString("POST", "https://api.twitter.com/1/statuses/update.json", signature_parameter_string);
+    String expected = "POST&https%3A%2F%2Fapi.twitter.com%2F1%2Fstatuses%2Fupdate.json&include_entities%3Dtrue%26oauth_consumer_key%3Dxvz1evFS4wEEPTGEFPHBog%26oauth_nonce%3DkYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1318622958%26oauth_token%3D370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb%26oauth_version%3D1.0%26status%3DHello%2520Ladies%2520%252B%2520Gentlemen%252C%2520a%2520signed%2520OAuth%2520request%2521";
+    assertEquals(expected, signature_base_string);
+  }
+
+  /**
+   * @see <a href=https://dev.twitter.com/oauth/overview/creating-signatures">https://dev.twitter.com/oauth/overview/creating-signatures</a>
+   */
+  @Test
+  public void testGenerateSignatureParameterString() {
+    Map<String,String> testParamMap = new HashMap<>();
+    testParamMap.put("status", encode("Hello Ladies + Gentlemen, a signed OAuth request!"));
+    testParamMap.put("include_entities", "true");
+    testParamMap.put("oauth_consumer_key", "xvz1evFS4wEEPTGEFPHBog");
+    testParamMap.put("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg");
+    testParamMap.put("oauth_signature_method", "HMAC-SHA1");
+    testParamMap.put("oauth_timestamp", "1318622958");
+    testParamMap.put("oauth_token", "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb");
+    testParamMap.put("oauth_version", "1.0");
+    String signature_parameter_string = TwitterOAuthRequestInterceptor.generateSignatureParameterString(testParamMap);
+    String expected = "include_entities=true&oauth_consumer_key=xvz1evFS4wEEPTGEFPHBog&oauth_nonce=kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg&oauth_signature_method=HMAC-SHA1&oauth_timestamp=1318622958&oauth_token=370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb&oauth_version=1.0&status=Hello%20Ladies%20%2B%20Gentlemen%2C%20a%20signed%20OAuth%20request%21";
+    assertEquals(expected, signature_parameter_string);
+  }
+
+  @Test
+  public void testGenerateAuthorizationHeaderString() {
+    Map<String,String> oauthParamMap = new HashMap<>();
+    oauthParamMap.put("oauth_consumer_key", "xvz1evFS4wEEPTGEFPHBog");
+    oauthParamMap.put("oauth_nonce", "kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg");
+    oauthParamMap.put("oauth_signature", "tnnArxj06cWHq44gCs1OSKk/jLY=");
+    oauthParamMap.put("oauth_signature_method", "HMAC-SHA1");
+    oauthParamMap.put("oauth_timestamp", "1318622958");
+    oauthParamMap.put("oauth_token", "370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb");
+    oauthParamMap.put("oauth_version", "1.0");
+    String expected = "OAuth oauth_consumer_key=\"xvz1evFS4wEEPTGEFPHBog\", oauth_nonce=\"kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg\", oauth_signature=\"tnnArxj06cWHq44gCs1OSKk%2FjLY%3D\", oauth_signature_method=\"HMAC-SHA1\", oauth_timestamp=\"1318622958\", oauth_token=\"370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb\", oauth_version=\"1.0\"";
+    String authorization_header_string = TwitterOAuthRequestInterceptor.generateAuthorizationHeaderString(oauthParamMap);
+    assertEquals(expected, authorization_header_string);
+  }
+
+  @Test
+  public void testComputeSignature() throws Exception {
+    String signature_base_string = "POST&https%3A%2F%2Fapi.twitter.com%2F1%2Fstatuses%2Fupdate.json&include_entities%3Dtrue%26oauth_consumer_key%3Dxvz1evFS4wEEPTGEFPHBog%26oauth_nonce%3DkYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg%26oauth_signature_method%3DHMAC-SHA1%26oauth_timestamp%3D1318622958%26oauth_token%3D370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb%26oauth_version%3D1.0%26status%3DHello%2520Ladies%2520%252B%2520Gentlemen%252C%2520a%2520signed%2520OAuth%2520request%2521";
+    String signing_key = "kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw&LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE";
+    String expected = "tnnArxj06cWHq44gCs1OSKk/jLY=";
+    String oauth_signature = TwitterOAuthRequestInterceptor.computeSignature(signature_base_string, signing_key);
+    assertEquals(expected, oauth_signature);
+  }
+
+  @Test
+  public void testProcess() throws Exception {
+    URI testURI = new URIBuilder()
+        .setPath("/1/statuses/update.json")
+        .setParameter("include_entities", "true")
+        .build();
+    HttpPost testRequest = new HttpPost(testURI);
+    testRequest.setEntity(new StringEntity("status="+encode("Hello Ladies + Gentlemen, a signed OAuth request!")));
+    HttpHost host = new HttpHost("api.twitter.com", -1, "https");
+    HttpRequestWrapper wrapper = HttpRequestWrapper.wrap(testRequest, host);
+    TwitterOAuthConfiguration testOauthConfiguration = new TwitterOAuthConfiguration()
+        .withConsumerKey("xvz1evFS4wEEPTGEFPHBog")
+        .withConsumerSecret("kAcSOqF21Fu85e7zjz7ZN2U4ZRhfV3WpwPAoE3Z7kBw")
+        .withAccessToken("370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb")
+        .withAccessTokenSecret("LswwdoUaIvS8ltyTt5jkRh4J50vUPVVHtR2YPi5kE");
+    TwitterOAuthRequestInterceptor interceptor = Mockito.spy(new TwitterOAuthRequestInterceptor(testOauthConfiguration));
+    Mockito.when(interceptor.generateNonce()).thenReturn("kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg");
+    Mockito.when(interceptor.generateTimestamp()).thenReturn("1318622958");
+    interceptor.process(wrapper, new HttpCoreContext());
+    String expected = "OAuth oauth_consumer_key=\"xvz1evFS4wEEPTGEFPHBog\", oauth_nonce=\"kYjzVBB8Y0ZFabxSWbWovY3uYSQ2pTgmZeNu2VS4cg\", oauth_signature=\"tnnArxj06cWHq44gCs1OSKk%2FjLY%3D\", oauth_signature_method=\"HMAC-SHA1\", oauth_timestamp=\"1318622958\", oauth_token=\"370773112-GmHxMAgYyLbNEtIKZeRNFsMKPR9EyMZeS9weJAEb\", oauth_version=\"1.0\"";
+    assertEquals(1, wrapper.getHeaders("Authorization").length);
+    assertEquals(expected, wrapper.getFirstHeader("Authorization").getValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
index 117cba4..d8f53fd 100644
--- a/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
+++ b/streams-contrib/streams-provider-twitter/src/test/java/org/apache/streams/twitter/test/providers/TwitterTimelineProviderIT.java
@@ -51,7 +51,7 @@ public class TwitterTimelineProviderIT {
       }
     });
     testThread.start();
-    testThread.join(60000);
+    testThread.join(600000);
 
     File out = new File(outfile);
     Assert.assertTrue (out.exists());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-monitoring/pom.xml
----------------------------------------------------------------------
diff --git a/streams-monitoring/pom.xml b/streams-monitoring/pom.xml
index 58f564e..f55b966 100644
--- a/streams-monitoring/pom.xml
+++ b/streams-monitoring/pom.xml
@@ -48,14 +48,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpcomponents-core</artifactId>
-            <type>pom</type>
-            <version>${httpcomponents.core.version}</version>
+            <artifactId>httpcore</artifactId>
         </dependency>
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcomponents.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>


[3/3] incubator-streams git commit: STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

Posted by sb...@apache.org.
STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

resolves #360
commit 05aa4ddc907c845fdbbfc9e8feb85cc0f9ecc2fb
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Apr 17 17:04:24 2017 -0500

    upgrade hosebird client because why not

commit 3dc5f515fc067fbd50af431139b98c3bc408cff8
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Wed Apr 12 23:48:21 2017 -0500

    PR feedback, juneau target version bump

commit b67487a54f7e2b6676ab031f8ff0091393111f56
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Wed Apr 12 22:48:58 2017 -0500

    PR feedback - javadoc accuracy

commit 996c1b47e9dc485692dbc5a4c18f5754c697cd91
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Wed Apr 12 22:33:27 2017 -0500

    clean up

commit 8618abcad4e6087c0270ba8c54a201888071e0e8
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Apr 11 16:45:11 2017 -0500

    retry logic

commit d7cd07110d602d24a07aad8e8f390ff0d98f1e19
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Apr 3 15:11:32 2017 -0500

    checkstyle clean up

commit 069be4da745b9cd262159cca1982a9b52cb5cefc
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Apr 3 14:56:01 2017 -0500

    all integration tests passing using juneau 6.1.0-incubating

commit 517b85f02e93c51d5fa36a704f7f0e0e2cdaed4e
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Apr 3 11:01:25 2017 -0500

    oauth signing working

commit 06845f2dd848c91971c10f194f0d5503d5f4a964
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Tue Mar 28 13:20:03 2017 -0500

    return empty array not null on error

commit eddf003ba5b5555b10c3d51267c00225b6da4ef1
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 22:46:19 2017 -0500

    TwitterOAuthRequestInterceptorTest.testProcess

    TwitterOAuthRequestInterceptorTest.testProcess is pretty much directly off the twitter dev guidelines and tests the entire auth process at once.  passes. ITs still fail

commit c623f7139421049d5b536576c739b60e959cf11e
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 17:55:09 2017 -0500

    STREAMS-496: going through guide word-by-word and debugging line-by-line looking for mistakes

commit 5a07c57b553f8c87af1d42e0bbef4f0795f17bf7
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 17:44:37 2017 -0500

    STREAMS-496: more unit tests, all passing, ITs still fail

commit edc2662fd68b9e1e04c20b8721a44a6ee9fb36e6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 16:20:10 2017 -0500

    adding unit testing to debug authentication

commit 1885090c6d683c8a9a6df7c64f933529378ee965
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 13:09:24 2017 -0500

    cleanup authentication.  debugging \u2018Bad Authentication data.\u2019

commit 434657439abc0dde2998a8cb213d8e8398f43ad6
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 12:56:34 2017 -0500

    first implementation of oauth signing

commit 0d172e76232e3d0bd48af8d6616716c444eb94f0
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sun Mar 26 12:56:11 2017 -0500

    STREAMS-496: consolidate httpcomponents version selection

commit 78f30284caefb09e06818a9f435c3f380ee986a3
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Sat Mar 25 11:38:06 2017 -0500

    STREAMS-496: WIP - twitter4j gone, src/main/java compiles

commit c95bccd41b4f5f51d7e614e5927210458d1dbfbd
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Fri Mar 24 14:21:34 2017 -0500

    implementations using RestClient

commit 68d7976f928173dbd6de636c58d1a23dbe95eb15
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Fri Mar 24 13:39:52 2017 -0500

    STREAMS-496: additional pojos and interfaces needed to maintain features

commit a8fd796096296bb2e8af61b722c4a261acbba987
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Mar 20 13:07:27 2017 -0500

    more WIP

commit 7dedbb42251e1c5214690e32f12a5efb5032dc92
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Date:   Mon Mar 20 01:35:33 2017 -0500

    STREAMS-496: WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/67497a48
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/67497a48
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/67497a48

Branch: refs/heads/master
Commit: 67497a488a9417dcdbf702f707729928500c8d5c
Parents: 19caf33
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Wed Apr 19 14:49:38 2017 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Wed Apr 19 14:49:38 2017 -0500

----------------------------------------------------------------------
 pom.xml                                         |  14 +-
 streams-components/streams-http/pom.xml         |   1 -
 streams-contrib/streams-persist-riak/pom.xml    |   3 -
 .../streams-provider-twitter/pom.xml            |  25 +-
 .../apache/streams/twitter/api/Followers.java   |  52 ++
 .../org/apache/streams/twitter/api/Friends.java |  52 ++
 .../apache/streams/twitter/api/Statuses.java    |  62 +++
 .../org/apache/streams/twitter/api/Twitter.java | 521 +++++++++++++++++++
 .../api/TwitterOAuthRequestInterceptor.java     | 236 +++++++++
 .../twitter/api/TwitterRetryHandler.java        | 162 ++++++
 .../org/apache/streams/twitter/api/Users.java   |  51 ++
 .../converter/util/TwitterActivityUtil.java     |   6 +-
 .../FetchAndReplaceTwitterProcessor.java        | 107 ++--
 .../twitter/provider/TwitterErrorHandler.java   | 133 -----
 .../TwitterFollowersIdsProviderTask.java        | 124 +++++
 .../TwitterFollowersListProviderTask.java       | 120 +++++
 .../provider/TwitterFollowingProvider.java      | 188 +++++--
 .../provider/TwitterFollowingProviderTask.java  | 266 ----------
 .../provider/TwitterFriendsIdsProviderTask.java | 125 +++++
 .../TwitterFriendsListProviderTask.java         | 119 +++++
 .../provider/TwitterTimelineProvider.java       | 157 +++---
 .../provider/TwitterTimelineProviderTask.java   |  81 ++-
 .../TwitterUserInformationProvider.java         | 256 ++++-----
 .../TwitterUserInformationProviderTask.java     |  77 +++
 .../streams/twitter/TwitterConfiguration.json   |  87 ----
 .../twitter/TwitterFollowingConfiguration.json  |  33 --
 .../twitter/TwitterStreamConfiguration.json     |  45 --
 .../TwitterTimelineProviderConfiguration.json   |  23 -
 .../TwitterUserInformationConfiguration.json    |  25 -
 .../twitter/api/FollowersIdsRequest.json        |  13 +
 .../twitter/api/FollowersIdsResponse.json       |  32 ++
 .../twitter/api/FollowersListRequest.json       |  13 +
 .../twitter/api/FollowersListResponse.json      |  33 ++
 .../twitter/api/FollowingIdsRequest.json        |  36 ++
 .../twitter/api/FollowingListRequest.json       |  41 ++
 .../streams/twitter/api/FriendsIdsRequest.json  |  13 +
 .../streams/twitter/api/FriendsIdsResponse.json |  32 ++
 .../streams/twitter/api/FriendsListRequest.json |  13 +
 .../twitter/api/FriendsListResponse.json        |  33 ++
 .../twitter/api/StatusesLookupRequest.json      |  35 ++
 .../twitter/api/StatusesShowRequest.json        |  32 ++
 .../api/StatusesUserTimelineRequest.json        |  57 ++
 .../streams/twitter/api/UsersLookupRequest.json |  33 ++
 .../streams/twitter/api/UsersShowRequest.json   |  27 +
 .../twitter/config/TwitterConfiguration.json    |  92 ++++
 .../config/TwitterFollowingConfiguration.json   |  33 ++
 .../config/TwitterStreamConfiguration.json      |  45 ++
 .../TwitterTimelineProviderConfiguration.json   |  23 +
 .../TwitterUserInformationConfiguration.json    |  25 +
 .../api/TwitterOAuthRequestInterceptorTest.java | 124 +++++
 .../providers/TwitterTimelineProviderIT.java    |   2 +-
 streams-monitoring/pom.xml                      |   5 +-
 52 files changed, 2911 insertions(+), 1032 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e83dab2..d779fa9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -359,8 +359,8 @@
         <facebook4j.version>2.4.7</facebook4j.version>
         <mockito.version>1.10.19</mockito.version>
         <powermock.version>1.6.5</powermock.version>
-        <httpcomponents.core.version>4.4.5</httpcomponents.core.version>
-        <httpcomponents.client.version>4.5.2</httpcomponents.client.version>
+        <httpcomponents.core.version>4.4.6</httpcomponents.core.version>
+        <httpcomponents.client.version>4.5.3</httpcomponents.client.version>
         <doxia.version>1.7</doxia.version>
 
         <!-- osgi configuration -->
@@ -978,6 +978,16 @@
             <version>${commons-lang3.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>${httpcomponents.core.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>${httpcomponents.client.version}</version>
+        </dependency>
+        <dependency>
             <groupId>com.typesafe</groupId>
             <artifactId>config</artifactId>
             <version>${typesafe.config.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-components/streams-http/pom.xml
----------------------------------------------------------------------
diff --git a/streams-components/streams-http/pom.xml b/streams-components/streams-http/pom.xml
index 4d97bb6..603c02e 100644
--- a/streams-components/streams-http/pom.xml
+++ b/streams-components/streams-http/pom.xml
@@ -62,7 +62,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcomponents.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-persist-riak/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-riak/pom.xml b/streams-contrib/streams-persist-riak/pom.xml
index 3b24fc2..f96c2c3 100644
--- a/streams-contrib/streams-persist-riak/pom.xml
+++ b/streams-contrib/streams-persist-riak/pom.xml
@@ -30,8 +30,6 @@
     <description>Riak Module</description>
 
     <properties>
-        <httpcomponents.core.version>4.3.5</httpcomponents.core.version>
-        <httpcomponents.client.version>4.3.5</httpcomponents.client.version>
         <riak.version>2.0.6</riak.version>
     </properties>
 
@@ -58,7 +56,6 @@
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
-            <version>${httpcomponents.client.version}</version>
             <exclusions>
                 <exclusion>
                     <groupId>commons-logging</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/pom.xml b/streams-contrib/streams-provider-twitter/pom.xml
index 91d2f88..6a0778d 100644
--- a/streams-contrib/streams-provider-twitter/pom.xml
+++ b/streams-contrib/streams-provider-twitter/pom.xml
@@ -90,22 +90,31 @@
         <dependency>
             <groupId>com.twitter</groupId>
             <artifactId>hbc-core</artifactId>
-            <version>2.1.0</version>
+            <version>2.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>jcl-over-slf4j</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.twitter4j</groupId>
-            <artifactId>twitter4j-core</artifactId>
-            <version>4.0.3</version>
+            <groupId>org.apache.juneau</groupId>
+            <artifactId>juneau-rest-client</artifactId>
+            <version>6.1.0-incubating</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
             <version>1.3</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -114,6 +123,12 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>${mockito.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
new file mode 100644
index 0000000..5e04e6e
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Followers.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://api.twitter.com/1.1/statuses/user_timeline.json</a>
+ */
+public interface Followers {
+
+  /**
+   * Returns a cursored collection of user IDs for every user following the specified user.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FollowersIdsRequest}
+   * @return List < Tweet >
+   * @see <a href="https://dev.twitter.com/rest/reference/get/followers/ids">https://dev.twitter.com/rest/reference/get/followers/ids</a>
+   *
+   */
+  public FollowersIdsResponse ids(FollowersIdsRequest parameters);
+
+  /**
+   * Returns a cursored collection of user objects for users following the specified user.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FollowersListRequest}
+   * @return List < Tweet >
+   * @see <a href="https://dev.twitter.com/rest/reference/get/followers/list">https://dev.twitter.com/rest/reference/get/followers/list</a>
+   *
+   */
+  public FollowersListResponse list(FollowersListRequest parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
new file mode 100644
index 0000000..d0d1f72
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Friends.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://api.twitter.com/1.1/statuses/user_timeline.json</a>
+ */
+public interface Friends {
+
+  /**
+   * Returns a cursored collection of user IDs for every user the specified user is following.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FriendsIdsRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/friends/ids">https://dev.twitter.com/rest/reference/get/friends/ids</a>
+   *
+   */
+  public FriendsIdsResponse ids(FriendsIdsRequest parameters);
+
+  /**
+   * Returns a cursored collection of user objects for every user the specified user is following.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.FriendsListRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/friends/list">https://dev.twitter.com/rest/reference/get/friends/list</a>
+   *
+   */
+  public FriendsListResponse list(FriendsListRequest parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
new file mode 100644
index 0000000..c9945b0
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Statuses.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+
+import java.util.List;
+
+/**
+ * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+ *
+ * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://api.twitter.com/1.1/statuses/user_timeline.json</a>
+ */
+public interface Statuses {
+
+  /**
+   * Returns fully-hydrated Tweet objects for up to 100 Tweets per request, as specified by comma-separated values passed to the id parameter.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesLookupRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/lookup">https://dev.twitter.com/rest/reference/get/statuses/lookup</a>
+   *
+   */
+  public List<Tweet> lookup(StatusesLookupRequest parameters);
+
+  /**
+   * Returns a single Tweet, specified by the id parameter. The Tweet\u2019s author will also be embedded within the Tweet.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesShowRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/show/id">https://dev.twitter.com/rest/reference/get/statuses/show/id</a>
+   *
+   */
+  public Tweet show(StatusesShowRequest parameters);
+
+  /**
+   * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.StatusesUserTimelineRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/statuses/user_timeline">https://dev.twitter.com/rest/reference/get/statuses/user_timeline</a>
+   *
+   */
+  public List<Tweet> userTimeline(StatusesUserTimelineRequest parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
new file mode 100644
index 0000000..f4570d6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Twitter.java
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.TwitterConfiguration;
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.twitter.provider.TwitterProviderUtil;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.params.ClientPNames;
+import org.apache.http.client.params.CookiePolicy;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.juneau.json.JsonParser;
+import org.apache.juneau.parser.ParseException;
+import org.apache.juneau.plaintext.PlainTextSerializer;
+import org.apache.juneau.rest.client.RestCall;
+import org.apache.juneau.rest.client.RestCallException;
+import org.apache.juneau.rest.client.RestClient;
+//import org.apache.juneau.rest.client.RestClientBuilder;
+import org.apache.juneau.rest.client.RetryOn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of all twitter interfaces using juneau.
+ */
+public class Twitter implements Followers, Friends, Statuses, Users {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Twitter.class);
+
+  private static Map<TwitterConfiguration, Twitter> INSTANCE_MAP = new ConcurrentHashMap<>();
+
+  private TwitterConfiguration configuration;
+
+  private ObjectMapper mapper;
+
+  private String rootUrl;
+
+  private CloseableHttpClient httpclient;
+
+  private HttpRequestInterceptor oauthInterceptor;
+
+  RestClient restClient;
+
+  private Twitter(TwitterConfiguration configuration) throws InstantiationException {
+    this.configuration = configuration;
+    this.rootUrl = TwitterProviderUtil.baseUrl(configuration);
+    this.oauthInterceptor = new TwitterOAuthRequestInterceptor(configuration.getOauth());
+    this.httpclient = HttpClientBuilder.create()
+        .addInterceptorFirst(oauthInterceptor)
+        .setDefaultRequestConfig(RequestConfig.custom()
+          .setConnectionRequestTimeout(5000)
+          .setConnectTimeout(5000)
+          .setSocketTimeout(5000)
+          .setCookieSpec("easy")
+          .build()
+        )
+        .setMaxConnPerRoute(20)
+        .setMaxConnTotal(100)
+        .build();
+
+//  TODO: juneau-6.3.x-incubating
+//  this.restClient = new RestClientBuilder()
+//        .httpClient(httpclient, true)
+//        .parser(JsonParser.class)
+//        .rootUrl(rootUrl)
+//        .retryable(
+//            configuration.getRetryMax().intValue(),
+//            configuration.getRetrySleepMs(),
+//            new TwitterRetryHandler())
+//        .build();
+    this.restClient = new RestClient()
+        .setHttpClient(httpclient)
+        .setParser(JsonParser.class)
+        .setRootUrl(rootUrl);
+
+    this.mapper = StreamsJacksonMapper.getInstance();
+  }
+
+  public static Twitter getInstance(TwitterConfiguration configuration) throws InstantiationException {
+    if (INSTANCE_MAP.containsKey(configuration) && INSTANCE_MAP.get(configuration) != null) {
+      return INSTANCE_MAP.get(configuration);
+    } else {
+      Twitter twitter = new Twitter(configuration);
+      INSTANCE_MAP.put(configuration, twitter);
+      return INSTANCE_MAP.get(configuration);
+    }
+  }
+
+  @Override
+  public List<Tweet> userTimeline(StatusesUserTimelineRequest parameters) {
+    try {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = restClient.getRemoteableProxy("/statuses/user_timeline.json", Statuses.class);
+//      List<Tweet> result = restStatuses.userTimeline(parameters);
+//      return result;
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/user_timeline.json");
+      if( StringUtils.isNotBlank(parameters.getUserId().toString())) {
+        uriBuilder.addParameter("user_id", parameters.getUserId().toString());
+      }
+      if( StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if( Objects.nonNull(parameters.getSinceId()) && StringUtils.isNotBlank(parameters.getSinceId().toString())) {
+        uriBuilder.addParameter("since_id", parameters.getSinceId().toString());
+      }
+      if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if( Objects.nonNull(parameters.getMaxId()) && StringUtils.isNotBlank(parameters.getMaxId().toString())) {
+        uriBuilder.addParameter("max_id", parameters.getMaxId().toString());
+      }
+      if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if( Objects.nonNull(parameters.getExcludeReplies()) && StringUtils.isNotBlank(parameters.getExcludeReplies().toString())) {
+        uriBuilder.addParameter("exclude_replies", parameters.getExcludeReplies().toString());
+      }
+      if( Objects.nonNull(parameters.getContributorDetails()) && StringUtils.isNotBlank(parameters.getContributorDetails().toString())) {
+        uriBuilder.addParameter("contributor_details", parameters.getContributorDetails().toString());
+      }
+      if( Objects.nonNull(parameters.getIncludeRts()) && StringUtils.isNotBlank(parameters.getIncludeRts().toString())) {
+        uriBuilder.addParameter("include_rts", parameters.getIncludeRts().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<Tweet> result = new ArrayList();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class)));
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public List<Tweet> lookup(StatusesLookupRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = restClient.getRemoteableProxy("/statuses/lookup.json", Statuses.class);
+//      List<Tweet> result = restStatuses.lookup(parameters);
+//      return result;
+    String ids = StringUtils.join(parameters.getId(), ',');
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/lookup.json");
+      if( Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if( Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if( Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if( Objects.nonNull(parameters.getMap()) && StringUtils.isNotBlank(parameters.getMap().toString())) {
+        uriBuilder.addParameter("map", parameters.getMap().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<Tweet> result = new ArrayList();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, Tweet.class)));
+        //List<Tweet> result = restCall.getResponse(LinkedList.class, Tweet.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public Tweet show(StatusesShowRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Statuses restStatuses = restClient.getRemoteableProxy("/statuses/show.json", Statuses.class);
+//      Tweet result = restStatuses.show(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/statuses/show.json");
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getTrimUser()) && StringUtils.isNotBlank(parameters.getTrimUser().toString())) {
+        uriBuilder.addParameter("trim_user", parameters.getTrimUser().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeMyRetweet()) && StringUtils.isNotBlank(parameters.getIncludeMyRetweet().toString())) {
+        uriBuilder.addParameter("include_my_retweet", parameters.getIncludeMyRetweet().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //Tweet result = restCall.getResponse(Tweet.class);
+        Tweet result = mapper.readValue(restResponseEntity, Tweet.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FriendsIdsResponse ids(FriendsIdsRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Friends restFriends = restClient.getRemoteableProxy("/friends/ids.json", Friends.class);
+//      FriendsIdsResponse result = restFriends.ids(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/friends/ids.json");
+      if( Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if( Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if( Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if( StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if( Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) {
+        uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FriendsIdsResponse result = restCall.getResponse(FriendsIdsResponse.class);
+        FriendsIdsResponse result = mapper.readValue(restResponseEntity, FriendsIdsResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FriendsListResponse list(FriendsListRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Friends restFriends = restClient.getRemoteableProxy("/friends/list.json", Friends.class);
+//      FriendsListResponse result = restFriends.list(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/friends/list.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) {
+        uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) {
+        uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FriendsListResponse result = restCall.getResponse(FriendsListResponse.class);
+        FriendsListResponse result = mapper.readValue(restResponseEntity, FriendsListResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    }catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FollowersIdsResponse ids(FollowersIdsRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Followers restFollowers = restClient.getRemoteableProxy("/friends/list.json", Followers.class);
+//      FollowersIdsResponse result = restFollowers.ids(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder = new URIBuilder()
+          .setPath("/followers/ids.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getStringifyIds()) && StringUtils.isNotBlank(parameters.getStringifyIds().toString())) {
+        uriBuilder.addParameter("stringify_ids", parameters.getStringifyIds().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        //FollowersIdsResponse result = restCall.getResponse(FollowersIdsResponse.class);
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        FollowersIdsResponse result = mapper.readValue(restResponseEntity, FollowersIdsResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public FollowersListResponse list(FollowersListRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Followers restFollowers = restClient.getRemoteableProxy("/friends/list.json", Followers.class);
+//      FollowersListResponse result = restFollowers.list(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/followers/list.json");
+      if (Objects.nonNull(parameters.getCount()) && StringUtils.isNotBlank(parameters.getCount().toString())) {
+        uriBuilder.addParameter("count", parameters.getCount().toString());
+      }
+      if (Objects.nonNull(parameters.getCursor()) && StringUtils.isNotBlank(parameters.getCursor().toString())) {
+        uriBuilder.addParameter("cursor", parameters.getCursor().toString());
+      }
+      if (Objects.nonNull(parameters.getId()) && StringUtils.isNotBlank(parameters.getId().toString())) {
+        uriBuilder.addParameter("id", parameters.getId().toString());
+      }
+      if (Objects.nonNull(parameters.getIncludeUserEntities()) && StringUtils.isNotBlank(parameters.getIncludeUserEntities().toString())) {
+        uriBuilder.addParameter("include_user_entities", parameters.getIncludeUserEntities().toString());
+      }
+      if (StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getSkipStatus()) && StringUtils.isNotBlank(parameters.getSkipStatus().toString())) {
+        uriBuilder.addParameter("skip_status", parameters.getSkipStatus().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        //FollowersListResponse result = restCall.getResponse(FollowersListResponse.class);
+        FollowersListResponse result = mapper.readValue(restResponseEntity, FollowersListResponse.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    }catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+  @Override
+  public List<User> lookup(UsersLookupRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Users restUsers = restClient.getRemoteableProxy("/users/lookup.json", Users.class);
+//      List<User> result = restUsers.lookup(parameters);
+//      return result;
+    String user_ids = StringUtils.join(parameters.getUserId(), ',');
+    String screen_names = StringUtils.join(parameters.getScreenName(), ',');
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/users/lookup.json");
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(screen_names) && StringUtils.isNotBlank(screen_names)) {
+        uriBuilder.addParameter("screen_name", screen_names);
+      }
+      if (Objects.nonNull(user_ids) && StringUtils.isNotBlank(user_ids)) {
+        uriBuilder.addParameter("user_id", user_ids);
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+//      List<User> result = restCall.getResponse(LinkedList.class, User.class);
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        ArrayNode resultArrayNode = mapper.readValue(restResponseEntity, ArrayNode.class);
+        List<User> result = new ArrayList();
+        resultArrayNode.iterator().forEachRemaining(item -> result.add(mapper.convertValue(item, User.class)));
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return new ArrayList<>();
+  }
+
+  @Override
+  public User show(UsersShowRequest parameters) {
+//  TODO: juneau-6.3.x-incubating
+//      Users restUsers = restClient.getRemoteableProxy("/users/lookup.json", Users.class);
+//      User result = restUsers.show(parameters);
+//      return result;
+    try {
+      URIBuilder uriBuilder =
+          new URIBuilder()
+              .setPath("/users/show.json");
+      if (Objects.nonNull(parameters.getIncludeEntities()) && StringUtils.isNotBlank(parameters.getIncludeEntities().toString())) {
+        uriBuilder.addParameter("include_entities", parameters.getIncludeEntities().toString());
+      }
+      if (Objects.nonNull(parameters.getScreenName()) && StringUtils.isNotBlank(parameters.getScreenName())) {
+        uriBuilder.addParameter("screen_name", parameters.getScreenName());
+      }
+      if (Objects.nonNull(parameters.getUserId()) && StringUtils.isNotBlank(parameters.getUserId().toString())) {
+        uriBuilder.addParameter("user_id", parameters.getUserId().toString());
+      }
+      RestCall restCall = restClient.doGet(uriBuilder.build().toString());
+      try {
+        String restResponseEntity = restCall
+            .setRetryable(configuration.getRetryMax().intValue(), configuration.getRetrySleepMs().intValue(), new TwitterRetryHandler())
+            .getResponseAsString();
+        User result = mapper.readValue(restResponseEntity, User.class);
+        return result;
+      } catch (RestCallException e) {
+        LOGGER.warn("RestCallException", e);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("IOException", e);
+    } catch (URISyntaxException e) {
+      LOGGER.warn("URISyntaxException", e);
+    }
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
new file mode 100644
index 0000000..24c7b04
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterOAuthRequestInterceptor.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.TwitterOAuthConfiguration;
+
+import org.apache.http.HttpException;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpRequestInterceptor;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpRequestWrapper;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import sun.misc.BASE64Encoder;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.security.GeneralSecurityException;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.StringJoiner;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * Handles request signing to api.twitter.com
+ */
+public class TwitterOAuthRequestInterceptor implements HttpRequestInterceptor {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterOAuthRequestInterceptor.class);
+
+  private static final String oauth_signature_encoding = "UTF-8";
+  private static final String oauth_signature_method = "HMAC-SHA1";
+  private static final String oauth_version = "1.0";
+
+  private static final BASE64Encoder base64Encoder = new BASE64Encoder();
+
+  TwitterOAuthConfiguration oAuthConfiguration;
+
+  public TwitterOAuthRequestInterceptor(TwitterOAuthConfiguration oAuthConfiguration) {
+    this.oAuthConfiguration = oAuthConfiguration;
+  }
+
+  @Override
+  public void process(HttpRequest httpRequest, HttpContext httpContext) throws HttpException, IOException {
+
+    String oauth_nonce = generateNonce();
+
+    String oauth_timestamp = generateTimestamp();
+
+    Map<String,String> oauthParamMap = new HashMap<>();
+    oauthParamMap.put("oauth_consumer_key", oAuthConfiguration.getConsumerKey());
+    oauthParamMap.put("oauth_nonce", oauth_nonce);
+    oauthParamMap.put("oauth_signature_method", oauth_signature_method);
+    oauthParamMap.put("oauth_timestamp", oauth_timestamp);
+    oauthParamMap.put("oauth_token", oAuthConfiguration.getAccessToken());
+    oauthParamMap.put("oauth_version", oauth_version);
+
+    String request_host = ((HttpRequestWrapper)httpRequest).getTarget().toString().replace(":443","");
+    String request_path = httpRequest.getRequestLine().getUri().substring(0, httpRequest.getRequestLine().getUri().indexOf('?'));
+    String request_param_line = httpRequest.getRequestLine().getUri().substring(httpRequest.getRequestLine().getUri().indexOf('?')+1);
+    String[] request_params = URLDecoder.decode(request_param_line).split("&");
+
+    Map<String,String> allParamsMap = new HashMap<>(oauthParamMap);
+
+    for( String request_param : request_params ) {
+      String key = request_param.substring(0, request_param.indexOf('='));
+      String value = request_param.substring(request_param.indexOf('=')+1, request_param.length());
+      allParamsMap.put(key, value);
+    }
+
+    String[] body_params;
+    if( ((HttpRequestWrapper) httpRequest).getOriginal() instanceof HttpPost) {
+      String body = EntityUtils.toString(((HttpPost)((HttpRequestWrapper) httpRequest).getOriginal()).getEntity());
+      body_params = body.split(",");
+      for( String body_param : body_params ) {
+        String key = body_param.substring(0, body_param.indexOf('='));
+        String value = body_param.substring(body_param.indexOf('=')+1, body_param.length());
+        allParamsMap.put(key, value);
+      }
+    }
+
+    allParamsMap = encodeMap(allParamsMap);
+
+    String signature_parameter_string = generateSignatureParameterString(allParamsMap);
+
+    String signature_base_string = generateSignatureBaseString(((HttpRequestWrapper) httpRequest).getMethod(), request_host+request_path, signature_parameter_string);
+
+    String signing_key = encode(oAuthConfiguration.getConsumerSecret()) + "&" + encode(oAuthConfiguration.getAccessTokenSecret());
+
+    String oauth_signature;
+    try {
+      oauth_signature = computeSignature(signature_base_string, signing_key);
+    } catch (GeneralSecurityException e) {
+      LOGGER.warn("GeneralSecurityException", e);
+      return;
+    }
+
+    oauthParamMap.put("oauth_signature", oauth_signature);
+
+    String authorization_header_string = generateAuthorizationHeaderString(oauthParamMap);
+
+    httpRequest.setHeader("Authorization", authorization_header_string);
+
+  }
+
+  public String generateTimestamp() {
+    Calendar tempcal = Calendar.getInstance();
+    long ts = tempcal.getTimeInMillis();// get current time in milliseconds
+    String oauth_timestamp = (new Long(ts/1000)).toString();
+    return oauth_timestamp;
+  }
+
+  public String generateNonce() {
+    String uuid_string = UUID.randomUUID().toString();
+    uuid_string = uuid_string.replaceAll("-", "");
+    String oauth_nonce = base64Encoder.encode(uuid_string.getBytes());
+    return oauth_nonce;
+  }
+
+  public static Map<String, String> encodeMap(Map<String, String> map) {
+    Map<String,String> newMap = new HashMap<>();
+    for( String key : map.keySet() ) {
+      String value = map.get(key);
+      newMap.put(encode(key), encode(value));
+    }
+    return newMap;
+  }
+
+
+  public static String generateAuthorizationHeaderString(Map<String,String> oauthParamMap) {
+    SortedSet<String> sortedKeys = new TreeSet<>(oauthParamMap.keySet());
+
+    StringJoiner stringJoiner = new StringJoiner(", ");
+    for( String key : sortedKeys ) {
+      stringJoiner.add(encode(key)+"="+"\""+encode(oauthParamMap.get(key))+"\"");
+    }
+
+    String authorization_header_string = new StringBuilder()
+        .append("OAuth ")
+        .append(stringJoiner.toString())
+        .toString();
+    return authorization_header_string;
+  }
+
+  public static String generateSignatureBaseString(String method, String request_url, String signature_parameter_string) {
+    String signature_base_string = new StringBuilder()
+        .append(method)
+        .append("&")
+        .append(encode(request_url))
+        .append("&")
+        .append(encode(signature_parameter_string))
+        .toString();
+    return signature_base_string;
+  }
+
+  public static String generateSignatureParameterString(Map<String, String> allParamsMap) {
+
+    SortedSet<String> sortedKeys = new TreeSet<>(allParamsMap.keySet());
+
+    StringJoiner stringJoiner = new StringJoiner("&");
+    for( String key : sortedKeys ) {
+      stringJoiner.add(key+"="+allParamsMap.get(key));
+    }
+
+    return stringJoiner.toString();
+  }
+
+  public static String encode(String value)
+  {
+    String encoded = null;
+    try {
+      encoded = URLEncoder.encode(value, oauth_signature_encoding);
+    } catch (UnsupportedEncodingException ignore) {
+    }
+    StringBuilder buf = new StringBuilder(encoded.length());
+    char focus;
+    for (int i = 0; i < encoded.length(); i++) {
+      focus = encoded.charAt(i);
+      if (focus == '*') {
+        buf.append("%2A");
+      } else if (focus == '+') {
+        buf.append("%20");
+      } else if (focus == '%' && (i + 1) < encoded.length()
+          && encoded.charAt(i + 1) == '7' && encoded.charAt(i + 2) == 'E') {
+        buf.append('~');
+        i += 2;
+      } else {
+        buf.append(focus);
+      }
+    }
+    return buf.toString();
+  }
+
+  public static String computeSignature(String baseString, String keyString) throws GeneralSecurityException, UnsupportedEncodingException
+  {
+    SecretKey secretKey = null;
+
+    byte[] keyBytes = keyString.getBytes();
+    secretKey = new SecretKeySpec(keyBytes, "HmacSHA1");
+
+    Mac mac = Mac.getInstance("HmacSHA1");
+    mac.init(secretKey);
+
+    byte[] text = baseString.getBytes();
+
+    return new String(base64Encoder.encode(mac.doFinal(text))).trim();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
new file mode 100644
index 0000000..3f7a853
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/TwitterRetryHandler.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.util.api.requests.backoff.AbstractBackOffStrategy;
+import org.apache.streams.util.api.requests.backoff.BackOffException;
+import org.apache.streams.util.api.requests.backoff.impl.LinearTimeBackOffStrategy;
+
+import org.apache.http.client.HttpRequestRetryHandler;
+import org.apache.http.protocol.HttpContext;
+import org.apache.juneau.rest.client.RetryOn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  Handle expected and unexpected exceptions.
+ */
+public class TwitterRetryHandler implements RetryOn {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterRetryHandler.class);
+
+  private static AbstractBackOffStrategy backoff_strategy;
+
+// TODO: once request context is available, we can maintain multiple BackoffStrategy one per request path / params
+//  private static Map<String, AbstractBackOffStrategy> backoffs = new ConcurrentHashMap<>();
+
+// This is everything we used to check via twitter4j to decide whether to retry.
+//
+// @Deprecated
+//  public static int handleTwitterError(Twitter twitter, Exception exception) {
+//    return handleTwitterError( twitter, null, exception);
+//  }
+//
+//
+//  public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
+//
+//    if (exception instanceof TwitterException) {
+//      TwitterException twitterException = (TwitterException)exception;
+//
+//      if (twitterException.exceededRateLimitation()) {
+//
+//        long millisUntilReset = retry;
+//
+//        final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
+//        if (rateLimitStatus != null) {
+//          millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
+//        }
+//
+//        LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
+//
+//        try {
+//          Thread.sleep(millisUntilReset);
+//        } catch (InterruptedException e1) {
+//          Thread.currentThread().interrupt();
+//        }
+//
+//        return 1;
+//      } else if (twitterException.isCausedByNetworkIssue()) {
+//        LOGGER.info("Twitter Network Issues Detected. Backing off...");
+//        LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
+//        try {
+//          Thread.sleep(retry);
+//        } catch (InterruptedException e1) {
+//          Thread.currentThread().interrupt();
+//        }
+//        return 1;
+//      } else if (twitterException.isErrorMessageAvailable()) {
+//        if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
+//          if ( id != null ) {
+//            LOGGER.warn("User does not exist: {}", id);
+//          } else {
+//            LOGGER.warn("User does not exist");
+//          }
+//          return (int)retryMax;
+//        } else {
+//          return (int)retryMax / 3;
+//        }
+//      } else {
+//        if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
+//          // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
+//          return (int)retryMax / 3;
+//        } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
+//          // This is a 401 reflecting credentials don't have access to the requested resource.
+//          if ( id != null ) {
+//            LOGGER.warn("Authentication Exception accessing id: {}", id);
+//          } else {
+//            LOGGER.warn("Authentication Exception");
+//          }
+//          return (int)retryMax;
+//        } else {
+//          LOGGER.warn("Unknown Twitter Exception...");
+//          LOGGER.warn("   Access: {}", twitterException.getAccessLevel());
+//          LOGGER.warn("     Code: {}", twitterException.getExceptionCode());
+//          LOGGER.warn("  Message: {}", twitterException.getLocalizedMessage());
+//          return (int)retryMax / 10;
+//        }
+//      }
+//    } else if (exception instanceof RuntimeException) {
+//      LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
+//      return (int)retryMax / 3;
+//    } else {
+//      LOGGER.info("Completely Unknown Exception: {}", exception);
+//      return (int)retryMax / 3;
+//    }
+//  }
+//  TODO: juneau 6.3.x-incubating
+//  @Override
+//  public boolean onCode(int httpResponseCode) {
+//
+//    LOGGER.warn("TwitterRetryHandler: {}", httpResponseCode);
+//
+//    if( httpResponseCode > 400 ) {
+//      return true;
+//    } else {
+//      return false;
+//    }
+//
+//  }
+
+  @Override
+  public boolean onCode(int httpResponseCode) {
+//    if( backoff_strategy == null ) {
+//      backoff_strategy = new LinearTimeBackOffStrategy(retrySleepMs / 1000, retryMax);
+//    }
+//    if( httpResponseCode > 400 ) {
+//      try {
+//        backoff_strategy.backOff();
+//        return true;
+//      } catch (BackOffException boe) {
+//        backoff_strategy.reset();
+//        return false;
+//      }
+//    } else {
+//      return false;
+//    }
+    if( httpResponseCode > 400 ) {
+      return true;
+    } else {
+      return false;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
new file mode 100644
index 0000000..5de9046
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/api/Users.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.api;
+
+import org.apache.streams.twitter.pojo.Tweet;
+import org.apache.streams.twitter.pojo.User;
+
+import java.util.List;
+
+/**
+ * Returns a collection of the most recent Tweets posted by the user indicated by the screen_name or user_id parameters.
+ */
+public interface Users {
+
+  /**
+   * Returns fully-hydrated user objects for up to 100 users per request, as specified by comma-separated values passed to the user_id and/or screen_name parameters.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.UsersLookupRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/users/lookup">https://dev.twitter.com/rest/reference/get/users/lookup</a>
+   *
+   */
+  public List<User> lookup(UsersLookupRequest parameters);
+
+  /**
+   * Returns a variety of information about the user specified by the required user_id or screen_name parameter. The author\u2019s most recent Tweet will be returned inline when possible.
+   *
+   * @param parameters {@link org.apache.streams.twitter.api.UsersShowRequest}
+   * @return List<Tweet>
+   * @see <a href="https://dev.twitter.com/rest/reference/get/users/show">https://dev.twitter.com/rest/reference/get/users/show</a>
+   *
+   */
+  public User show(UsersShowRequest parameters);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
index 6392e0d..6db16be 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/converter/util/TwitterActivityUtil.java
@@ -126,7 +126,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link ActivityObject} actor from the tweet
+   * Builds the activity {@link ActivityObject} actor from the tweet.
    * @param tweet the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
@@ -138,7 +138,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Builds the activity {@link ActivityObject} actor from the User
+   * Builds the activity {@link ActivityObject} actor from the User.
    * @param user the object to use as the source
    * @return a valid Actor populated from the Tweet
    */
@@ -186,7 +186,7 @@ public class TwitterActivityUtil {
   }
 
   /**
-   * Creates an {@link ActivityObject} for the tweet
+   * Creates an {@link ActivityObject} for the tweet.
    * @param tweet the object to use as the source
    * @return a valid ActivityObject
    */

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
index 89a5abd..a170626 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/processor/FetchAndReplaceTwitterProcessor.java
@@ -28,23 +28,20 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.twitter.TwitterConfiguration;
 import org.apache.streams.twitter.TwitterStreamConfiguration;
+import org.apache.streams.twitter.api.StatusesShowRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDocumentClassifier;
 import org.apache.streams.twitter.pojo.Delete;
 import org.apache.streams.twitter.pojo.Retweet;
 import org.apache.streams.twitter.pojo.Tweet;
-import org.apache.streams.twitter.provider.TwitterProviderUtil;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
-import twitter4j.TwitterObjectFactory;
-import twitter4j.conf.ConfigurationBuilder;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Objects;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -89,7 +86,13 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
       Activity doc = (Activity)entry.getDocument();
       String originalId = doc.getId();
       if (PROVIDER_ID.equals(doc.getProvider().getId())) {
-        fetchAndReplace(doc, originalId);
+        try {
+          fetchAndReplace(doc, originalId);
+        } catch (ActivityConversionException ex) {
+          LOGGER.warn("ActivityConversionException", ex);
+        } catch (IOException ex) {
+          LOGGER.warn("IOException", ex);
+        }
       }
     } else {
       throw new IllegalStateException("Requires an activity document");
@@ -100,8 +103,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
 
   @Override
   public void prepare(Object configurationObject) {
-    this.client = getTwitterClient();
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+    Objects.requireNonNull(client);
     this.mapper = StreamsJacksonMapper.getInstance();
+    Objects.requireNonNull(mapper);
   }
 
   @Override
@@ -109,22 +118,14 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
 
   }
 
-  protected void fetchAndReplace(Activity doc, String originalId) {
-    try {
-      String json = fetch(doc);
-      replace(doc, json);
-      doc.setId(originalId);
-      retryCount = 0;
-    } catch (TwitterException tw) {
-      if (tw.exceededRateLimitation()) {
-        sleepAndTryAgain(doc, originalId);
-      }
-    } catch (Exception ex) {
-      LOGGER.warn("Error fetching and replacing tweet for activity {}", doc.getId());
-    }
+  protected void fetchAndReplace(Activity doc, String originalId) throws java.io.IOException, ActivityConversionException {
+    Tweet tweet = fetch(doc);
+    replace(doc, tweet);
+    doc.setId(originalId);
   }
 
-  protected void replace(Activity doc, String json) throws java.io.IOException, ActivityConversionException {
+  protected void replace(Activity doc, Tweet tweet) throws java.io.IOException, ActivityConversionException {
+    String json = mapper.writeValueAsString(tweet);
     Class documentSubType = new TwitterDocumentClassifier().detectClasses(json).get(0);
     Object object = mapper.readValue(json, documentSubType);
 
@@ -137,52 +138,38 @@ public class FetchAndReplaceTwitterProcessor implements StreamsProcessor {
     }
   }
 
-  protected String fetch(Activity doc) throws TwitterException {
+  protected Tweet fetch(Activity doc) {
     String id = doc.getObject().getId();
     LOGGER.debug("Fetching status from Twitter for {}", id);
     Long tweetId = Long.valueOf(id.replace("id:twitter:tweets:", ""));
-    Status status = getTwitterClient().showStatus(tweetId);
-    return TwitterObjectFactory.getRawJSON(status);
+    Tweet tweet = client.show(
+        new StatusesShowRequest()
+            .withId(tweetId)
+    );
+    return tweet;
   }
 
 
-  protected Twitter getTwitterClient() {
-
-    if (this.client == null) {
+  protected Twitter getTwitterClient() throws InstantiationException {
 
-      String baseUrl = TwitterProviderUtil.baseUrl(config);
+    return Twitter.getInstance(config);
 
-      ConfigurationBuilder builder = new ConfigurationBuilder()
-          .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-          .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-          .setOAuthAccessToken(config.getOauth().getAccessToken())
-          .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-          .setIncludeEntitiesEnabled(true)
-          .setJSONStoreEnabled(true)
-          .setAsyncNumThreads(1)
-          .setRestBaseURL(baseUrl)
-          .setIncludeMyRetweetEnabled(Boolean.TRUE)
-          .setPrettyDebugEnabled(Boolean.TRUE);
-
-      this.client = new TwitterFactory(builder.build()).getInstance();
-    }
-    return this.client;
   }
 
   //Hardcore sleep to allow for catch up
-  protected void sleepAndTryAgain(Activity doc, String originalId) {
-    try {
-      //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
-      if (retryCount < MAX_ATTEMPTS) {
-        retryCount++;
-        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
-        Thread.sleep(BACKOFF * retryCount);
-        fetchAndReplace(doc, originalId);
-      } else {
-        retryCount = 0;
-      }
-    } catch (InterruptedException ex) {
-      LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
-    }
-  }
+//  protected void sleepAndTryAgain(Activity doc, String originalId) {
+//    try {
+//      //Attempt to fetchAndReplace with a backoff up to the limit then just reset the count and let the process continue
+//      if (retryCount < MAX_ATTEMPTS) {
+//        retryCount++;
+//        LOGGER.debug("Sleeping for {} min due to excessive calls to Twitter API", (retryCount * 4));
+//        Thread.sleep(BACKOFF * retryCount);
+//        fetchAndReplace(doc, originalId);
+//      } else {
+//        retryCount = 0;
+//      }
+//    } catch (InterruptedException ex) {
+//      LOGGER.warn("Thread sleep interrupted while waiting for twitter backoff");
+//    }
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
deleted file mode 100644
index ec43fba..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterErrorHandler.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.twitter.TwitterConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.RateLimitStatus;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-
-/**
- *  Handle expected and unexpected exceptions.
- */
-public class TwitterErrorHandler {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterErrorHandler.class);
-
-  // selected because 3 * 5 + n >= 15 for positive n
-  protected static long retry =
-      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-          StreamsConfigurator.getConfig().getConfig("twitter")
-      ).getRetrySleepMs();
-  protected static long retryMax =
-      new ComponentConfigurator<TwitterConfiguration>(TwitterConfiguration.class).detectConfiguration(
-          StreamsConfigurator.getConfig().getConfig("twitter")
-      ).getRetryMax();
-
-  @Deprecated
-  public static int handleTwitterError(Twitter twitter, Exception exception) {
-    return handleTwitterError( twitter, null, exception);
-  }
-
-  /**
-   * handleTwitterError.
-   * @param twitter Twitter
-   * @param id id
-   * @param exception exception
-   * @return
-   */
-  public static int handleTwitterError(Twitter twitter, Long id, Exception exception) {
-
-    if (exception instanceof TwitterException) {
-      TwitterException twitterException = (TwitterException)exception;
-
-      if (twitterException.exceededRateLimitation()) {
-
-        long millisUntilReset = retry;
-
-        final RateLimitStatus rateLimitStatus = twitterException.getRateLimitStatus();
-        if (rateLimitStatus != null) {
-          millisUntilReset = rateLimitStatus.getSecondsUntilReset() * 1000;
-        }
-
-        LOGGER.warn("Rate Limit Exceeded. Will retry in {} seconds...", millisUntilReset / 1000);
-
-        try {
-          Thread.sleep(millisUntilReset);
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-
-        return 1;
-      } else if (twitterException.isCausedByNetworkIssue()) {
-        LOGGER.info("Twitter Network Issues Detected. Backing off...");
-        LOGGER.info("{} - {}", twitterException.getExceptionCode(), twitterException.getLocalizedMessage());
-        try {
-          Thread.sleep(retry);
-        } catch (InterruptedException e1) {
-          Thread.currentThread().interrupt();
-        }
-        return 1;
-      } else if (twitterException.isErrorMessageAvailable()) {
-        if (twitterException.getMessage().toLowerCase().contains("does not exist")) {
-          if ( id != null ) {
-            LOGGER.warn("User does not exist: {}", id);
-          } else {
-            LOGGER.warn("User does not exist");
-          }
-          return (int)retryMax;
-        } else {
-          return (int)retryMax / 3;
-        }
-      } else {
-        if (twitterException.getExceptionCode().equals("ced778ef-0c669ac0")) {
-          // This is a known weird issue, not exactly sure the cause, but you'll never be able to get the data.
-          return (int)retryMax / 3;
-        } else if (twitterException.getExceptionCode().equals("4be80492-0a7bf7c7")) {
-          // This is a 401 reflecting credentials don't have access to the requested resource.
-          if ( id != null ) {
-            LOGGER.warn("Authentication Exception accessing id: {}", id);
-          } else {
-            LOGGER.warn("Authentication Exception");
-          }
-          return (int)retryMax;
-        } else {
-          LOGGER.warn("Unknown Twitter Exception...");
-          LOGGER.warn("  Account: {}", twitter);
-          LOGGER.warn("   Access: {}", twitterException.getAccessLevel());
-          LOGGER.warn("     Code: {}", twitterException.getExceptionCode());
-          LOGGER.warn("  Message: {}", twitterException.getLocalizedMessage());
-          return (int)retryMax / 10;
-        }
-      }
-    } else if (exception instanceof RuntimeException) {
-      LOGGER.warn("TwitterGrabber: Unknown Runtime Error", exception.getMessage());
-      return (int)retryMax / 3;
-    } else {
-      LOGGER.info("Completely Unknown Exception: {}", exception);
-      return (int)retryMax / 3;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
new file mode 100644
index 0000000..b9448e6
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersIdsProviderTask.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersIdsRequest;
+import org.apache.streams.twitter.api.FollowersIdsResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFollowersIdsProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersIdsProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FollowersIdsRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FollowersIdsRequest
+   */
+  public TwitterFollowersIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersIdsRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFollowersIds(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFollowersIds(FollowersIdsRequest request) {
+
+    do {
+
+      FollowersIdsResponse response = client.ids(request);
+
+      last_count = response.getIds().size();
+
+      if (response.getIds().size() > 0) {
+
+        for (Long id : response.getIds()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()))
+              .withFollower(
+                  new User()
+                      .withId(id));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}


[2/3] incubator-streams git commit: STREAMS-496: Remove twitter4j dependency from streams-provider-twitter

Posted by sb...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
new file mode 100644
index 0000000..d6e30e4
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowersListProviderTask.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FollowersListRequest;
+import org.apache.streams.twitter.api.FollowersListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFollowersListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowersListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FollowersListRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowersListProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FollowersListRequest
+   */
+  public TwitterFollowersListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FollowersListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFollowersList(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFollowersList(FollowersListRequest request) {
+
+    do {
+
+      FollowersListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      if (response.getUsers().size() > 0) {
+
+        for (User follower : response.getUsers()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()))
+              .withFollower(follower);
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
index 16b6c03..a2be967 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProvider.java
@@ -26,20 +26,26 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
+import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.FollowersIdsRequest;
+import org.apache.streams.twitter.api.FollowingIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigParseOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -49,13 +55,18 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Retrieve all follow adjacencies from a list of user ids or names.
  */
-public class TwitterFollowingProvider extends TwitterUserInformationProvider {
+public class TwitterFollowingProvider {
 
   public static final String STREAMS_ID = "TwitterFollowingProvider";
   private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProvider.class);
@@ -64,8 +75,19 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
   private TwitterFollowingConfiguration config;
 
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
+
+  protected Twitter client;
+
+  protected ListeningExecutorService executor;
+
   private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
+  protected final AtomicBoolean running = new AtomicBoolean();
+
+  protected volatile Queue<StreamsDatum> providerQueue;
+
   /**
    * To use from command line:
    *
@@ -139,67 +161,137 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
   }
 
   public TwitterFollowingProvider(TwitterFollowingConfiguration config) {
-    super(config);
     this.config = config;
   }
 
-  @Override
   public void prepare(Object configurationObject) {
-    super.prepare(config);
+
+    Objects.requireNonNull(config);
+    Objects.requireNonNull(config.getOauth());
+    Objects.requireNonNull(config.getOauth().getConsumerKey());
+    Objects.requireNonNull(config.getOauth().getConsumerSecret());
+    Objects.requireNonNull(config.getOauth().getAccessToken());
+    Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
+    Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
+
+    try {
+      lock.writeLock().lock();
+      providerQueue = constructQueue();
+    } finally {
+      lock.writeLock().unlock();
+    }
+
+    Objects.requireNonNull(providerQueue);
+
+    // abstract this out
+    for (String s : config.getInfo()) {
+      if (s != null) {
+        String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
+
+        // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+        // screen name list
+        try {
+          ids.add(Long.parseLong(potentialScreenName));
+        } catch (Exception ex) {
+          names.add(potentialScreenName);
+        }
+      }
+    }
+
     Objects.requireNonNull(getConfig().getEndpoint());
+
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
+
     Preconditions.checkArgument(getConfig().getEndpoint().equals("friends") || getConfig().getEndpoint().equals("followers"));
+
+    if( config.getEndpoint().equals("friends")) {
+      submitFriendsThreads(ids, names);
+    } else if( config.getEndpoint().equals("followers")) {
+      submitFollowersThreads(ids, names);
+    }
   }
 
-  @Override
   public void startStream() {
 
     Objects.requireNonNull(executor);
 
-    Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
     LOGGER.info("startStream");
 
     running.set(true);
 
-    while (idsBatches.hasNext()) {
-      submitFollowingThreads(idsBatches.next());
-    }
-    while (screenNameBatches.hasNext()) {
-      submitFollowingThreads(screenNameBatches.next());
-    }
-
     executor.shutdown();
 
   }
 
-  protected void submitFollowingThreads(Long[] ids) {
-    Twitter client = getTwitterClient();
+  protected void submitFollowersThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      TwitterFollowersIdsProviderTask providerTask =
+          new TwitterFollowersIdsProviderTask(
+              this,
+              client,
+              (FollowersIdsRequest)new FollowersIdsRequest().withId(id));
+
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
+
+    for (String name : names) {
+      TwitterFollowersIdsProviderTask providerTask =
+          new TwitterFollowersIdsProviderTask(
+              this,
+              client,
+              (FollowersIdsRequest)new FollowersIdsRequest().withScreenName(name));
 
-    for (int i = 0; i < ids.length; i++) {
-      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, ids[i]);
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", ids[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
   }
 
-  protected void submitFollowingThreads(String[] screenNames) {
-    Twitter client = getTwitterClient();
+  protected void submitFriendsThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      TwitterFriendsIdsProviderTask providerTask =
+          new TwitterFriendsIdsProviderTask(
+              this,
+              client,
+              (FriendsIdsRequest)new FriendsIdsRequest().withId(id));
+
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
+
+    for (String name : names) {
+      TwitterFriendsIdsProviderTask providerTask =
+          new TwitterFriendsIdsProviderTask(
+              this,
+              client,
+              (FriendsIdsRequest)new FriendsIdsRequest().withScreenName(name));
 
-    for (int i = 0; i < screenNames.length; i++) {
-      TwitterFollowingProviderTask providerTask = new TwitterFollowingProviderTask(this, client, screenNames[i]);
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", screenNames[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
+  }
 
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
   }
 
-  @Override
   public StreamsResultSet readCurrent() {
 
-    LOGGER.info("{}{} - readCurrent", idsBatches, screenNameBatches);
-
     StreamsResultSet result;
 
     try {
@@ -207,7 +299,7 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
       result = new StreamsResultSet(providerQueue);
       result.setCounter(new DatumStatusCounter());
       providerQueue = constructQueue();
-      LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -216,17 +308,45 @@ public class TwitterFollowingProvider extends TwitterUserInformationProvider {
 
   }
 
-  public boolean shouldContinuePulling(List<twitter4j.User> users) {
+  public boolean shouldContinuePulling(List<User> users) {
     return (users != null) && (users.size() == config.getPageSize());
   }
 
-  @Override
   public boolean isRunning() {
-    if (providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone()) {
-      LOGGER.info("Completed");
+    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+      LOGGER.info("All Threads Completed");
       running.set(false);
       LOGGER.info("Exiting");
     }
     return running.get();
   }
+
+  // abstract this out
+  protected Queue<StreamsDatum> constructQueue() {
+    return new LinkedBlockingQueue<>();
+  }
+
+  // abstract this out
+  void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          System.err.println("Pool did not terminate");
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public void cleanUp() {
+    shutdownAndAwaitTermination(executor);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
deleted file mode 100644
index 313416a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFollowingProviderTask.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.twitter.provider;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.twitter.pojo.Follow;
-import org.apache.streams.twitter.pojo.User;
-import org.apache.streams.util.ComponentUtils;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import twitter4j.PagableResponseList;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterObjectFactory;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- *  Retrieve friend or follower connections for a single user id.
- */
-public class TwitterFollowingProviderTask implements Runnable {
-
-  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFollowingProviderTask.class);
-
-  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-  protected TwitterFollowingProvider provider;
-  private Twitter client;
-  protected Long id;
-  private String screenName;
-
-  private int count = 0;
-
-  /**
-   * TwitterFollowingProviderTask constructor.
-   * @param provider TwitterFollowingProvider
-   * @param twitter Twitter
-   * @param id numeric id
-   */
-  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, Long id) {
-    this.provider = provider;
-    this.client = twitter;
-    this.id = id;
-  }
-
-  /**
-   * TwitterFollowingProviderTask constructor.
-   * @param provider TwitterFollowingProvider
-   * @param twitter Twitter
-   * @param screenName screenName
-   */
-  public TwitterFollowingProviderTask(TwitterFollowingProvider provider, Twitter twitter, String screenName) {
-    this.provider = provider;
-    this.client = twitter;
-    this.screenName = screenName;
-  }
-
-  int page_count = 0;
-  int item_count = 0;
-
-  @Override
-  public void run() {
-
-    Preconditions.checkArgument(id != null || screenName != null);
-
-    if ( id != null ) {
-      getFollowing(id);
-    } else {
-      getFollowing(screenName);
-    }
-
-    LOGGER.info(id != null ? id.toString() : screenName + " Thread Finished");
-
-  }
-
-  private void getFollowing(Long id) {
-
-    Preconditions.checkArgument(
-        provider.getConfig().getEndpoint().equals("friends")
-        || provider.getConfig().getEndpoint().equals("followers")
-    );
-
-    if ( provider.getConfig().getIdsOnly() ) {
-      collectIds(id);
-    } else {
-      collectUsers(id);
-    }
-  }
-
-  private void getFollowing(String screenName) {
-
-    twitter4j.User user = null;
-    try {
-      user = client.users().showUser(screenName);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-    }
-    Objects.requireNonNull(user);
-    getFollowing(user.getId());
-  }
-
-  private void collectUsers(Long id) {
-    int keepTrying = 0;
-    List<twitter4j.User> list = null;
-    long curser = -1;
-
-    twitter4j.User user;
-    String userJson;
-    try {
-      user = client.users().showUser(id);
-      userJson = TwitterObjectFactory.getRawJSON(user);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-      return;
-    }
-
-    do {
-      try {
-
-        PagableResponseList<twitter4j.User> page = null;
-        if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          page = client.friendsFollowers().getFollowersList(id, curser, provider.getConfig().getPageSize().intValue());
-        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          page = client.friendsFollowers().getFriendsList(id, curser, provider.getConfig().getPageSize().intValue());
-        }
-
-        Objects.requireNonNull(list);
-        Preconditions.checkArgument(list.size() > 0);
-
-        for (twitter4j.User other : list) {
-
-          String otherJson = TwitterObjectFactory.getRawJSON(other);
-
-          try {
-            Follow follow = null;
-            if ( provider.getConfig().getEndpoint().equals("followers") ) {
-              follow = new Follow()
-                  .withFollowee(mapper.readValue(userJson, User.class))
-                  .withFollower(mapper.readValue(otherJson, User.class));
-            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-              follow = new Follow()
-                  .withFollowee(mapper.readValue(otherJson, User.class))
-                  .withFollower(mapper.readValue(userJson, User.class));
-            }
-
-            Objects.requireNonNull(follow);
-
-            if ( item_count < provider.getConfig().getMaxItems()) {
-              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              item_count++;
-            }
-
-          } catch (Exception ex) {
-            LOGGER.warn("Exception: {}", ex);
-          }
-        }
-        if ( !page.hasNext() ) {
-          break;
-        }
-        if ( page.getNextCursor() == 0 ) {
-          break;
-        }
-        curser = page.getNextCursor();
-        page_count++;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
-      }
-    }
-    while (provider.shouldContinuePulling(list) && curser != 0 && keepTrying < provider.getConfig().getRetryMax() && count < provider.getConfig().getMaxItems());
-  }
-
-  private void collectIds(Long id) {
-    int keepTrying = 0;
-
-    long curser = -1;
-
-    twitter4j.User user;
-    String userJson;
-    try {
-      user = client.users().showUser(id);
-      userJson = TwitterObjectFactory.getRawJSON(user);
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure looking up " + id);
-      return;
-    }
-
-    do {
-      try {
-        twitter4j.IDs ids = null;
-        if ( provider.getConfig().getEndpoint().equals("followers") ) {
-          ids = client.friendsFollowers().getFollowersIDs(id, curser, provider.getConfig().getMaxItems().intValue());
-        } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-          ids = client.friendsFollowers().getFriendsIDs(id, curser, provider.getConfig().getMaxItems().intValue());
-        }
-
-        Objects.requireNonNull(ids);
-        Preconditions.checkArgument(ids.getIDs().length > 0);
-
-        for (long otherId : ids.getIDs()) {
-
-          try {
-            Follow follow = null;
-            if ( provider.getConfig().getEndpoint().equals("followers") ) {
-              follow = new Follow()
-                  .withFollowee(new User().withId(id))
-                  .withFollower(new User().withId(otherId));
-            } else if ( provider.getConfig().getEndpoint().equals("friends") ) {
-              follow = new Follow()
-                  .withFollowee(new User().withId(otherId))
-                  .withFollower(new User().withId(id));
-            }
-
-            Objects.requireNonNull(follow);
-
-            if ( item_count < provider.getConfig().getMaxItems()) {
-              ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
-              item_count++;
-            }
-          } catch (Exception ex) {
-            LOGGER.warn("Exception: {}", ex);
-          }
-        }
-        if ( !ids.hasNext() ) {
-          break;
-        }
-        if ( ids.getNextCursor() == 0 ) {
-          break;
-        }
-        curser = ids.getNextCursor();
-        page_count++;
-      } catch (TwitterException twitterException) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, id, twitterException);
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, null, ex);
-      }
-    }
-    while (shouldContinuePulling() && curser != 0 && keepTrying < provider.getConfig().getRetryMax() );
-  }
-
-  public boolean shouldContinuePulling() {
-    return ( item_count < provider.getConfig().getMaxItems()
-              && page_count < provider.getConfig().getMaxPages());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
new file mode 100644
index 0000000..84fb789
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsIdsProviderTask.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsIdsRequest;
+import org.apache.streams.twitter.api.FriendsIdsResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFriendsIdsProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsIdsProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsIdsRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FriendsIdsRequest
+   */
+  public TwitterFriendsIdsProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsIdsRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    getFriendsIds(request);
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  private void getFriendsIds(FriendsIdsRequest request) {
+
+    do {
+
+      FriendsIdsResponse response = client.ids(request);
+
+      last_count = response.getIds().size();
+
+      if (response.getIds().size() > 0) {
+
+        for (Long id : response.getIds()) {
+
+          Follow follow = new Follow()
+              .withFollowee(
+                  new User()
+                      .withId(id))
+              .withFollower(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
new file mode 100644
index 0000000..570705d
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterFriendsListProviderTask.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.FriendsListRequest;
+import org.apache.streams.twitter.api.FriendsListResponse;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.pojo.Follow;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  Retrieve friend or follower connections for a single user id.
+ */
+public class TwitterFriendsListProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterFriendsListProviderTask.class);
+
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  protected Twitter client;
+  protected TwitterFollowingProvider provider;
+  protected FriendsListRequest request;
+
+  private int count = 0;
+
+  /**
+   * TwitterFollowingProviderTask constructor.
+   * @param provider TwitterFollowingProvider
+   * @param twitter Twitter
+   * @param request FriendsListRequest
+   */
+  public TwitterFriendsListProviderTask(TwitterFollowingProvider provider, Twitter twitter, FriendsListRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  int last_count = 0;
+  int page_count = 1;
+  int item_count = 0;
+  long cursor = 0;
+
+  @Override
+  public void run() {
+
+    Preconditions.checkArgument(request.getId() != null || request.getScreenName() != null);
+
+    LOGGER.info(request.getId() != null ? request.getId().toString() : request.getScreenName() + " Thread Finished");
+
+  }
+
+  private void getFriendsList(FriendsListRequest request) {
+
+    do {
+
+      FriendsListResponse response = client.list(request);
+
+      last_count = response.getUsers().size();
+
+      if (response.getUsers().size() > 0) {
+
+        for (User friend : response.getUsers()) {
+
+          Follow follow = new Follow()
+              .withFollower(friend)
+              .withFollowee(
+                  new User()
+                      .withId(request.getId())
+                      .withScreenName(request.getScreenName()));
+
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(follow), provider.providerQueue);
+            item_count++;
+          }
+
+        }
+
+      }
+      page_count++;
+      cursor = response.getNextCursor();
+      request.setCursor(cursor);
+
+    }
+    while (shouldContinuePulling(cursor, last_count, page_count, item_count));
+  }
+
+  public boolean shouldContinuePulling(long cursor, int count, int page_count, int item_count) {
+    return (
+        cursor > 0
+            && count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
index 9f76fed..217b3d8 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProvider.java
@@ -27,7 +27,11 @@ import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterTimelineProviderConfiguration;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -44,12 +48,6 @@ import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterException;
-import twitter4j.TwitterFactory;
-import twitter4j.User;
-import twitter4j.conf.ConfigurationBuilder;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -92,8 +90,8 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     return config;
   }
 
-  protected Collection<String[]> screenNameBatches;
-  protected Collection<Long> ids;
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
 
   protected volatile Queue<StreamsDatum> providerQueue;
 
@@ -109,9 +107,6 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
   private List<ListenableFuture<Object>> futures = new ArrayList<>();
 
-  Boolean jsonStoreEnabled;
-  Boolean includeEntitiesEnabled;
-
   /**
    * To use from command line:
    *
@@ -207,14 +202,34 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     Objects.requireNonNull(config.getOauth().getAccessToken());
     Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
     Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
 
-    consolidateToIDs();
+    for (String s : config.getInfo()) {
+      if (s != null) {
+        String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
 
-    if (ids.size() > 1) {
-      executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(5, ids.size()));
-    } else {
-      executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+        // See if it is a long, if it is, add it to the user iD list, if it is not, add it to the
+        // screen name list
+        try {
+          ids.add(Long.parseLong(potentialScreenName));
+        } catch (Exception ex) {
+          names.add(potentialScreenName);
+        }
+      }
     }
+
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), config.getInfo().size()));
+
+    submitTimelineThreads(ids, names);
+
   }
 
   @Override
@@ -222,42 +237,40 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
 
     LOGGER.debug("{} startStream", STREAMS_ID);
 
-    Preconditions.checkArgument(!ids.isEmpty());
-
     running.set(true);
 
-    submitTimelineThreads(ids.toArray(new Long[0]));
-
     executor.shutdown();
 
   }
 
-  protected void submitTimelineThreads(Long[] ids) {
-
-    Twitter client = getTwitterClient();
-
-    for (int i = 0; i < ids.length; i++) {
-
-      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(this, client, ids[i]);
+  protected void submitTimelineThreads(List<Long> ids, List<String> names) {
+
+    for (Long id : ids) {
+      StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+      request.setUserId(id);
+      request.setCount(config.getPageSize());
+      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+          this,
+          client,
+          request
+      );
       ListenableFuture future = executor.submit(providerTask);
       futures.add(future);
-      LOGGER.info("submitted {}", ids[i]);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
-
-  }
-
-  private Collection<Long> retrieveIds(String[] screenNames) {
-    Twitter client = getTwitterClient();
-
-    List<Long> ids = new ArrayList<>();
-    try {
-      for (User twitterUser : client.lookupUsers(screenNames)) {
-        ids.add(twitterUser.getId());
-      }
-    } catch (TwitterException ex) {
-      LOGGER.error("Failure retrieving user details.", ex.getMessage());
+    for (String name : names) {
+      StatusesUserTimelineRequest request = new StatusesUserTimelineRequest();
+      request.setScreenName(name);
+      request.setCount(config.getPageSize());
+      TwitterTimelineProviderTask providerTask = new TwitterTimelineProviderTask(
+          this,
+          client,
+          request
+      );
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
-    return ids;
   }
 
   @Override
@@ -302,66 +315,14 @@ public class TwitterTimelineProvider implements StreamsProvider, Serializable {
     throw new NotImplementedException();
   }
 
-
-
-  /**
-   * Using the "info" list that is contained in the configuration, ensure that all
-   * account identifiers are converted to IDs (Longs) instead of screenNames (Strings).
-   */
-  protected void consolidateToIDs() {
-    List<String> screenNames = new ArrayList<>();
-    ids = new ArrayList<>();
-
-    for ( String account : config.getInfo() ) {
-      try {
-        if ( new Long(account) != null ) {
-          ids.add(Long.parseLong(Objects.toString(account, null)));
-        }
-      } catch ( NumberFormatException ex ) {
-        screenNames.add(account);
-      } catch ( Exception ex ) {
-        LOGGER.error("Exception while trying to add ID: {{}}, {}", account, ex);
-      }
-    }
-
-    // Twitter allows for batches up to 100 per request, but you cannot mix types
-    screenNameBatches = new ArrayList<>();
-    while ( screenNames.size() >= 100 ) {
-      screenNameBatches.add(screenNames.subList(0, 100).toArray(new String[0]));
-      screenNames = screenNames.subList(100, screenNames.size());
-    }
-
-    if (screenNames.size() > 0) {
-      screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-    }
-
-    for ( String[] screenNameBatche : screenNameBatches ) {
-      Collection<Long> batchIds = retrieveIds(screenNameBatche);
-      ids.addAll(batchIds);
-    }
-  }
-
   /**
    * get Twitter Client from TwitterUserInformationConfiguration.
    * @return result
    */
-  public Twitter getTwitterClient() {
-
-    String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-    ConfigurationBuilder builder = new ConfigurationBuilder()
-        .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-        .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-        .setOAuthAccessToken(config.getOauth().getAccessToken())
-        .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-        .setIncludeEntitiesEnabled(true)
-        .setJSONStoreEnabled(true)
-        .setAsyncNumThreads(3)
-        .setRestBaseURL(baseUrl)
-        .setIncludeMyRetweetEnabled(Boolean.TRUE)
-        .setPrettyDebugEnabled(Boolean.TRUE);
-
-    return new TwitterFactory(builder.build()).getInstance();
+  public Twitter getTwitterClient() throws InstantiationException {
+
+    return Twitter.getInstance(config);
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
index 8cb2b46..ffb90b7 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterTimelineProviderTask.java
@@ -20,17 +20,15 @@ package org.apache.streams.twitter.provider;
 
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.StatusesUserTimelineRequest;
+import org.apache.streams.twitter.api.Twitter;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.Tweet;
 import org.apache.streams.util.ComponentUtils;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Paging;
-import twitter4j.ResponseList;
-import twitter4j.Status;
-import twitter4j.Twitter;
-import twitter4j.TwitterObjectFactory;
 
 import java.util.List;
 import java.util.stream.Collectors;
@@ -47,81 +45,64 @@ public class TwitterTimelineProviderTask implements Runnable {
 
   protected TwitterTimelineProvider provider;
   protected Twitter client;
-  protected Long id;
+  protected StatusesUserTimelineRequest request;
 
   /**
    * TwitterTimelineProviderTask constructor.
    * @param provider TwitterTimelineProvider
    * @param twitter Twitter
-   * @param id Long
+   * @param request StatusesUserTimelineRequest
    */
-  public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, Long id) {
+  public TwitterTimelineProviderTask(TwitterTimelineProvider provider, Twitter twitter, StatusesUserTimelineRequest request) {
     this.provider = provider;
     this.client = twitter;
-    this.id = id;
+    this.request = request;
   }
 
-  int page_count = 1;
   int item_count = 0;
-  List<Status> lastPage = null;
+  int last_count = 0;
+  int page_count = 1;
 
   @Override
   public void run() {
 
-    Paging paging = new Paging(page_count, provider.getConfig().getPageSize().intValue());
-
-    LOGGER.info(id + " Thread Starting");
+    LOGGER.info("Thread Starting: {}", request.toString());
 
     do {
-      int keepTrying = 0;
-
-      // keep trying to load, give it 5 attempts.
-      //This value was chosen because it seemed like a reasonable number of times
-      //to retry capturing a timeline given the sorts of errors that could potentially
-      //occur (network timeout/interruption, faulty client, etc.)
-      while (keepTrying < 5) {
-
-        try {
-          this.client = provider.getTwitterClient();
 
-          ResponseList<Status> statuses = client.getUserTimeline(id, paging);
+      List<Tweet> statuses = client.userTimeline(request);
 
-          for (Status twitterStatus : statuses) {
+      last_count = statuses.size();
+      if( statuses.size() > 0 ) {
 
-            String json = TwitterObjectFactory.getRawJSON(twitterStatus);
-
-            if ( item_count < provider.getConfig().getMaxItems() ) {
-              try {
-                org.apache.streams.twitter.pojo.Tweet tweet = MAPPER.readValue(json, org.apache.streams.twitter.pojo.Tweet.class);
-                ComponentUtils.offerUntilSuccess(new StreamsDatum(tweet), provider.providerQueue);
-              } catch (Exception exception) {
-                LOGGER.warn("Failed to read document as Tweet ", twitterStatus);
-              }
-              item_count++;
-            }
+        for (Tweet status : statuses) {
 
+          if (item_count < provider.getConfig().getMaxItems()) {
+            ComponentUtils.offerUntilSuccess(new StreamsDatum(status), provider.providerQueue);
+            item_count++;
           }
 
-          lastPage = statuses;
-          page_count = paging.getPage() + 1;
-          paging.setPage(page_count);
-
-          keepTrying = 10;
-        } catch (Exception ex) {
-          keepTrying += TwitterErrorHandler.handleTwitterError(client, id, ex);
         }
+
+        Stream<Long> statusIds = statuses.stream().map(status -> status.getId());
+        long minId = statusIds.reduce(Math::min).get();
+        page_count++;
+        request.setMaxId(minId);
+
       }
+
     }
-    while (shouldContinuePulling());
+    while (shouldContinuePulling(last_count, page_count, item_count));
 
-    LOGGER.info(id + " Thread Finished");
+    LOGGER.info("Thread Finished: {}", request.toString());
 
   }
 
-  public boolean shouldContinuePulling() {
-    return (lastPage != null)
-        && item_count < provider.getConfig().getMaxItems()
-        && page_count <= provider.getConfig().getMaxPages();
+  public boolean shouldContinuePulling(int count, int page_count, int item_count) {
+    return (
+        count > 0
+            && item_count < provider.getConfig().getMaxItems()
+            && page_count <= provider.getConfig().getMaxPages());
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
index 214d204..1a7b906 100644
--- a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProvider.java
@@ -28,6 +28,8 @@ import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.twitter.TwitterFollowingConfiguration;
 import org.apache.streams.twitter.TwitterUserInformationConfiguration;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
 import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
 import org.apache.streams.twitter.pojo.User;
 import org.apache.streams.util.ComponentUtils;
@@ -35,6 +37,8 @@ import org.apache.streams.util.ComponentUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.Uninterruptibles;
@@ -45,10 +49,6 @@ import org.apache.commons.lang.NotImplementedException;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import twitter4j.Twitter;
-import twitter4j.TwitterFactory;
-import twitter4j.conf.ConfigurationBuilder;
-import twitter4j.json.DataObjectFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -88,6 +88,32 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
   private TwitterUserInformationConfiguration config;
 
+  protected List<String> names = new ArrayList<>();
+  protected List<Long> ids = new ArrayList<>();
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected volatile Queue<StreamsDatum> providerQueue;
+
+  public TwitterUserInformationConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(TwitterUserInformationConfiguration config) {
+    this.config = config;
+  }
+
+  protected Twitter client;
+
+  protected ListeningExecutorService executor;
+
+  protected DateTime start;
+  protected DateTime end;
+
+  protected final AtomicBoolean running = new AtomicBoolean();
+
+  private List<ListenableFuture<Object>> futures = new ArrayList<>();
+
   /**
    * To use from command line:
    *
@@ -148,28 +174,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     outStream.flush();
   }
 
-  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
-
-  protected volatile Queue<StreamsDatum> providerQueue;
-
-  public TwitterUserInformationConfiguration getConfig() {
-    return config;
-  }
-
-  public void setConfig(TwitterUserInformationConfiguration config) {
-    this.config = config;
-  }
-
-  protected Iterator<Long[]> idsBatches;
-  protected Iterator<String[]> screenNameBatches;
-
-  protected ListeningExecutorService executor;
-
-  protected DateTime start;
-  protected DateTime end;
-
-  protected final AtomicBoolean running = new AtomicBoolean();
-
   // TODO: this should be abstracted out
   public static ExecutorService newFixedThreadPoolWithQueueSize(int numThreads, int queueSize) {
     return new ThreadPoolExecutor(numThreads, numThreads,
@@ -212,6 +216,15 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     Objects.requireNonNull(config.getOauth().getAccessToken());
     Objects.requireNonNull(config.getOauth().getAccessTokenSecret());
     Objects.requireNonNull(config.getInfo());
+    Objects.requireNonNull(config.getThreadsPerProvider());
+
+    try {
+      client = getTwitterClient();
+    } catch (InstantiationException e) {
+      LOGGER.error("InstantiationException", e);
+    }
+
+    Objects.requireNonNull(client);
 
     try {
       lock.writeLock().lock();
@@ -222,12 +235,6 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     Objects.requireNonNull(providerQueue);
 
-    List<String> screenNames = new ArrayList<>();
-    List<String[]> screenNameBatches = new ArrayList<>();
-
-    List<Long> ids = new ArrayList<>();
-    List<Long[]> idsBatches = new ArrayList<>();
-
     for (String s : config.getInfo()) {
       if (s != null) {
         String potentialScreenName = s.replaceAll("@", "").trim().toLowerCase();
@@ -237,46 +244,67 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
         try {
           ids.add(Long.parseLong(potentialScreenName));
         } catch (Exception ex) {
-          screenNames.add(potentialScreenName);
-        }
-
-        // Twitter allows for batches up to 100 per request, but you cannot mix types
-
-        if (ids.size() >= 100) {
-          // add the batch
-          idsBatches.add(ids.toArray(new Long[ids.size()]));
-          // reset the Ids
-          ids = new ArrayList<>();
-        }
-
-        if (screenNames.size() >= 100) {
-          // add the batch
-          screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-          // reset the Ids
-          screenNames = new ArrayList<>();
+          names.add(potentialScreenName);
         }
       }
     }
 
+    executor = MoreExecutors.listeningDecorator(TwitterUserInformationProvider.newFixedThreadPoolWithQueueSize(config.getThreadsPerProvider().intValue(), ids.size()));
 
-    if (ids.size() > 0) {
-      idsBatches.add(ids.toArray(new Long[ids.size()]));
-    }
+    Objects.requireNonNull(executor);
 
-    if (screenNames.size() > 0) {
-      screenNameBatches.add(screenNames.toArray(new String[ids.size()]));
-    }
+    // Twitter allows for batches up to 100 per request, but you cannot mix types
+    submitUserInformationThreads(ids, names);
+  }
 
-    if (ids.size() + screenNames.size() > 0) {
-      executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, (ids.size() + screenNames.size())));
-    } else {
-      executor = MoreExecutors.listeningDecorator(newSingleThreadExecutor());
+  protected void submitUserInformationThreads(List<Long> ids, List<String> names) {
+
+    int idsIndex = 0;
+    while( idsIndex + 100 < ids.size() ) {
+      List<Long> batchIds = ids.subList(idsIndex, idsIndex + 100);
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchIds));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      idsIndex += 100;
+    }
+    if (ids.size() >= idsIndex) {
+      List<Long> batchIds = ids.subList(idsIndex, ids.size());
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchIds));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
     }
 
-    Objects.requireNonNull(executor);
+    int namesIndex = 0;
+    while( idsIndex + 100 < ids.size() ) {
+      List<String> batchNames = names.subList(namesIndex, namesIndex + 100);
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withScreenName(batchNames));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+      namesIndex += 100;
+    }
+    if (names.size() >= idsIndex) {
+      List<Long> batchNames = ids.subList(idsIndex, names.size());
+      TwitterUserInformationProviderTask providerTask = new TwitterUserInformationProviderTask(
+          this,
+          client,
+          new UsersLookupRequest().withUserId(batchNames));
+      ListenableFuture future = executor.submit(providerTask);
+      futures.add(future);
+      LOGGER.info("Thread Submitted: {}", providerTask.request);
+    }
 
-    this.idsBatches = idsBatches.iterator();
-    this.screenNameBatches = screenNameBatches.iterator();
   }
 
   @Override
@@ -284,82 +312,16 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
 
     Objects.requireNonNull(executor);
 
-    Preconditions.checkArgument(idsBatches.hasNext() || screenNameBatches.hasNext());
-
-    LOGGER.info("{}{} - startStream", idsBatches, screenNameBatches);
-
-    while (idsBatches.hasNext()) {
-      loadBatch(idsBatches.next());
-    }
-
-    while (screenNameBatches.hasNext()) {
-      loadBatch(screenNameBatches.next());
-    }
+    LOGGER.info("startStream: {} Threads", futures.size());
 
     running.set(true);
 
     executor.shutdown();
   }
 
-  protected void loadBatch(Long[] ids) {
-    Twitter client = getTwitterClient();
-    int keepTrying = 0;
-
-    // keep trying to load, give it 5 attempts.
-    //while (keepTrying < 10)
-    while (keepTrying < 1) {
-      try {
-        long[] toQuery = new long[ids.length];
-
-        for (int i = 0; i < ids.length; i++) {
-          toQuery[i] = ids[i];
-        }
-
-        for (twitter4j.User twitterUser : client.lookupUsers(toQuery)) {
-          String json = DataObjectFactory.getRawJSON(twitterUser);
-          try {
-            User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
-          } catch (Exception exception) {
-            LOGGER.warn("Failed to read document as User ", twitterUser);
-          }
-        }
-        keepTrying = 10;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
-      }
-    }
-  }
-
-  protected void loadBatch(String[] ids) {
-    Twitter client = getTwitterClient();
-    int keepTrying = 0;
-
-    // keep trying to load, give it 5 attempts.
-    //while (keepTrying < 10)
-    while (keepTrying < 1) {
-      try {
-        for (twitter4j.User twitterUser : client.lookupUsers(ids)) {
-          String json = DataObjectFactory.getRawJSON(twitterUser);
-          try {
-            User user = MAPPER.readValue(json, org.apache.streams.twitter.pojo.User.class);
-            ComponentUtils.offerUntilSuccess(new StreamsDatum(user), providerQueue);
-          } catch (Exception exception) {
-            LOGGER.warn("Failed to read document as User ", twitterUser);
-          }
-        }
-        keepTrying = 10;
-      } catch (Exception ex) {
-        keepTrying += TwitterErrorHandler.handleTwitterError(client, ex);
-      }
-    }
-  }
-
   @Override
   public StreamsResultSet readCurrent() {
 
-    LOGGER.debug("{}{} - readCurrent", idsBatches, screenNameBatches);
-
     StreamsResultSet result;
 
     try {
@@ -367,7 +329,7 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
       result = new StreamsResultSet(providerQueue);
       result.setCounter(new DatumStatusCounter());
       providerQueue = constructQueue();
-      LOGGER.debug("{}{} - providing {} docs", idsBatches, screenNameBatches, result.size());
+      LOGGER.debug("readCurrent: {} Documents", result.size());
     } finally {
       lock.writeLock().unlock();
     }
@@ -394,17 +356,14 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     return (StreamsResultSet)providerQueue.iterator();
   }
 
+
   @Override
   public boolean isRunning() {
-
-    if ( providerQueue.isEmpty() && executor.isTerminated() ) {
-      LOGGER.info("{}{} - completed", idsBatches, screenNameBatches);
-
+    if ( providerQueue.isEmpty() && executor.isTerminated() && Futures.allAsList(futures).isDone() ) {
+      LOGGER.info("All Threads Completed");
       running.set(false);
-
       LOGGER.info("Exiting");
     }
-
     return running.get();
   }
 
@@ -427,29 +386,8 @@ public class TwitterUserInformationProvider implements StreamsProvider, Serializ
     }
   }
 
-
-  // TODO: abstract out, also appears in TwitterTimelineProvider
-  protected Twitter getTwitterClient() {
-    String baseUrl = TwitterProviderUtil.baseUrl(config);
-
-    ConfigurationBuilder builder = new ConfigurationBuilder()
-        .setOAuthConsumerKey(config.getOauth().getConsumerKey())
-        .setOAuthConsumerSecret(config.getOauth().getConsumerSecret())
-        .setOAuthAccessToken(config.getOauth().getAccessToken())
-        .setOAuthAccessTokenSecret(config.getOauth().getAccessTokenSecret())
-        .setIncludeEntitiesEnabled(true)
-        .setJSONStoreEnabled(true)
-        .setAsyncNumThreads(3)
-        .setRestBaseURL(baseUrl)
-        .setIncludeMyRetweetEnabled(Boolean.TRUE)
-        .setPrettyDebugEnabled(Boolean.TRUE);
-
-    return new TwitterFactory(builder.build()).getInstance();
-  }
-
-  protected void callback() {
-
-
+  protected Twitter getTwitterClient() throws InstantiationException {
+    return Twitter.getInstance(config);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
new file mode 100644
index 0000000..5dbb784
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/java/org/apache/streams/twitter/provider/TwitterUserInformationProviderTask.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.twitter.provider;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.twitter.api.Twitter;
+import org.apache.streams.twitter.api.UsersLookupRequest;
+import org.apache.streams.twitter.converter.TwitterDateTimeFormat;
+import org.apache.streams.twitter.pojo.User;
+import org.apache.streams.util.ComponentUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ *  Retrieve recent posts for a single user id.
+ */
+public class TwitterUserInformationProviderTask implements Runnable {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(TwitterUserInformationProviderTask.class);
+
+  private static ObjectMapper MAPPER = new StreamsJacksonMapper(Stream.of(TwitterDateTimeFormat.TWITTER_FORMAT).collect(Collectors.toList()));
+
+  protected TwitterUserInformationProvider provider;
+  protected Twitter client;
+  protected UsersLookupRequest request;
+
+  /**
+   * TwitterTimelineProviderTask constructor.
+   * @param provider TwitterUserInformationProvider
+   * @param twitter Twitter
+   * @param request UsersLookupRequest
+   */
+  public TwitterUserInformationProviderTask(TwitterUserInformationProvider provider, Twitter twitter, UsersLookupRequest request) {
+    this.provider = provider;
+    this.client = twitter;
+    this.request = request;
+  }
+
+  @Override
+  public void run() {
+
+    LOGGER.info("Thread Starting: {}", request.toString());
+
+    List<User> users = client.lookup(request);
+
+    for (User user : users) {
+      ComponentUtils.offerUntilSuccess(new StreamsDatum(user), provider.providerQueue);
+    }
+
+    LOGGER.info("Thread Finished: {}", request.toString());
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
deleted file mode 100644
index 69048d1..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterConfiguration.json
+++ /dev/null
@@ -1,87 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterConfiguration",
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "protocol": {
-            "type": "string",
-            "description": "The protocol",
-            "default": "https"
-        },
-        "host": {
-            "type": "string",
-            "description": "The host",
-            "default": "api.twitter.com"
-        },
-        "port": {
-            "type": "integer",
-            "description": "The port",
-            "default": 443
-        },
-        "version": {
-            "type": "string",
-            "description": "The version",
-            "default": "1.1"
-        },
-        "endpoint": {
-            "type": "string",
-            "description": "The endpoint"
-        },
-        "jsonStoreEnabled": {
-            "default" : true,
-            "type": "string"
-        },
-        "oauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterOAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "appName": {
-                    "type": "string"
-                },
-                "consumerKey": {
-                    "type": "string"
-                },
-                "consumerSecret": {
-                    "type": "string"
-                },
-                "accessToken": {
-                    "type": "string"
-                },
-                "accessTokenSecret": {
-                    "type": "string"
-                }
-            }
-        },
-        "basicauth": {
-            "type": "object",
-            "dynamic": "true",
-            "javaType" : "org.apache.streams.twitter.TwitterBasicAuthConfiguration",
-            "javaInterfaces": ["java.io.Serializable"],
-            "properties": {
-                "username": {
-                    "type": "string"
-                },
-                "password": {
-                    "type": "string"
-                }
-            }
-        },
-        "retrySleepMs": {
-             "type": "integer",
-             "description": "ms to sleep when hitting a rate limit",
-             "default": 100000
-         },
-         "retryMax": {
-             "type": "integer",
-             "description": "ms to sleep when hitting a rate limit",
-             "default": 10
-        }
-   }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
deleted file mode 100644
index 89fc7af..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterFollowingConfiguration.json
+++ /dev/null
@@ -1,33 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterFollowingConfiguration",
-    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "ids_only": {
-            "type": "boolean",
-            "description": "Whether to collect ids only, or full profiles",
-            "default": "true"
-        },
-        "max_items": {
-            "type": "integer",
-            "description": "Max items per user to collect",
-            "default": 50000
-        },
-        "max_pages": {
-            "type": "integer",
-            "description": "Max pages per user to request",
-            "default": 10
-        },
-        "page_size": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 5000
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
deleted file mode 100644
index 6fa2a73..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterStreamConfiguration.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterStreamConfiguration",
-    "extends": {"$ref":"TwitterConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "includeEntities": {
-            "type": "string"
-        },
-        "truncated": {
-            "type": "boolean"
-        },
-        "filter-level": {
-            "type": "string",
-            "description": "Setting this parameter to one of none, low, or medium will set the minimum value of the filter_level Tweet attribute required to be included in the stream"
-        },
-        "with": {
-            "type": "string",
-            "description": "Typically following or user"
-        },
-        "replies": {
-            "type": "string",
-            "description": "Set to all, to see all @replies"
-        },
-        "follow": {
-            "type": "array",
-            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
-            "items": {
-                "type": "integer"
-            }
-        },
-        "track": {
-            "type": "array",
-            "description": "A list of phrases which will be used to determine what Tweets will be delivered on the stream",
-            "items": {
-                "type": "string"
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
deleted file mode 100644
index 37ed60e..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterTimelineProviderConfiguration.json
+++ /dev/null
@@ -1,23 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterTimelineProviderConfiguration",
-    "extends": {"$ref":"TwitterUserInformationConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "max_items": {
-            "type": "integer",
-            "description": "Max items per user to collect",
-            "default": 3200
-        },
-        "max_pages": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 16
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
deleted file mode 100644
index 405c87a..0000000
--- a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/TwitterUserInformationConfiguration.json
+++ /dev/null
@@ -1,25 +0,0 @@
-{
-    "$schema": "http://json-schema.org/draft-03/schema",
-    "$license": [
-        "http://www.apache.org/licenses/LICENSE-2.0"
-    ],
-    "id": "#",
-    "type": "object",
-    "javaType" : "org.apache.streams.twitter.TwitterUserInformationConfiguration",
-    "extends": {"$ref":"TwitterConfiguration.json"},
-    "javaInterfaces": ["java.io.Serializable"],
-    "properties": {
-        "info": {
-            "type": "array",
-            "description": "A list of user IDs, indicating the users whose Tweets should be delivered on the stream",
-            "items": {
-                "type": "string"
-            }
-        },
-        "page_size": {
-            "type": "integer",
-            "description": "Max items per page to request",
-            "default": 200
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
new file mode 100644
index 0000000..2b98689
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FollowersIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+  "extends": { "$ref": "FollowingIdsRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
new file mode 100644
index 0000000..03dd83c
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersIdsResponse.json
@@ -0,0 +1,32 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FollowersIdsResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/ids",
+  "properties": {
+    "ids": {
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
new file mode 100644
index 0000000..c588579
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FollowersListRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+  "extends": { "$ref": "FollowingListRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
new file mode 100644
index 0000000..e8e46a1
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowersListResponse.json
@@ -0,0 +1,33 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FollowersListResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/followers/list",
+  "properties": {
+    "users": {
+      "type": "array",
+      "items": {
+        "type": "object",
+        "$ref": "../pojo/User.json"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
new file mode 100644
index 0000000..81804fa
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingIdsRequest.json
@@ -0,0 +1,36 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.FollowingIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "id": {
+      "description": "The ID of the user for whom to return results for.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results for.",
+      "required": false,
+      "type": "string"
+    },
+    "cursor": {
+      "description": "Causes the list of connections to be broken into pages of no more than 5000 IDs at a time. The number of IDs returned is not guaranteed to be 5000 as suspended users are filtered out after connections are queried. If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+      "required": false,
+      "type": "integer"
+    },
+    "stringify_ids": {
+      "description": "Many programming environments will not consume our Tweet ids due to their size. Provide this option to have ids returned as strings instead.",
+      "required": false,
+      "type": "boolean"
+    },
+    "count": {
+      "description": "Specifies the number of IDs attempt retrieval of, up to a maximum of 5,000 per distinct request. The value of count is best thought of as a limit to the number of results to return. When using the count parameter with this method, it is wise to use a consistent count value across all requests to the same user\u2019s collection. Usage of this parameter is encouraged in environments where all 5,000 IDs constitutes too large of a response.",
+      "required": false,
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
new file mode 100644
index 0000000..90295b5
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FollowingListRequest.json
@@ -0,0 +1,41 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType" : "org.apache.streams.twitter.api.FollowingListRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "properties": {
+    "id": {
+      "description": "The ID of the user for whom to return results for.",
+      "required": false,
+      "type": "integer"
+    },
+    "screen_name": {
+      "description": "The screen name of the user for whom to return results for.",
+      "required": false,
+      "type": "string"
+    },
+    "cursor": {
+      "description": "Causes the list of connections to be broken into pages of no more than 5000 IDs at a time. The number of IDs returned is not guaranteed to be 5000 as suspended users are filtered out after connections are queried. If no cursor is provided, a value of -1 will be assumed, which is the first page.\nThe response from the API will include a previous_cursor and next_cursor to allow paging back and forth.",
+      "required": false,
+      "type": "integer"
+    },
+    "count": {
+      "description": "The number of users to return per page, up to a maximum of 200. Defaults to 20.",
+      "required": false,
+      "type": "integer"
+    },
+    "skip_status": {
+      "description": "When set to either true , t or 1 , statuses will not be included in the returned user objects. If set to any other value, statuses will be included.",
+      "required": false,
+      "type": "boolean"
+    },
+    "include_user_entities": {
+      "description": "The user object entities node will not be included when set to false.",
+      "required": false,
+      "type": "integer"
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
new file mode 100644
index 0000000..fda54f8
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsRequest.json
@@ -0,0 +1,13 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "type": "object",
+  "javaType" : "org.apache.streams.twitter.api.FriendsIdsRequest",
+  "javaInterfaces": ["java.io.Serializable"],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+  "extends": { "$ref": "FollowingIdsRequest.json" },
+  "properties": {}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/67497a48/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
new file mode 100644
index 0000000..fc46841
--- /dev/null
+++ b/streams-contrib/streams-provider-twitter/src/main/jsonschema/org/apache/streams/twitter/api/FriendsIdsResponse.json
@@ -0,0 +1,32 @@
+{
+  "$schema": "http://json-schema.org/draft-03/schema",
+  "$license": [
+    "http://www.apache.org/licenses/LICENSE-2.0"
+  ],
+  "id": "#",
+  "javaType": "org.apache.streams.twitter.api.FriendsIdsResponse",
+  "javaInterfaces": [
+    "java.io.Serializable"
+  ],
+  "description": "https://dev.twitter.com/rest/reference/get/friends/ids",
+  "properties": {
+    "ids": {
+      "type": "array",
+      "items": {
+        "type": "integer"
+      }
+    },
+    "previous_cursor": {
+      "type": "integer"
+    },
+    "previous_cursor_str": {
+      "type": "string"
+    },
+    "next_cursor": {
+      "type": "integer"
+    },
+    "next_cursor_str": {
+      "type": "string"
+    }
+  }
+}
\ No newline at end of file