You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/22 19:44:46 UTC

[1/3] incubator-streams git commit: level up moreover provider

Repository: incubator-streams
Updated Branches:
  refs/heads/master 9861124ae -> 67d5cca51


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf b/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
deleted file mode 100644
index 3b683fe..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-moreover {
-  apiKeys {
-    key {
-      key = ""
-      startingSequence = ""
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
deleted file mode 100644
index ea83777..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.data.util.JsonUtil;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URL;
-import java.nio.charset.Charset;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-/**
- * Tests ability to serialize moreover json Strings
- */
-public class MoreoverJsonActivitySerializerIT {
-    JsonNode json;
-    ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
-    ObjectMapper mapper;
-
-    @Before
-    public void setup() throws Exception {
-
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-
-        mapper = new ObjectMapper();
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
-        json = mapper.readValue(writer.toString(), JsonNode.class);
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        for (JsonNode item : json) {
-            test(serializer.deserialize(getString(item)));
-        }
-    }
-
-
-    private String getString(JsonNode jsonNode)  {
-        try {
-            return new ObjectMapper().writeValueAsString(jsonNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
deleted file mode 100644
index da660cd..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-/**
- * Tests ability to serialize moreover xml Strings
- */
-public class MoreoverXmlActivitySerializerIT {
-    ActivitySerializer serializer;
-    private String xml;
-
-    @Before
-    public void setup() throws IOException {
-        serializer = new MoreoverXmlActivitySerializer();
-        xml = loadXml();
-    }
-
-    @Test
-    public void loadData() throws Exception {
-        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
-        for (Activity activity : activities) {
-            test(activity);
-        }
-    }
-
-    private String loadXml() throws IOException {
-        StringWriter writer = new StringWriter();
-        InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml");
-        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-        return writer.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
deleted file mode 100644
index 14b7652..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.regex.Pattern.matches;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-public class MoreoverTestUtil {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class);
-
-    public static void test(Activity activity) {
-        assertThat(activity, is(not(nullValue())));
-        assertThat(activity.getActor(), is(not(nullValue())));
-        assertThat(activity.getObject(), is(not(nullValue())));
-        if(activity.getObject().getId() != null) {
-            assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true));
-        }
-        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
-        LOGGER.debug(activity.getPublished().toString());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
new file mode 100644
index 0000000..cdd5822
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.regex.Pattern.matches;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+public class MoreoverTestUtil {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class);
+
+    public static void test(Activity activity) {
+        assertThat(activity, is(not(nullValue())));
+        assertThat(activity.getActor(), is(not(nullValue())));
+        assertThat(activity.getObject(), is(not(nullValue())));
+        if(activity.getObject().getId() != null) {
+            assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true));
+        }
+        assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+        LOGGER.debug(activity.getPublished().toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
new file mode 100644
index 0000000..94ef097
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverJsonActivitySerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+
+import static org.apache.streams.moreover.MoreoverTestUtil.test;
+
+/**
+ * Tests ability to serialize moreover json Strings
+ */
+public class MoreoverJsonActivitySerializerIT {
+    JsonNode json;
+    ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
+    ObjectMapper mapper;
+
+    @Before
+    public void setup() throws Exception {
+
+        StringWriter writer = new StringWriter();
+        InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json");
+        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+
+        mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+        json = mapper.readValue(writer.toString(), JsonNode.class);
+    }
+
+    @Test
+    public void loadData() throws Exception {
+        for (JsonNode item : json) {
+            test(serializer.deserialize(getString(item)));
+        }
+    }
+
+
+    private String getString(JsonNode jsonNode)  {
+        try {
+            return new ObjectMapper().writeValueAsString(jsonNode);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
new file mode 100644
index 0000000..ad0b384
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.streams.moreover.MoreoverXmlActivitySerializer;
+
+import static org.apache.streams.moreover.MoreoverTestUtil.test;
+
+/**
+ * Tests ability to serialize moreover xml Strings
+ */
+public class MoreoverXmlActivitySerializerIT {
+    ActivitySerializer serializer;
+    private String xml;
+
+    @Before
+    public void setup() throws IOException {
+        serializer = new MoreoverXmlActivitySerializer();
+        xml = loadXml();
+    }
+
+    @Test
+    public void loadData() throws Exception {
+        List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+        for (Activity activity : activities) {
+            test(activity);
+        }
+    }
+
+    private String loadXml() throws IOException {
+        StringWriter writer = new StringWriter();
+        InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml");
+        IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+        return writer.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
new file mode 100644
index 0000000..2bc672d
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.moreover.MoreoverProvider;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Integration test for MoreoverProviderIT
+ *
+ * Created by sblackmon on 10/21/16.
+ */
+@Ignore("this is ignored because the project doesn't have credentials to test it with during CI")
+public class MoreoverProviderIT {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProviderIT.class);
+
+    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+    @Test
+    public void testRssStreamProvider() throws Exception {
+
+        String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+        String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
+
+        MoreoverProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+        File out = new File(outfile);
+        assert (out.exists());
+        assert (out.canRead());
+        assert (out.isFile());
+
+        FileReader outReader = new FileReader(out);
+        LineNumberReader outCounter = new LineNumberReader(outReader);
+
+        while(outCounter.readLine() != null) {}
+
+        assert (outCounter.getLineNumber() >= 1);
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf b/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
new file mode 100644
index 0000000..3b683fe
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+moreover {
+  apiKeys {
+    key {
+      key = ""
+      startingSequence = ""
+    }
+  }
+}


[2/3] incubator-streams git commit: level up moreover provider

Posted by sb...@apache.org.
level up moreover provider

add main methods to each Provider (STREAMS-412)
add real integration tests (STREAMS-415)
reorganize packages


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/015f0ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/015f0ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/015f0ae1

Branch: refs/heads/master
Commit: 015f0ae1eb8b662889e0fa544607c28e25b0dc38
Parents: 11e3a0f
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 21 11:04:47 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 21 11:04:47 2016 -0500

----------------------------------------------------------------------
 .../data/MoreoverJsonActivitySerializer.java    |  97 ---------
 .../data/MoreoverXmlActivitySerializer.java     | 105 ----------
 .../streams/data/moreover/MoreoverClient.java   | 110 ----------
 .../data/moreover/MoreoverConfigurator.java     |  58 ------
 .../streams/data/moreover/MoreoverProvider.java | 114 -----------
 .../data/moreover/MoreoverProviderTask.java     |  94 ---------
 .../streams/data/moreover/MoreoverResult.java   | 204 -------------------
 .../data/moreover/MoreoverResultSetWrapper.java |  34 ----
 .../apache/streams/data/util/MoreoverUtils.java | 155 --------------
 .../apache/streams/moreover/MoreoverClient.java | 110 ++++++++++
 .../streams/moreover/MoreoverConfigurator.java  |  56 +++++
 .../MoreoverJsonActivitySerializer.java         |  98 +++++++++
 .../streams/moreover/MoreoverProvider.java      | 177 ++++++++++++++++
 .../streams/moreover/MoreoverProviderTask.java  |  91 +++++++++
 .../apache/streams/moreover/MoreoverResult.java | 200 ++++++++++++++++++
 .../moreover/MoreoverResultSetWrapper.java      |  32 +++
 .../apache/streams/moreover/MoreoverUtils.java  | 155 ++++++++++++++
 .../moreover/MoreoverXmlActivitySerializer.java | 106 ++++++++++
 .../src/main/resources/moreover.conf            |  25 ---
 .../data/MoreoverJsonActivitySerializerIT.java  |  79 -------
 .../data/MoreoverXmlActivitySerializerIT.java   |  65 ------
 .../streams/data/util/MoreoverTestUtil.java     |  43 ----
 .../streams/moreover/MoreoverTestUtil.java      |  43 ++++
 .../test/MoreoverJsonActivitySerializerIT.java  |  76 +++++++
 .../test/MoreoverXmlActivitySerializerIT.java   |  66 ++++++
 .../test/provider/MoreoverProviderIT.java       |  67 ++++++
 .../src/test/resources/moreover.conf            |  25 +++
 27 files changed, 1302 insertions(+), 1183 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
deleted file mode 100644
index ae48b41..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.moreover.api.Article;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Deserializes Moreover JSON format into Activities
- */
-public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
-
-    public MoreoverJsonActivitySerializer() {
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/json+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) {
-        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
-        LOGGER.debug(serialized);
-
-        ObjectMapper mapper = new ObjectMapper();
-        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
-        mapper.setAnnotationIntrospector(introspector);
-        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
-        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
-        Article article;
-        try {
-            ObjectNode node = (ObjectNode)mapper.readTree(serialized);
-            node.remove("tags");
-            node.remove("locations");
-            node.remove("companies");
-            node.remove("topics");
-            node.remove("media");
-            node.remove("outboundUrls");
-            ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
-            jsonNodes.remove("editorialTopics");
-            jsonNodes.remove("tags");
-            jsonNodes.remove("autoTopics");
-            article = mapper.convertValue(node, Article.class);
-        } catch (IOException e) {
-            throw new IllegalArgumentException("Unable to deserialize", e);
-        }
-        return MoreoverUtils.convert(article);
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        throw new NotImplementedException("Not currently implemented");
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
deleted file mode 100644
index d60bcb8..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import com.moreover.api.ObjectFactory;
-import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
- */
-public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
-
-    //JAXBContext is threadsafe (supposedly)
-    private final JAXBContext articleContext;
-    private final JAXBContext articlesContext;
-
-    public MoreoverXmlActivitySerializer() {
-        articleContext = createContext(Article.class);
-        articlesContext = createContext(ArticlesResponse.class);
-    }
-
-    @Override
-    public String serializationFormat() {
-        return "application/xml+vnd.moreover.com.v1";
-    }
-
-    @Override
-    public String serialize(Activity deserialized) {
-        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
-    }
-
-    @Override
-    public Activity deserialize(String serialized) {
-        Article article = deserializeMoreover(serialized);
-        return MoreoverUtils.convert(article);
-    }
-
-    @Override
-    public List<Activity> deserializeAll(List<String> serializedList) {
-        List<Activity> activities = new LinkedList<Activity>();
-        for(String item : serializedList) {
-            ArticlesResponse response = deserializeMoreoverResponse(item);
-            for(Article article : response.getArticles().getArticle()) {
-                activities.add(MoreoverUtils.convert(article));
-            }
-        }
-        return activities;
-    }
-
-    private Article deserializeMoreover(String serialized){
-        try {
-            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
-            return (Article) unmarshaller.unmarshal(new StringReader(serialized));
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover data", e);
-        }
-    }
-
-    private ArticlesResponse deserializeMoreoverResponse(String serialized){
-        try {
-            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
-            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
-        } catch (JAXBException e) {
-            throw new SerializationException("Unable to deserialize Moreover data", e);
-        }
-    }
-
-    private JAXBContext createContext(Class articleClass) {
-        JAXBContext context;
-        try {
-            context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
-        } catch (JAXBException e) {
-            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
-        }
-        return context;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
deleted file mode 100644
index 9ab60c5..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.util.Date;
-
-/**
- *
- */
-public class MoreoverClient {
-    private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
-
-    private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
-    private final String id;
-    private String apiKey;
-    private BigInteger lastSequenceId = BigInteger.ZERO;
-    //testing purpose only
-    public long pullTime;
-    private boolean debug;
-
-    public MoreoverClient(String id, String apiKey, String sequence) {
-        logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
-        this.id = id;
-        this.apiKey = apiKey;
-        this.lastSequenceId = new BigInteger(sequence);
-    }
-
-    public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
-        String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
-        logger.debug("Making call to {}", urlString);
-        long start = System.nanoTime();
-        MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
-        if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
-        {
-            this.lastSequenceId = result.getMaxSequencedId();
-            logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
-        }
-        else
-            logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
-        return result;
-    }
-
-    public MoreoverResult getNextBatch() throws IOException{
-        logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
-        return getArticlesAfter(this.lastSequenceId.toString(), 500);
-    }
-
-    private String getArticles2(URL url) throws IOException {
-        HttpURLConnection cn = (HttpURLConnection) url.openConnection();
-        cn.setRequestMethod("GET");
-        cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
-        cn.setDoInput(true);
-        cn.setDoOutput(false);
-        BufferedReader reader = new BufferedReader(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")));
-        String line = null;
-        StringBuilder builder = new StringBuilder();
-        String s = "";
-        String result = new String(s.getBytes(Charset.forName("UTF-8")), Charset.forName("UTF-8"));
-        while((line = reader.readLine()) != null) {
-            result+=line;
-        }
-        pullTime = new Date().getTime();
-        return result;
-    }
-
-    private String getArticles(URL url) throws IOException{
-        HttpURLConnection cn = (HttpURLConnection) url.openConnection();
-        cn.setRequestMethod("GET");
-        cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
-        cn.setDoInput(true);
-        cn.setDoOutput(false);
-        StringWriter writer = new StringWriter();
-        IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
-        writer.flush();
-        pullTime = new Date().getTime();
-
-        // added after seeing java.net.SocketException: Too many open files
-        cn.disconnect();
-
-        return writer.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
deleted file mode 100644
index 4c3ba06..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.streams.moreover.MoreoverConfiguration;
-import org.apache.streams.moreover.MoreoverKeyData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class MoreoverConfigurator {
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class);
-
-    public static MoreoverConfiguration detectConfiguration(Config moreover) {
-
-        MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration();
-
-        List<MoreoverKeyData> apiKeys = Lists.newArrayList();
-
-        Config apiKeysConfig = moreover.getConfig("apiKeys");
-
-        if( !apiKeysConfig.isEmpty())
-            for( String apiKeyId : apiKeysConfig.root().keySet() ) {
-                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
-                apiKeys.add(new MoreoverKeyData()
-                        .withId(apiKeyConfig.getString("key"))
-                        .withKey(apiKeyConfig.getString("key"))
-                        .withStartingSequence(apiKeyConfig.getString("startingSequence")));
-            }
-        moreoverConfiguration.setApiKeys(apiKeys);
-
-        return moreoverConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
deleted file mode 100644
index 1add4d2..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.*;
-import net.jcip.annotations.Immutable;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.moreover.MoreoverConfiguration;
-import org.apache.streams.moreover.MoreoverKeyData;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
-
-public class MoreoverProvider implements StreamsProvider {
-
-    public final static String STREAMS_ID = "MoreoverProvider";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
-
-    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
-    private List<MoreoverKeyData> keys;
-
-    private MoreoverConfiguration config;
-
-    private ExecutorService executor;
-
-    public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
-        this.config = moreoverConfiguration;
-        this.keys = Lists.newArrayList();
-        for( MoreoverKeyData apiKey : config.getApiKeys()) {
-            this.keys.add(apiKey);
-        }
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    public void startStream() {
-
-        for(MoreoverKeyData key : keys) {
-            MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
-            executor.submit(new Thread(task));
-            LOGGER.info("Started producer for {}", key.getKey());
-        }
-
-    }
-
-    @Override
-    public synchronized StreamsResultSet readCurrent() {
-
-        LOGGER.debug("readCurrent: {}", providerQueue.size());
-
-        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
-        Iterators.addAll(currentIterator, providerQueue.iterator());
-
-        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
-
-        providerQueue.clear();
-
-        return current;
-    }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
-    }
-
-    @Override
-    public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        LOGGER.debug("Prepare");
-        executor = Executors.newSingleThreadExecutor();
-    }
-
-    @Override
-    public void cleanUp() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
deleted file mode 100644
index 2640bfd..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Queue;
-
-/**
- * Task to pull from the Morever API
- */
-public class MoreoverProviderTask implements Runnable {
-
-    public static final int LATENCY = 10;
-    public static final int REQUIRED_LATENCY = LATENCY * 1000;
-    private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
-
-    private String lastSequence;
-    private final String apiKey;
-    private final String apiId;
-    private final Queue<StreamsDatum> results;
-    private final MoreoverClient moClient;
-    private boolean started = false;
-
-    public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
-        //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
-        this.apiId = apiId;
-        this.apiKey = apiKey;
-        this.results = results;
-        this.lastSequence = lastSequence;
-        this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
-        initializeClient(moClient);
-    }
-
-    @Override
-    public void run() {
-        while(true) {
-            try {
-                ensureTime(moClient);
-                MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
-                started = true;
-                lastSequence = result.process().toString();
-                for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
-                    results.offer(entry);
-                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
-
-            } catch (Exception e) {
-                logger.error("Exception while polling moreover", e);
-            }
-        }
-    }
-
-    private void ensureTime(MoreoverClient moClient) {
-        try {
-            long gap = System.currentTimeMillis() - moClient.pullTime;
-            if (gap < REQUIRED_LATENCY)
-                Thread.sleep(REQUIRED_LATENCY - gap);
-        } catch (Exception e) {
-            logger.warn("Error sleeping for latency");
-        }
-    }
-
-    private void initializeClient(MoreoverClient moClient) {
-        try {
-            moClient.getArticlesAfter(this.lastSequence, 2);
-        } catch (Exception e) {
-            logger.error("Failed to start stream, {}", this.apiKey);
-            logger.error("Exception : ", e);
-            throw new IllegalStateException("Unable to initialize stream", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
deleted file mode 100644
index 050026b..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.fasterxml.aalto.stax.InputFactoryImpl;
-import com.fasterxml.aalto.stax.OutputFactoryImpl;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
-import com.fasterxml.jackson.dataformat.xml.XmlFactory;
-import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-
-
-public class MoreoverResult implements Iterable<StreamsDatum> {
-
-    private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
-
-    private ObjectMapper mapper;
-    private XmlMapper xmlMapper;
-
-    private String xmlString;
-    private String jsonString;
-    private ArticlesResponse resultObject;
-    private ArticlesResponse.Articles articles;
-    private List<Article> articleArray;
-    private long start;
-    private long end;
-    private String clientId;
-    private BigInteger maxSequencedId = BigInteger.ZERO;
-
-    protected ArticlesResponse response;
-    protected List<StreamsDatum> list = Lists.newArrayList();
-
-    protected MoreoverResult(String clientId, String xmlString, long start, long end) {
-        this.xmlString = xmlString;
-        this.clientId = clientId;
-        this.start = start;
-        this.end = end;
-        XmlFactory f = new XmlFactory(new InputFactoryImpl(),
-                new OutputFactoryImpl());
-
-        JacksonXmlModule module = new JacksonXmlModule();
-
-        module.setDefaultUseWrapper(false);
-
-        xmlMapper = new XmlMapper(f, module);
-
-        xmlMapper
-                .configure(
-                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
-                        Boolean.TRUE);
-        xmlMapper
-                .configure(
-                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
-                        Boolean.TRUE);
-        xmlMapper.configure(
-                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
-                Boolean.TRUE);
-        xmlMapper.configure(
-                DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
-                Boolean.FALSE);
-
-        mapper = new ObjectMapper();
-
-        mapper
-                .configure(
-                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
-                        Boolean.TRUE);
-        mapper.configure(
-                DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
-                Boolean.TRUE);
-        mapper
-                .configure(
-                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
-                        Boolean.TRUE);
-        mapper.configure(
-                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
-                Boolean.TRUE);
-
-    }
-
-    public String getClientId() {
-        return clientId;
-    }
-
-    public long getStart() {
-        return start;
-    }
-
-    public long getEnd() {
-        return end;
-    }
-
-    public BigInteger process() {
-
-        try {
-            this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
-        } catch (JsonMappingException e) {
-            // theory is this may not be fatal
-            this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
-        } catch (Exception e) {
-            e.printStackTrace();
-            logger.warn("Unable to process document:");
-            logger.warn(xmlString);
-        }
-
-        if( this.resultObject.getStatus().equals("FAILURE"))
-        {
-            logger.warn(this.resultObject.getStatus());
-            logger.warn(this.resultObject.getMessageCode());
-            logger.warn(this.resultObject.getUserMessage());
-            logger.warn(this.resultObject.getDeveloperMessage());
-        }
-        else
-        {
-            this.articles = resultObject.getArticles();
-            this.articleArray = articles.getArticle();
-
-            for (Article article : articleArray) {
-                BigInteger sequenceid = new BigInteger(article.getSequenceId());
-                list.add(new StreamsDatum(article, sequenceid));
-                logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
-                if (sequenceid.compareTo(this.maxSequencedId) > 0) {
-                    this.maxSequencedId = sequenceid;
-                }
-            }
-        }
-
-        return this.maxSequencedId;
-    }
-
-    public String getXmlString() {
-        return this.xmlString;
-    }
-
-    public BigInteger getMaxSequencedId() {
-        return this.maxSequencedId;
-    }
-
-    @Override
-    public Iterator<StreamsDatum> iterator() {
-        return list.iterator();
-    }
-
-    protected static class JsonStringIterator implements Iterator<Serializable> {
-
-        private Iterator<Serializable> underlying;
-
-        protected JsonStringIterator(Iterator<Serializable> underlying) {
-            this.underlying = underlying;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return underlying.hasNext();
-        }
-
-        @Override
-        public String next() {
-            return underlying.next().toString();
-        }
-
-        @Override
-        public void remove() {
-            underlying.remove();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
deleted file mode 100644
index b0f95a4..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.Queue;
-
-public class MoreoverResultSetWrapper extends StreamsResultSet {
-
-    public MoreoverResultSetWrapper(MoreoverResult underlying) {
-        super((Queue<StreamsDatum>)underlying);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
deleted file mode 100644
index 0424875..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import com.moreover.api.*;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Provider;
-import org.joda.time.DateTime;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import static org.apache.streams.data.util.ActivityUtil.*;
-
-/**
- * Provides utilities for Moroever data
- */
-public class MoreoverUtils {
-    private MoreoverUtils() {
-    }
-
-    public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
-
-    public static Activity convert(Article article) {
-        Activity activity = new Activity();
-        Source source = article.getSource();
-        activity.setActor(convert(article.getAuthor(), source.getName()));
-        activity.setProvider(convert(source));
-        activity.setTarget(convertTarget(source));
-        activity.setObject(convertObject(article));
-        activity.setPublished(DateTime.parse(article.getPublishedDate()));
-        activity.setContent(article.getContent());
-        activity.setTitle(article.getTitle());
-        activity.setVerb("posted");
-        fixActivityId(activity);
-        addLocationExtension(activity, source);
-        addLanguageExtension(activity, article);
-        activity.setLinks(convertLinks(article));
-        return activity;
-    }
-
-    private static void fixActivityId(Activity activity) {
-        if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) {
-            activity.setId(null);
-        }
-    }
-
-    private static List convertLinks(Article article) {
-        List list = new LinkedList();
-        Article.OutboundUrls outboundUrls = article.getOutboundUrls();
-        if (outboundUrls != null) {
-            for (String url : outboundUrls.getOutboundUrl()) {
-                list.add(url);
-            }
-        }
-        return list;
-    }
-
-    public static ActivityObject convertTarget(Source source) {
-        ActivityObject object = new ActivityObject();
-        object.setUrl(source.getHomeUrl());
-        object.setDisplayName(source.getName());
-        return object;
-    }
-
-    public static ActivityObject convertObject(Article article) {
-        ActivityObject object = new ActivityObject();
-        object.setContent(article.getContent());
-        object.setSummary(article.getTitle());
-        object.setUrl(article.getOriginalUrl());
-        object.setObjectType(article.getDataFormat());
-        String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat();
-        object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId()));
-        object.setPublished(DateTime.parse(article.getPublishedDate()));
-        return object;
-    }
-
-    public static Provider convert(Source source) {
-        Provider provider = new Provider();
-        Feed feed = source.getFeed();
-        String display = getProviderID(feed);
-        provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_")));
-        provider.setDisplayName(display);
-        provider.setUrl(feed.getUrl());
-        return provider;
-    }
-
-    public static Actor convert(Author author, String platformName) {
-        Actor actor = new Actor();
-        AuthorPublishingPlatform platform = author.getPublishingPlatform();
-        String userId = platform.getUserId();
-        if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId));
-        actor.setDisplayName(author.getName());
-        actor.setUrl(author.getHomeUrl());
-        actor.setSummary(author.getDescription());
-        actor.setAdditionalProperty("email", author.getEmail());
-        return actor;
-    }
-
-    public static void addLocationExtension(Activity activity, Source value) {
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-        String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode();
-        if (country != null) {
-            Map<String, Object> location = new HashMap<String, Object>();
-            location.put(LOCATION_EXTENSION_COUNTRY, country);
-            extensions.put(LOCATION_EXTENSION, location);
-        }
-    }
-
-    public static void addLanguageExtension(Activity activity, Article value) {
-        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
-        String language = value.getLanguage();
-        if (language != null) {
-            extensions.put(LANGUAGE_EXTENSION, language);
-        }
-    }
-
-    public static Date parse(String str) {
-        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
-        try {
-            return fmt.parse(str);
-        } catch (ParseException e) {
-            throw new IllegalArgumentException("Invalid date format", e);
-        }
-    }
-
-    private static String getProviderID(Feed feed) {
-        return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform());
-    }
-
-    private static String getProviderID(String feed) {
-        return feed.toLowerCase().replace(" ", "_").trim();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
new file mode 100644
index 0000000..05e6120
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Date;
+
+/**
+ *
+ */
+public class MoreoverClient {
+    private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
+
+    private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
+    private final String id;
+    private String apiKey;
+    private BigInteger lastSequenceId = BigInteger.ZERO;
+    //testing purpose only
+    public long pullTime;
+    private boolean debug;
+
+    public MoreoverClient(String id, String apiKey, String sequence) {
+        logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
+        this.id = id;
+        this.apiKey = apiKey;
+        this.lastSequenceId = new BigInteger(sequence);
+    }
+
+    public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
+        String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
+        logger.debug("Making call to {}", urlString);
+        long start = System.nanoTime();
+        MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
+        if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
+        {
+            this.lastSequenceId = result.getMaxSequencedId();
+            logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
+        }
+        else
+            logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
+        return result;
+    }
+
+    public MoreoverResult getNextBatch() throws IOException{
+        logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
+        return getArticlesAfter(this.lastSequenceId.toString(), 500);
+    }
+
+    private String getArticles2(URL url) throws IOException {
+        HttpURLConnection cn = (HttpURLConnection) url.openConnection();
+        cn.setRequestMethod("GET");
+        cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
+        cn.setDoInput(true);
+        cn.setDoOutput(false);
+        BufferedReader reader = new BufferedReader(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")));
+        String line = null;
+        StringBuilder builder = new StringBuilder();
+        String s = "";
+        String result = new String(s.getBytes(Charset.forName("UTF-8")), Charset.forName("UTF-8"));
+        while((line = reader.readLine()) != null) {
+            result+=line;
+        }
+        pullTime = new Date().getTime();
+        return result;
+    }
+
+    private String getArticles(URL url) throws IOException{
+        HttpURLConnection cn = (HttpURLConnection) url.openConnection();
+        cn.setRequestMethod("GET");
+        cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
+        cn.setDoInput(true);
+        cn.setDoOutput(false);
+        StringWriter writer = new StringWriter();
+        IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
+        writer.flush();
+        pullTime = new Date().getTime();
+
+        // added after seeing java.net.SocketException: Too many open files
+        cn.disconnect();
+
+        return writer.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
new file mode 100644
index 0000000..8d89582
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class MoreoverConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class);
+
+    public static MoreoverConfiguration detectConfiguration(Config moreover) {
+
+        MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration();
+
+        List<MoreoverKeyData> apiKeys = Lists.newArrayList();
+
+        Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+        if( !apiKeysConfig.isEmpty())
+            for( String apiKeyId : apiKeysConfig.root().keySet() ) {
+                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+                apiKeys.add(new MoreoverKeyData()
+                        .withId(apiKeyConfig.getString("key"))
+                        .withKey(apiKeyConfig.getString("key"))
+                        .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+            }
+        moreoverConfiguration.setApiKeys(apiKeys);
+
+        return moreoverConfiguration;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
new file mode 100644
index 0000000..17fde37
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.moreover.api.Article;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Deserializes Moreover JSON format into Activities
+ */
+public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
+
+    public MoreoverJsonActivitySerializer() {
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "application/json+vnd.moreover.com.v1";
+    }
+
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON");
+    }
+
+    @Override
+    public Activity deserialize(String serialized) {
+        serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
+
+        LOGGER.debug(serialized);
+
+        ObjectMapper mapper = new ObjectMapper();
+        AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+        mapper.setAnnotationIntrospector(introspector);
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
+        mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+        mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+
+        Article article;
+        try {
+            ObjectNode node = (ObjectNode)mapper.readTree(serialized);
+            node.remove("tags");
+            node.remove("locations");
+            node.remove("companies");
+            node.remove("topics");
+            node.remove("media");
+            node.remove("outboundUrls");
+            ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
+            jsonNodes.remove("editorialTopics");
+            jsonNodes.remove("tags");
+            jsonNodes.remove("autoTopics");
+            article = mapper.convertValue(node, Article.class);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Unable to deserialize", e);
+        }
+        return MoreoverUtils.convert(article);
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        throw new NotImplementedException("Not currently implemented");
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
new file mode 100644
index 0000000..78d8e9d
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Streams Provider for the Moreover Metabase API
+ *
+ *  To use from command line:
+ *
+ *  Supply configuration similar to src/test/resources/rss.conf
+ *
+ *  Launch using:
+ *
+ *  mvn exec:java -Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider -Dexec.args="rss.conf articles.json"
+ */
+public class MoreoverProvider implements StreamsProvider {
+
+    public final static String STREAMS_ID = "MoreoverProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    private List<MoreoverKeyData> keys;
+
+    private MoreoverConfiguration config;
+
+    private ExecutorService executor;
+
+    public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
+        this.config = moreoverConfiguration;
+        this.keys = Lists.newArrayList();
+        for( MoreoverKeyData apiKey : config.getApiKeys()) {
+            this.keys.add(apiKey);
+        }
+    }
+
+    @Override
+    public String getId() {
+        return STREAMS_ID;
+    }
+
+    public void startStream() {
+
+        for(MoreoverKeyData key : keys) {
+            MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
+            executor.submit(new Thread(task));
+            LOGGER.info("Started producer for {}", key.getKey());
+        }
+
+    }
+
+    @Override
+    public synchronized StreamsResultSet readCurrent() {
+
+        LOGGER.debug("readCurrent: {}", providerQueue.size());
+
+        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+        Iterators.addAll(currentIterator, providerQueue.iterator());
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
+
+        providerQueue.clear();
+
+        return current;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    @Override
+    public void prepare(Object configurationObject) {
+        LOGGER.debug("Prepare");
+        executor = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void cleanUp() {
+
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2);
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        assert(conf_file.exists());
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        MoreoverConfiguration config = new ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe, "rss");
+        MoreoverProvider provider = new MoreoverProvider(config);
+
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+        PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)));
+        provider.prepare(config);
+        provider.startStream();
+        do {
+            Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+            Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+            while(iterator.hasNext()) {
+                StreamsDatum datum = iterator.next();
+                String json;
+                try {
+                    json = mapper.writeValueAsString(datum.getDocument());
+                    outStream.println(json);
+                } catch (JsonProcessingException e) {
+                    System.err.println(e.getMessage());
+                }
+            }
+        } while( provider.isRunning());
+        provider.cleanUp();
+        outStream.flush();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
new file mode 100644
index 0000000..ad92d73
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+
+/**
+ * Task to pull from the Morever API
+ */
+public class MoreoverProviderTask implements Runnable {
+
+    public static final int LATENCY = 10;
+    public static final int REQUIRED_LATENCY = LATENCY * 1000;
+    private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
+
+    private String lastSequence;
+    private final String apiKey;
+    private final String apiId;
+    private final Queue<StreamsDatum> results;
+    private final MoreoverClient moClient;
+    private boolean started = false;
+
+    public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
+        //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
+        this.apiId = apiId;
+        this.apiKey = apiKey;
+        this.results = results;
+        this.lastSequence = lastSequence;
+        this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
+        initializeClient(moClient);
+    }
+
+    @Override
+    public void run() {
+        while(true) {
+            try {
+                ensureTime(moClient);
+                MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
+                started = true;
+                lastSequence = result.process().toString();
+                for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
+                    results.offer(entry);
+                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
+            } catch (Exception e) {
+                logger.error("Exception while polling moreover", e);
+            }
+        }
+    }
+
+    private void ensureTime(MoreoverClient moClient) {
+        try {
+            long gap = System.currentTimeMillis() - moClient.pullTime;
+            if (gap < REQUIRED_LATENCY)
+                Thread.sleep(REQUIRED_LATENCY - gap);
+        } catch (Exception e) {
+            logger.warn("Error sleeping for latency");
+        }
+    }
+
+    private void initializeClient(MoreoverClient moClient) {
+        try {
+            moClient.getArticlesAfter(this.lastSequence, 2);
+        } catch (Exception e) {
+            logger.error("Failed to start stream, {}", this.apiKey);
+            logger.error("Exception : ", e);
+            throw new IllegalStateException("Unable to initialize stream", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
new file mode 100644
index 0000000..e07084f
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.aalto.stax.InputFactoryImpl;
+import com.fasterxml.aalto.stax.OutputFactoryImpl;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
+import com.fasterxml.jackson.dataformat.xml.XmlFactory;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import com.google.common.collect.Lists;
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+
+
+public class MoreoverResult implements Iterable<StreamsDatum> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
+
+    private ObjectMapper mapper;
+    private XmlMapper xmlMapper;
+
+    private String xmlString;
+    private String jsonString;
+    private ArticlesResponse resultObject;
+    private ArticlesResponse.Articles articles;
+    private List<Article> articleArray;
+    private long start;
+    private long end;
+    private String clientId;
+    private BigInteger maxSequencedId = BigInteger.ZERO;
+
+    protected ArticlesResponse response;
+    protected List<StreamsDatum> list = Lists.newArrayList();
+
+    protected MoreoverResult(String clientId, String xmlString, long start, long end) {
+        this.xmlString = xmlString;
+        this.clientId = clientId;
+        this.start = start;
+        this.end = end;
+        XmlFactory f = new XmlFactory(new InputFactoryImpl(),
+                new OutputFactoryImpl());
+
+        JacksonXmlModule module = new JacksonXmlModule();
+
+        module.setDefaultUseWrapper(false);
+
+        xmlMapper = new XmlMapper(f, module);
+
+        xmlMapper
+                .configure(
+                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+                        Boolean.TRUE);
+        xmlMapper
+                .configure(
+                        DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+                        Boolean.TRUE);
+        xmlMapper
+                .configure(
+                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+                        Boolean.TRUE);
+        xmlMapper.configure(
+                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+                Boolean.TRUE);
+        xmlMapper.configure(
+                DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+                Boolean.FALSE);
+
+        mapper = new ObjectMapper();
+
+        mapper
+                .configure(
+                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+                        Boolean.TRUE);
+        mapper.configure(
+                DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+                Boolean.TRUE);
+        mapper
+                .configure(
+                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+                        Boolean.TRUE);
+        mapper.configure(
+                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+                Boolean.TRUE);
+
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    public BigInteger process() {
+
+        try {
+            this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
+        } catch (JsonMappingException e) {
+            // theory is this may not be fatal
+            this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
+        } catch (Exception e) {
+            e.printStackTrace();
+            logger.warn("Unable to process document:");
+            logger.warn(xmlString);
+        }
+
+        if( this.resultObject.getStatus().equals("FAILURE"))
+        {
+            logger.warn(this.resultObject.getStatus());
+            logger.warn(this.resultObject.getMessageCode());
+            logger.warn(this.resultObject.getUserMessage());
+            logger.warn(this.resultObject.getDeveloperMessage());
+        }
+        else
+        {
+            this.articles = resultObject.getArticles();
+            this.articleArray = articles.getArticle();
+
+            for (Article article : articleArray) {
+                BigInteger sequenceid = new BigInteger(article.getSequenceId());
+                list.add(new StreamsDatum(article, sequenceid));
+                logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+                if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+                    this.maxSequencedId = sequenceid;
+                }
+            }
+        }
+
+        return this.maxSequencedId;
+    }
+
+    public String getXmlString() {
+        return this.xmlString;
+    }
+
+    public BigInteger getMaxSequencedId() {
+        return this.maxSequencedId;
+    }
+
+    @Override
+    public Iterator<StreamsDatum> iterator() {
+        return list.iterator();
+    }
+
+    protected static class JsonStringIterator implements Iterator<Serializable> {
+
+        private Iterator<Serializable> underlying;
+
+        protected JsonStringIterator(Iterator<Serializable> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlying.hasNext();
+        }
+
+        @Override
+        public String next() {
+            return underlying.next().toString();
+        }
+
+        @Override
+        public void remove() {
+            underlying.remove();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
new file mode 100644
index 0000000..0a47bd1
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+
+import java.util.Queue;
+
+public class MoreoverResultSetWrapper extends StreamsResultSet {
+
+    public MoreoverResultSetWrapper(MoreoverResult underlying) {
+        super((Queue<StreamsDatum>)underlying);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
new file mode 100644
index 0000000..45d26df
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.moreover.api.*;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Provides utilities for Moroever data
+ */
+public class MoreoverUtils {
+    private MoreoverUtils() {
+    }
+
+    public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+
+    public static Activity convert(Article article) {
+        Activity activity = new Activity();
+        Source source = article.getSource();
+        activity.setActor(convert(article.getAuthor(), source.getName()));
+        activity.setProvider(convert(source));
+        activity.setTarget(convertTarget(source));
+        activity.setObject(convertObject(article));
+        activity.setPublished(DateTime.parse(article.getPublishedDate()));
+        activity.setContent(article.getContent());
+        activity.setTitle(article.getTitle());
+        activity.setVerb("posted");
+        fixActivityId(activity);
+        addLocationExtension(activity, source);
+        addLanguageExtension(activity, article);
+        activity.setLinks(convertLinks(article));
+        return activity;
+    }
+
+    private static void fixActivityId(Activity activity) {
+        if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) {
+            activity.setId(null);
+        }
+    }
+
+    private static List convertLinks(Article article) {
+        List list = new LinkedList();
+        Article.OutboundUrls outboundUrls = article.getOutboundUrls();
+        if (outboundUrls != null) {
+            for (String url : outboundUrls.getOutboundUrl()) {
+                list.add(url);
+            }
+        }
+        return list;
+    }
+
+    public static ActivityObject convertTarget(Source source) {
+        ActivityObject object = new ActivityObject();
+        object.setUrl(source.getHomeUrl());
+        object.setDisplayName(source.getName());
+        return object;
+    }
+
+    public static ActivityObject convertObject(Article article) {
+        ActivityObject object = new ActivityObject();
+        object.setContent(article.getContent());
+        object.setSummary(article.getTitle());
+        object.setUrl(article.getOriginalUrl());
+        object.setObjectType(article.getDataFormat());
+        String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat();
+        object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId()));
+        object.setPublished(DateTime.parse(article.getPublishedDate()));
+        return object;
+    }
+
+    public static Provider convert(Source source) {
+        Provider provider = new Provider();
+        Feed feed = source.getFeed();
+        String display = getProviderID(feed);
+        provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_")));
+        provider.setDisplayName(display);
+        provider.setUrl(feed.getUrl());
+        return provider;
+    }
+
+    public static Actor convert(Author author, String platformName) {
+        Actor actor = new Actor();
+        AuthorPublishingPlatform platform = author.getPublishingPlatform();
+        String userId = platform.getUserId();
+        if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId));
+        actor.setDisplayName(author.getName());
+        actor.setUrl(author.getHomeUrl());
+        actor.setSummary(author.getDescription());
+        actor.setAdditionalProperty("email", author.getEmail());
+        return actor;
+    }
+
+    public static void addLocationExtension(Activity activity, Source value) {
+        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+        String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode();
+        if (country != null) {
+            Map<String, Object> location = new HashMap<String, Object>();
+            location.put(LOCATION_EXTENSION_COUNTRY, country);
+            extensions.put(LOCATION_EXTENSION, location);
+        }
+    }
+
+    public static void addLanguageExtension(Activity activity, Article value) {
+        Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+        String language = value.getLanguage();
+        if (language != null) {
+            extensions.put(LANGUAGE_EXTENSION, language);
+        }
+    }
+
+    public static Date parse(String str) {
+        DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
+        try {
+            return fmt.parse(str);
+        } catch (ParseException e) {
+            throw new IllegalArgumentException("Invalid date format", e);
+        }
+    }
+
+    private static String getProviderID(Feed feed) {
+        return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform());
+    }
+
+    private static String getProviderID(String feed) {
+        return feed.toLowerCase().replace(" ", "_").trim();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
new file mode 100644
index 0000000..4b7b3b0
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import com.moreover.api.ObjectFactory;
+import org.apache.commons.lang.SerializationException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
+ */
+public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
+
+    //JAXBContext is threadsafe (supposedly)
+    private final JAXBContext articleContext;
+    private final JAXBContext articlesContext;
+
+    public MoreoverXmlActivitySerializer() {
+        articleContext = createContext(Article.class);
+        articlesContext = createContext(ArticlesResponse.class);
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "application/xml+vnd.moreover.com.v1";
+    }
+
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+    }
+
+    @Override
+    public Activity deserialize(String serialized) {
+        Article article = deserializeMoreover(serialized);
+        return MoreoverUtils.convert(article);
+    }
+
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = new LinkedList<Activity>();
+        for(String item : serializedList) {
+            ArticlesResponse response = deserializeMoreoverResponse(item);
+            for(Article article : response.getArticles().getArticle()) {
+                activities.add(MoreoverUtils.convert(article));
+            }
+        }
+        return activities;
+    }
+
+    private Article deserializeMoreover(String serialized){
+        try {
+            Unmarshaller unmarshaller = articleContext.createUnmarshaller();
+            return (Article) unmarshaller.unmarshal(new StringReader(serialized));
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    private ArticlesResponse deserializeMoreoverResponse(String serialized){
+        try {
+            Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
+            return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    private JAXBContext createContext(Class articleClass) {
+        JAXBContext context;
+        try {
+            context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
+        }
+        return context;
+    }
+}



[3/3] incubator-streams git commit: Merge branch '0.4-moreover'

Posted by sb...@apache.org.
Merge branch '0.4-moreover'

# Conflicts:
#	streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/67d5cca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/67d5cca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/67d5cca5

Branch: refs/heads/master
Commit: 67d5cca51ee250b5dea0012cd9576157b19e57ec
Parents: 9861124 015f0ae
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sat Oct 22 14:44:38 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sat Oct 22 14:44:38 2016 -0500

----------------------------------------------------------------------
 .../data/MoreoverJsonActivitySerializer.java    |  97 ---------
 .../data/MoreoverXmlActivitySerializer.java     | 105 ----------
 .../streams/data/moreover/MoreoverClient.java   | 110 ----------
 .../streams/data/moreover/MoreoverProvider.java | 114 -----------
 .../data/moreover/MoreoverProviderTask.java     |  94 ---------
 .../streams/data/moreover/MoreoverResult.java   | 204 -------------------
 .../data/moreover/MoreoverResultSetWrapper.java |  34 ----
 .../apache/streams/data/util/MoreoverUtils.java | 155 --------------
 .../apache/streams/moreover/MoreoverClient.java | 110 ++++++++++
 .../MoreoverJsonActivitySerializer.java         |  98 +++++++++
 .../streams/moreover/MoreoverProvider.java      | 177 ++++++++++++++++
 .../streams/moreover/MoreoverProviderTask.java  |  91 +++++++++
 .../apache/streams/moreover/MoreoverResult.java | 200 ++++++++++++++++++
 .../moreover/MoreoverResultSetWrapper.java      |  32 +++
 .../apache/streams/moreover/MoreoverUtils.java  | 155 ++++++++++++++
 .../moreover/MoreoverXmlActivitySerializer.java | 106 ++++++++++
 .../src/main/resources/moreover.conf            |  25 ---
 .../data/MoreoverJsonActivitySerializerIT.java  |  79 -------
 .../data/MoreoverXmlActivitySerializerIT.java   |  65 ------
 .../streams/data/util/MoreoverTestUtil.java     |  43 ----
 .../streams/moreover/MoreoverTestUtil.java      |  43 ++++
 .../test/MoreoverJsonActivitySerializerIT.java  |  76 +++++++
 .../test/MoreoverXmlActivitySerializerIT.java   |  66 ++++++
 .../test/provider/MoreoverProviderIT.java       |  67 ++++++
 .../src/test/resources/moreover.conf            |  25 +++
 25 files changed, 1246 insertions(+), 1125 deletions(-)
----------------------------------------------------------------------