You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2016/10/22 19:44:46 UTC
[1/3] incubator-streams git commit: level up moreover provider
Repository: incubator-streams
Updated Branches:
refs/heads/master 9861124ae -> 67d5cca51
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf b/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
deleted file mode 100644
index 3b683fe..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/resources/moreover.conf
+++ /dev/null
@@ -1,25 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-moreover {
- apiKeys {
- key {
- key = ""
- startingSequence = ""
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
deleted file mode 100644
index ea83777..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverJsonActivitySerializerIT.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.data.util.JsonUtil;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.net.URL;
-import java.nio.charset.Charset;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-/**
- * Tests ability to serialize moreover json Strings
- */
-public class MoreoverJsonActivitySerializerIT {
- JsonNode json;
- ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
- ObjectMapper mapper;
-
- @Before
- public void setup() throws Exception {
-
- StringWriter writer = new StringWriter();
- InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json");
- IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
-
- mapper = new ObjectMapper();
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
-
- json = mapper.readValue(writer.toString(), JsonNode.class);
- }
-
- @Test
- public void loadData() throws Exception {
- for (JsonNode item : json) {
- test(serializer.deserialize(getString(item)));
- }
- }
-
-
- private String getString(JsonNode jsonNode) {
- try {
- return new ObjectMapper().writeValueAsString(jsonNode);
- } catch (JsonProcessingException e) {
- throw new RuntimeException(e);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
deleted file mode 100644
index da660cd..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/MoreoverXmlActivitySerializerIT.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.List;
-
-import static org.apache.streams.data.util.MoreoverTestUtil.test;
-
-/**
- * Tests ability to serialize moreover xml Strings
- */
-public class MoreoverXmlActivitySerializerIT {
- ActivitySerializer serializer;
- private String xml;
-
- @Before
- public void setup() throws IOException {
- serializer = new MoreoverXmlActivitySerializer();
- xml = loadXml();
- }
-
- @Test
- public void loadData() throws Exception {
- List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
- for (Activity activity : activities) {
- test(activity);
- }
- }
-
- private String loadXml() throws IOException {
- StringWriter writer = new StringWriter();
- InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml");
- IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
- return writer.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
deleted file mode 100644
index 14b7652..0000000
--- a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/data/util/MoreoverTestUtil.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.util.regex.Pattern.matches;
-import static org.hamcrest.CoreMatchers.*;
-import static org.junit.Assert.assertThat;
-
-public class MoreoverTestUtil {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class);
-
- public static void test(Activity activity) {
- assertThat(activity, is(not(nullValue())));
- assertThat(activity.getActor(), is(not(nullValue())));
- assertThat(activity.getObject(), is(not(nullValue())));
- if(activity.getObject().getId() != null) {
- assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true));
- }
- assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
- LOGGER.debug(activity.getPublished().toString());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
new file mode 100644
index 0000000..cdd5822
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/MoreoverTestUtil.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.util.regex.Pattern.matches;
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.assertThat;
+
+public class MoreoverTestUtil {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverTestUtil.class);
+
+ public static void test(Activity activity) {
+ assertThat(activity, is(not(nullValue())));
+ assertThat(activity.getActor(), is(not(nullValue())));
+ assertThat(activity.getObject(), is(not(nullValue())));
+ if(activity.getObject().getId() != null) {
+ assertThat(matches("id:.*:[a-z]*s:[a-zA-Z0-9]*", activity.getObject().getId()), is(true));
+ }
+ assertThat(activity.getObject().getObjectType(), is(not(nullValue())));
+ LOGGER.debug(activity.getPublished().toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
new file mode 100644
index 0000000..94ef097
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverJsonActivitySerializerIT.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverJsonActivitySerializer;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+
+import static org.apache.streams.moreover.MoreoverTestUtil.test;
+
+/**
+ * Tests ability to deserialize Moreover JSON strings
+ */
+public class MoreoverJsonActivitySerializerIT {
+ JsonNode json;
+ ActivitySerializer serializer = new MoreoverJsonActivitySerializer();
+ ObjectMapper mapper;
+
+ @Before
+ public void setup() throws Exception {
+
+ StringWriter writer = new StringWriter();
+ InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.json");
+ IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+
+ mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+
+ json = mapper.readValue(writer.toString(), JsonNode.class);
+ }
+
+ @Test
+ public void loadData() throws Exception {
+ for (JsonNode item : json) {
+ test(serializer.deserialize(getString(item)));
+ }
+ }
+
+
+ private String getString(JsonNode jsonNode) {
+ try {
+ return new ObjectMapper().writeValueAsString(jsonNode);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
new file mode 100644
index 0000000..ad0b384
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/MoreoverXmlActivitySerializerIT.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.pojo.json.Activity;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.List;
+
+import org.apache.streams.moreover.MoreoverXmlActivitySerializer;
+
+import static org.apache.streams.moreover.MoreoverTestUtil.test;
+
+/**
+ * Tests ability to deserialize Moreover XML strings
+ */
+public class MoreoverXmlActivitySerializerIT {
+ ActivitySerializer serializer;
+ private String xml;
+
+ @Before
+ public void setup() throws IOException {
+ serializer = new MoreoverXmlActivitySerializer();
+ xml = loadXml();
+ }
+
+ @Test
+ public void loadData() throws Exception {
+ List<Activity> activities = serializer.deserializeAll(Lists.newArrayList(xml));
+ for (Activity activity : activities) {
+ test(activity);
+ }
+ }
+
+ private String loadXml() throws IOException {
+ StringWriter writer = new StringWriter();
+ InputStream resourceAsStream = this.getClass().getResourceAsStream("/moreover.xml");
+ IOUtils.copy(resourceAsStream, writer, Charset.forName("UTF-8"));
+ return writer.toString();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
new file mode 100644
index 0000000..2bc672d
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/java/org/apache/streams/moreover/test/provider/MoreoverProviderIT.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover.test.provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+import org.apache.streams.moreover.MoreoverProvider;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.LineNumberReader;
+
+/**
+ * Integration test for MoreoverProvider
+ *
+ * Created by sblackmon on 10/21/16.
+ */
+@Ignore("this is ignored because the project doesn't have credentials to test it with during CI")
+public class MoreoverProviderIT {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProviderIT.class);
+
+ private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+ @Test
+ public void testRssStreamProvider() throws Exception {
+
+ String configfile = "./target/test-classes/RssStreamProviderIT.conf";
+ String outfile = "./target/test-classes/RssStreamProviderIT.stdout.txt";
+
+ MoreoverProvider.main(Lists.newArrayList(configfile, outfile).toArray(new String[2]));
+
+ File out = new File(outfile);
+ assert (out.exists());
+ assert (out.canRead());
+ assert (out.isFile());
+
+ FileReader outReader = new FileReader(out);
+ LineNumberReader outCounter = new LineNumberReader(outReader);
+
+ while(outCounter.readLine() != null) {}
+
+ assert (outCounter.getLineNumber() >= 1);
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf b/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
new file mode 100644
index 0000000..3b683fe
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/test/resources/moreover.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+moreover {
+ apiKeys {
+ key {
+ key = ""
+ startingSequence = ""
+ }
+ }
+}
[2/3] incubator-streams git commit: level up moreover provider
Posted by sb...@apache.org.
level up moreover provider
add main methods to each Provider (STREAMS-412)
add real integration tests (STREAMS-415)
reorganize packages
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/015f0ae1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/015f0ae1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/015f0ae1
Branch: refs/heads/master
Commit: 015f0ae1eb8b662889e0fa544607c28e25b0dc38
Parents: 11e3a0f
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Fri Oct 21 11:04:47 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Fri Oct 21 11:04:47 2016 -0500
----------------------------------------------------------------------
.../data/MoreoverJsonActivitySerializer.java | 97 ---------
.../data/MoreoverXmlActivitySerializer.java | 105 ----------
.../streams/data/moreover/MoreoverClient.java | 110 ----------
.../data/moreover/MoreoverConfigurator.java | 58 ------
.../streams/data/moreover/MoreoverProvider.java | 114 -----------
.../data/moreover/MoreoverProviderTask.java | 94 ---------
.../streams/data/moreover/MoreoverResult.java | 204 -------------------
.../data/moreover/MoreoverResultSetWrapper.java | 34 ----
.../apache/streams/data/util/MoreoverUtils.java | 155 --------------
.../apache/streams/moreover/MoreoverClient.java | 110 ++++++++++
.../streams/moreover/MoreoverConfigurator.java | 56 +++++
.../MoreoverJsonActivitySerializer.java | 98 +++++++++
.../streams/moreover/MoreoverProvider.java | 177 ++++++++++++++++
.../streams/moreover/MoreoverProviderTask.java | 91 +++++++++
.../apache/streams/moreover/MoreoverResult.java | 200 ++++++++++++++++++
.../moreover/MoreoverResultSetWrapper.java | 32 +++
.../apache/streams/moreover/MoreoverUtils.java | 155 ++++++++++++++
.../moreover/MoreoverXmlActivitySerializer.java | 106 ++++++++++
.../src/main/resources/moreover.conf | 25 ---
.../data/MoreoverJsonActivitySerializerIT.java | 79 -------
.../data/MoreoverXmlActivitySerializerIT.java | 65 ------
.../streams/data/util/MoreoverTestUtil.java | 43 ----
.../streams/moreover/MoreoverTestUtil.java | 43 ++++
.../test/MoreoverJsonActivitySerializerIT.java | 76 +++++++
.../test/MoreoverXmlActivitySerializerIT.java | 66 ++++++
.../test/provider/MoreoverProviderIT.java | 67 ++++++
.../src/test/resources/moreover.conf | 25 +++
27 files changed, 1302 insertions(+), 1183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
deleted file mode 100644
index ae48b41..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverJsonActivitySerializer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.fasterxml.jackson.databind.AnnotationIntrospector;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
-import com.moreover.api.Article;
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Deserializes Moreover JSON format into Activities
- */
-public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
-
- public MoreoverJsonActivitySerializer() {
- }
-
- @Override
- public String serializationFormat() {
- return "application/json+vnd.moreover.com.v1";
- }
-
- @Override
- public String serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON");
- }
-
- @Override
- public Activity deserialize(String serialized) {
- serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
-
- LOGGER.debug(serialized);
-
- ObjectMapper mapper = new ObjectMapper();
- AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
- mapper.setAnnotationIntrospector(introspector);
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
- mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
- mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
- mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
- mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
-
- Article article;
- try {
- ObjectNode node = (ObjectNode)mapper.readTree(serialized);
- node.remove("tags");
- node.remove("locations");
- node.remove("companies");
- node.remove("topics");
- node.remove("media");
- node.remove("outboundUrls");
- ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
- jsonNodes.remove("editorialTopics");
- jsonNodes.remove("tags");
- jsonNodes.remove("autoTopics");
- article = mapper.convertValue(node, Article.class);
- } catch (IOException e) {
- throw new IllegalArgumentException("Unable to deserialize", e);
- }
- return MoreoverUtils.convert(article);
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- throw new NotImplementedException("Not currently implemented");
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
deleted file mode 100644
index d60bcb8..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/MoreoverXmlActivitySerializer.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data;
-
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import com.moreover.api.ObjectFactory;
-import org.apache.commons.lang.SerializationException;
-import org.apache.streams.data.util.MoreoverUtils;
-import org.apache.streams.pojo.json.Activity;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.JAXBElement;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Unmarshaller;
-import java.io.StringReader;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}
- */
-public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
-
- //JAXBContext is threadsafe (supposedly)
- private final JAXBContext articleContext;
- private final JAXBContext articlesContext;
-
- public MoreoverXmlActivitySerializer() {
- articleContext = createContext(Article.class);
- articlesContext = createContext(ArticlesResponse.class);
- }
-
- @Override
- public String serializationFormat() {
- return "application/xml+vnd.moreover.com.v1";
- }
-
- @Override
- public String serialize(Activity deserialized) {
- throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
- }
-
- @Override
- public Activity deserialize(String serialized) {
- Article article = deserializeMoreover(serialized);
- return MoreoverUtils.convert(article);
- }
-
- @Override
- public List<Activity> deserializeAll(List<String> serializedList) {
- List<Activity> activities = new LinkedList<Activity>();
- for(String item : serializedList) {
- ArticlesResponse response = deserializeMoreoverResponse(item);
- for(Article article : response.getArticles().getArticle()) {
- activities.add(MoreoverUtils.convert(article));
- }
- }
- return activities;
- }
-
- private Article deserializeMoreover(String serialized){
- try {
- Unmarshaller unmarshaller = articleContext.createUnmarshaller();
- return (Article) unmarshaller.unmarshal(new StringReader(serialized));
- } catch (JAXBException e) {
- throw new SerializationException("Unable to deserialize Moreover data", e);
- }
- }
-
- private ArticlesResponse deserializeMoreoverResponse(String serialized){
- try {
- Unmarshaller unmarshaller = articlesContext.createUnmarshaller();
- return ((JAXBElement<ArticlesResponse>) unmarshaller.unmarshal(new StringReader(serialized))).getValue();
- } catch (JAXBException e) {
- throw new SerializationException("Unable to deserialize Moreover data", e);
- }
- }
-
- private JAXBContext createContext(Class articleClass) {
- JAXBContext context;
- try {
- context = JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
- } catch (JAXBException e) {
- throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
- }
- return context;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
deleted file mode 100644
index 9ab60c5..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import org.apache.commons.io.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.StringWriter;
-import java.math.BigInteger;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.nio.charset.Charset;
-import java.util.Date;
-
-/**
- *
- */
-public class MoreoverClient {
- private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
-
- private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
- private final String id;
- private String apiKey;
- private BigInteger lastSequenceId = BigInteger.ZERO;
- //testing purpose only
- public long pullTime;
- private boolean debug;
-
- public MoreoverClient(String id, String apiKey, String sequence) {
- logger.info("Constructed new client for id:{} key:{} sequence:{}", id, apiKey, sequence);
- this.id = id;
- this.apiKey = apiKey;
- this.lastSequenceId = new BigInteger(sequence);
- }
-
- public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
- String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
- logger.debug("Making call to {}", urlString);
- long start = System.nanoTime();
- MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
- if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
- {
- this.lastSequenceId = result.getMaxSequencedId();
- logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
- }
- else
- logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
- return result;
- }
-
- public MoreoverResult getNextBatch() throws IOException{
- logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
- return getArticlesAfter(this.lastSequenceId.toString(), 500);
- }
-
- private String getArticles2(URL url) throws IOException {
- HttpURLConnection cn = (HttpURLConnection) url.openConnection();
- cn.setRequestMethod("GET");
- cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
- cn.setDoInput(true);
- cn.setDoOutput(false);
- BufferedReader reader = new BufferedReader(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")));
- String line = null;
- StringBuilder builder = new StringBuilder();
- String s = "";
- String result = new String(s.getBytes(Charset.forName("UTF-8")), Charset.forName("UTF-8"));
- while((line = reader.readLine()) != null) {
- result+=line;
- }
- pullTime = new Date().getTime();
- return result;
- }
-
- private String getArticles(URL url) throws IOException{
- HttpURLConnection cn = (HttpURLConnection) url.openConnection();
- cn.setRequestMethod("GET");
- cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
- cn.setDoInput(true);
- cn.setDoOutput(false);
- StringWriter writer = new StringWriter();
- IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
- writer.flush();
- pullTime = new Date().getTime();
-
- // added after seeing java.net.SocketException: Too many open files
- cn.disconnect();
-
- return writer.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
deleted file mode 100644
index 4c3ba06..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverConfigurator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.collect.Lists;
-import com.typesafe.config.Config;
-import org.apache.streams.moreover.MoreoverConfiguration;
-import org.apache.streams.moreover.MoreoverKeyData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class MoreoverConfigurator {
-
- private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class);
-
- public static MoreoverConfiguration detectConfiguration(Config moreover) {
-
- MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration();
-
- List<MoreoverKeyData> apiKeys = Lists.newArrayList();
-
- Config apiKeysConfig = moreover.getConfig("apiKeys");
-
- if( !apiKeysConfig.isEmpty())
- for( String apiKeyId : apiKeysConfig.root().keySet() ) {
- Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
- apiKeys.add(new MoreoverKeyData()
- .withId(apiKeyConfig.getString("key"))
- .withKey(apiKeyConfig.getString("key"))
- .withStartingSequence(apiKeyConfig.getString("startingSequence")));
- }
- moreoverConfiguration.setApiKeys(apiKeys);
-
- return moreoverConfiguration;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
deleted file mode 100644
index 1add4d2..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.base.Predicates;
-import com.google.common.collect.*;
-import net.jcip.annotations.Immutable;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.moreover.MoreoverConfiguration;
-import org.apache.streams.moreover.MoreoverKeyData;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.*;
-import java.util.concurrent.*;
-
-public class MoreoverProvider implements StreamsProvider {
-
- public final static String STREAMS_ID = "MoreoverProvider";
-
- private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
-
- protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
-
- private List<MoreoverKeyData> keys;
-
- private MoreoverConfiguration config;
-
- private ExecutorService executor;
-
- public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
- this.config = moreoverConfiguration;
- this.keys = Lists.newArrayList();
- for( MoreoverKeyData apiKey : config.getApiKeys()) {
- this.keys.add(apiKey);
- }
- }
-
- @Override
- public String getId() {
- return STREAMS_ID;
- }
-
- public void startStream() {
-
- for(MoreoverKeyData key : keys) {
- MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
- executor.submit(new Thread(task));
- LOGGER.info("Started producer for {}", key.getKey());
- }
-
- }
-
- @Override
- public synchronized StreamsResultSet readCurrent() {
-
- LOGGER.debug("readCurrent: {}", providerQueue.size());
-
- Collection<StreamsDatum> currentIterator = Lists.newArrayList();
- Iterators.addAll(currentIterator, providerQueue.iterator());
-
- StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
-
- providerQueue.clear();
-
- return current;
- }
-
- @Override
- public StreamsResultSet readNew(BigInteger sequence) {
- return null;
- }
-
- @Override
- public StreamsResultSet readRange(DateTime start, DateTime end) {
- return null;
- }
-
- @Override
- public boolean isRunning() {
- return !executor.isShutdown() && !executor.isTerminated();
- }
-
- @Override
- public void prepare(Object configurationObject) {
- LOGGER.debug("Prepare");
- executor = Executors.newSingleThreadExecutor();
- }
-
- @Override
- public void cleanUp() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
deleted file mode 100644
index 2640bfd..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.math.BigInteger;
-import java.util.Queue;
-
-/**
- * Task to pull from the Morever API
- */
-public class MoreoverProviderTask implements Runnable {
-
- public static final int LATENCY = 10;
- public static final int REQUIRED_LATENCY = LATENCY * 1000;
- private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
-
- private String lastSequence;
- private final String apiKey;
- private final String apiId;
- private final Queue<StreamsDatum> results;
- private final MoreoverClient moClient;
- private boolean started = false;
-
- public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
- //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
- this.apiId = apiId;
- this.apiKey = apiKey;
- this.results = results;
- this.lastSequence = lastSequence;
- this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
- initializeClient(moClient);
- }
-
- @Override
- public void run() {
- while(true) {
- try {
- ensureTime(moClient);
- MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
- started = true;
- lastSequence = result.process().toString();
- for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
- results.offer(entry);
- logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
-
- } catch (Exception e) {
- logger.error("Exception while polling moreover", e);
- }
- }
- }
-
- private void ensureTime(MoreoverClient moClient) {
- try {
- long gap = System.currentTimeMillis() - moClient.pullTime;
- if (gap < REQUIRED_LATENCY)
- Thread.sleep(REQUIRED_LATENCY - gap);
- } catch (Exception e) {
- logger.warn("Error sleeping for latency");
- }
- }
-
- private void initializeClient(MoreoverClient moClient) {
- try {
- moClient.getArticlesAfter(this.lastSequence, 2);
- } catch (Exception e) {
- logger.error("Failed to start stream, {}", this.apiKey);
- logger.error("Exception : ", e);
- throw new IllegalStateException("Unable to initialize stream", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
deleted file mode 100644
index 050026b..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import com.fasterxml.aalto.stax.InputFactoryImpl;
-import com.fasterxml.aalto.stax.OutputFactoryImpl;
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
-import com.fasterxml.jackson.dataformat.xml.XmlFactory;
-import com.fasterxml.jackson.dataformat.xml.XmlMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
-import com.moreover.api.Article;
-import com.moreover.api.ArticlesResponse;
-import org.apache.streams.core.StreamsDatum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.List;
-
-
-public class MoreoverResult implements Iterable<StreamsDatum> {
-
- private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
-
- private ObjectMapper mapper;
- private XmlMapper xmlMapper;
-
- private String xmlString;
- private String jsonString;
- private ArticlesResponse resultObject;
- private ArticlesResponse.Articles articles;
- private List<Article> articleArray;
- private long start;
- private long end;
- private String clientId;
- private BigInteger maxSequencedId = BigInteger.ZERO;
-
- protected ArticlesResponse response;
- protected List<StreamsDatum> list = Lists.newArrayList();
-
- protected MoreoverResult(String clientId, String xmlString, long start, long end) {
- this.xmlString = xmlString;
- this.clientId = clientId;
- this.start = start;
- this.end = end;
- XmlFactory f = new XmlFactory(new InputFactoryImpl(),
- new OutputFactoryImpl());
-
- JacksonXmlModule module = new JacksonXmlModule();
-
- module.setDefaultUseWrapper(false);
-
- xmlMapper = new XmlMapper(f, module);
-
- xmlMapper
- .configure(
- DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
- Boolean.TRUE);
- xmlMapper
- .configure(
- DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
- Boolean.TRUE);
- xmlMapper
- .configure(
- DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
- Boolean.TRUE);
- xmlMapper.configure(
- DeserializationFeature.READ_ENUMS_USING_TO_STRING,
- Boolean.TRUE);
- xmlMapper.configure(
- DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
- Boolean.FALSE);
-
- mapper = new ObjectMapper();
-
- mapper
- .configure(
- DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
- Boolean.TRUE);
- mapper.configure(
- DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
- Boolean.TRUE);
- mapper
- .configure(
- DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
- Boolean.TRUE);
- mapper.configure(
- DeserializationFeature.READ_ENUMS_USING_TO_STRING,
- Boolean.TRUE);
-
- }
-
- public String getClientId() {
- return clientId;
- }
-
- public long getStart() {
- return start;
- }
-
- public long getEnd() {
- return end;
- }
-
- public BigInteger process() {
-
- try {
- this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
- } catch (JsonMappingException e) {
- // theory is this may not be fatal
- this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
- } catch (Exception e) {
- e.printStackTrace();
- logger.warn("Unable to process document:");
- logger.warn(xmlString);
- }
-
- if( this.resultObject.getStatus().equals("FAILURE"))
- {
- logger.warn(this.resultObject.getStatus());
- logger.warn(this.resultObject.getMessageCode());
- logger.warn(this.resultObject.getUserMessage());
- logger.warn(this.resultObject.getDeveloperMessage());
- }
- else
- {
- this.articles = resultObject.getArticles();
- this.articleArray = articles.getArticle();
-
- for (Article article : articleArray) {
- BigInteger sequenceid = new BigInteger(article.getSequenceId());
- list.add(new StreamsDatum(article, sequenceid));
- logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
- if (sequenceid.compareTo(this.maxSequencedId) > 0) {
- this.maxSequencedId = sequenceid;
- }
- }
- }
-
- return this.maxSequencedId;
- }
-
- public String getXmlString() {
- return this.xmlString;
- }
-
- public BigInteger getMaxSequencedId() {
- return this.maxSequencedId;
- }
-
- @Override
- public Iterator<StreamsDatum> iterator() {
- return list.iterator();
- }
-
- protected static class JsonStringIterator implements Iterator<Serializable> {
-
- private Iterator<Serializable> underlying;
-
- protected JsonStringIterator(Iterator<Serializable> underlying) {
- this.underlying = underlying;
- }
-
- @Override
- public boolean hasNext() {
- return underlying.hasNext();
- }
-
- @Override
- public String next() {
- return underlying.next().toString();
- }
-
- @Override
- public void remove() {
- underlying.remove();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
deleted file mode 100644
index b0f95a4..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResultSetWrapper.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.moreover;
-
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsResultSet;
-
-import java.math.BigInteger;
-import java.util.Iterator;
-import java.util.Queue;
-
-public class MoreoverResultSetWrapper extends StreamsResultSet {
-
- public MoreoverResultSetWrapper(MoreoverResult underlying) {
- super((Queue<StreamsDatum>)underlying);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
deleted file mode 100644
index 0424875..0000000
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/util/MoreoverUtils.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.data.util;
-
-import com.moreover.api.*;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.apache.streams.pojo.json.Actor;
-import org.apache.streams.pojo.json.Provider;
-import org.joda.time.DateTime;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-
-import static org.apache.streams.data.util.ActivityUtil.*;
-
-/**
- * Provides utilities for Moroever data
- */
-public class MoreoverUtils {
- private MoreoverUtils() {
- }
-
- public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
-
- public static Activity convert(Article article) {
- Activity activity = new Activity();
- Source source = article.getSource();
- activity.setActor(convert(article.getAuthor(), source.getName()));
- activity.setProvider(convert(source));
- activity.setTarget(convertTarget(source));
- activity.setObject(convertObject(article));
- activity.setPublished(DateTime.parse(article.getPublishedDate()));
- activity.setContent(article.getContent());
- activity.setTitle(article.getTitle());
- activity.setVerb("posted");
- fixActivityId(activity);
- addLocationExtension(activity, source);
- addLanguageExtension(activity, article);
- activity.setLinks(convertLinks(article));
- return activity;
- }
-
- private static void fixActivityId(Activity activity) {
- if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) {
- activity.setId(null);
- }
- }
-
- private static List convertLinks(Article article) {
- List list = new LinkedList();
- Article.OutboundUrls outboundUrls = article.getOutboundUrls();
- if (outboundUrls != null) {
- for (String url : outboundUrls.getOutboundUrl()) {
- list.add(url);
- }
- }
- return list;
- }
-
- public static ActivityObject convertTarget(Source source) {
- ActivityObject object = new ActivityObject();
- object.setUrl(source.getHomeUrl());
- object.setDisplayName(source.getName());
- return object;
- }
-
- public static ActivityObject convertObject(Article article) {
- ActivityObject object = new ActivityObject();
- object.setContent(article.getContent());
- object.setSummary(article.getTitle());
- object.setUrl(article.getOriginalUrl());
- object.setObjectType(article.getDataFormat());
- String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat();
- object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId()));
- object.setPublished(DateTime.parse(article.getPublishedDate()));
- return object;
- }
-
- public static Provider convert(Source source) {
- Provider provider = new Provider();
- Feed feed = source.getFeed();
- String display = getProviderID(feed);
- provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_")));
- provider.setDisplayName(display);
- provider.setUrl(feed.getUrl());
- return provider;
- }
-
- public static Actor convert(Author author, String platformName) {
- Actor actor = new Actor();
- AuthorPublishingPlatform platform = author.getPublishingPlatform();
- String userId = platform.getUserId();
- if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId));
- actor.setDisplayName(author.getName());
- actor.setUrl(author.getHomeUrl());
- actor.setSummary(author.getDescription());
- actor.setAdditionalProperty("email", author.getEmail());
- return actor;
- }
-
- public static void addLocationExtension(Activity activity, Source value) {
- Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
- String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode();
- if (country != null) {
- Map<String, Object> location = new HashMap<String, Object>();
- location.put(LOCATION_EXTENSION_COUNTRY, country);
- extensions.put(LOCATION_EXTENSION, location);
- }
- }
-
- public static void addLanguageExtension(Activity activity, Article value) {
- Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
- String language = value.getLanguage();
- if (language != null) {
- extensions.put(LANGUAGE_EXTENSION, language);
- }
- }
-
- public static Date parse(String str) {
- DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
- try {
- return fmt.parse(str);
- } catch (ParseException e) {
- throw new IllegalArgumentException("Invalid date format", e);
- }
- }
-
- private static String getProviderID(Feed feed) {
- return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform());
- }
-
- private static String getProviderID(String feed) {
- return feed.toLowerCase().replace(" ", "_").trim();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
new file mode 100644
index 0000000..05e6120
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverClient.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.commons.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.util.Date;
+
+/**
+ * Minimal HTTP client for the Moreover Metabase articles endpoint.
+ *
+ * <p>Each request fetches one batch of articles as raw XML and wraps it in a
+ * {@link MoreoverResult}; parsing is left to the caller via
+ * {@link MoreoverResult#process()}.
+ */
+public class MoreoverClient {
+    private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
+
+    private static final String BASE_URL = "http://metabase.moreover.com/api/v10/articles?key=%s&limit=%s&sequence_id=%s";
+    private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+    private final String id;
+    private final String apiKey;
+    private BigInteger lastSequenceId = BigInteger.ZERO;
+    // epoch millis of the last successful pull; public for testing purposes only
+    public long pullTime;
+
+    /**
+     * Creates a client for one API key.
+     *
+     * @param id       logical identifier of this client, passed on to every {@link MoreoverResult}
+     * @param apiKey   Moreover API key sent with every request
+     * @param sequence sequence id to start from; {@code null} or blank means "start at zero"
+     * @throws NumberFormatException if {@code sequence} is non-blank but not an integer
+     */
+    public MoreoverClient(String id, String apiKey, String sequence) {
+        // the API key is a credential, so it is deliberately kept out of the log
+        logger.info("Constructed new client for id:{} sequence:{}", id, sequence);
+        this.id = id;
+        this.apiKey = apiKey;
+        this.lastSequenceId = parseSequence(sequence);
+    }
+
+    /**
+     * Fetches up to {@code limit} articles following {@code sequenceId}.
+     *
+     * @param sequenceId sequence id placed in the request URL as-is
+     * @param limit      maximum number of articles to request
+     * @return the unprocessed result wrapping the raw XML response
+     * @throws IOException if the HTTP request fails
+     */
+    public MoreoverResult getArticlesAfter(String sequenceId, int limit) throws IOException {
+        String urlString = String.format(BASE_URL, this.apiKey, limit, sequenceId);
+        // the full URL embeds the API key, so only the non-secret parameters are logged
+        logger.debug("Making call for id:{} limit:{} sequence:{}", this.id, limit, sequenceId);
+        long start = System.nanoTime();
+        MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
+        // NOTE(review): MoreoverResult only computes its maximum sequence id inside process(),
+        // which has not been called yet at this point, so this normally reads ZERO and
+        // lastSequenceId is left untouched. Calling process() here would make callers that
+        // process the result themselves add every article twice, so it is not done.
+        if (!result.getMaxSequencedId().equals(BigInteger.ZERO)) {
+            this.lastSequenceId = result.getMaxSequencedId();
+            logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
+        } else {
+            logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
+        }
+        return result;
+    }
+
+    /**
+     * Fetches the next batch of up to 500 articles after the last sequence id known to this client.
+     *
+     * @return the unprocessed result
+     * @throws IOException if the HTTP request fails
+     */
+    public MoreoverResult getNextBatch() throws IOException {
+        logger.debug("Getting next results for {} {}", this.id, this.lastSequenceId);
+        return getArticlesAfter(this.lastSequenceId.toString(), 500);
+    }
+
+    /** Converts the configured starting sequence to a number, treating null/blank as zero. */
+    private static BigInteger parseSequence(String sequence) {
+        if (sequence == null || sequence.trim().isEmpty()) {
+            return BigInteger.ZERO;
+        }
+        return new BigInteger(sequence.trim());
+    }
+
+    /** Performs the GET request and returns the whole response body decoded as UTF-8. */
+    private String getArticles(URL url) throws IOException {
+        HttpURLConnection cn = (HttpURLConnection) url.openConnection();
+        try {
+            cn.setRequestMethod("GET");
+            cn.addRequestProperty("Content-Type", "text/xml;charset=UTF-8");
+            cn.setDoInput(true);
+            cn.setDoOutput(false);
+            StringWriter writer = new StringWriter();
+            // closing the reader releases the underlying stream even when the copy fails
+            try (InputStreamReader reader = new InputStreamReader(cn.getInputStream(), UTF_8)) {
+                IOUtils.copy(reader, writer);
+            }
+            pullTime = new Date().getTime();
+            return writer.toString();
+        } finally {
+            // added after seeing java.net.SocketException: Too many open files;
+            // in a finally block so failed requests release their socket too
+            cn.disconnect();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
new file mode 100644
index 0000000..8d89582
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Builds a {@link MoreoverConfiguration} from a Typesafe {@link Config} block.
+ *
+ * Created by sblackmon on 12/10/13.
+ */
+public class MoreoverConfigurator {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverConfigurator.class);
+
+    /**
+     * Reads every entry under {@code apiKeys} and turns it into a {@link MoreoverKeyData}.
+     *
+     * <p>Each entry must define the string settings {@code key} and
+     * {@code startingSequence}; the name of the entry itself becomes the id.
+     *
+     * @param moreover configuration block containing an {@code apiKeys} object
+     * @return a configuration holding one key entry per child of {@code apiKeys}
+     */
+    public static MoreoverConfiguration detectConfiguration(Config moreover) {
+
+        MoreoverConfiguration moreoverConfiguration = new MoreoverConfiguration();
+
+        List<MoreoverKeyData> apiKeys = Lists.newArrayList();
+
+        Config apiKeysConfig = moreover.getConfig("apiKeys");
+
+        if (!apiKeysConfig.isEmpty()) {
+            for (String apiKeyId : apiKeysConfig.root().keySet()) {
+                Config apiKeyConfig = apiKeysConfig.getConfig(apiKeyId);
+                // the id is the name of the config entry, not the secret key value
+                apiKeys.add(new MoreoverKeyData()
+                        .withId(apiKeyId)
+                        .withKey(apiKeyConfig.getString("key"))
+                        .withStartingSequence(apiKeyConfig.getString("startingSequence")));
+            }
+        }
+        moreoverConfiguration.setApiKeys(apiKeys);
+
+        return moreoverConfiguration;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
new file mode 100644
index 0000000..17fde37
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverJsonActivitySerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.jackson.databind.AnnotationIntrospector;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
+import com.moreover.api.Article;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Deserializes Moreover JSON format into Activities
+ */
+public class MoreoverJsonActivitySerializer implements ActivitySerializer<String> {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverJsonActivitySerializer.class);
+
+ public MoreoverJsonActivitySerializer() {
+ }
+
+ @Override
+ public String serializationFormat() {
+ return "application/json+vnd.moreover.com.v1";
+ }
+
+ @Override
+ public String serialize(Activity deserialized) {
+ throw new UnsupportedOperationException("Cannot currently serialize to Moreover JSON");
+ }
+
+ @Override
+ public Activity deserialize(String serialized) {
+ serialized = serialized.replaceAll("\\[[ ]*\\]", "null");
+
+ LOGGER.debug(serialized);
+
+ ObjectMapper mapper = new ObjectMapper();
+ AnnotationIntrospector introspector = new JaxbAnnotationIntrospector(mapper.getTypeFactory());
+ mapper.setAnnotationIntrospector(introspector);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+ mapper.configure(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE, Boolean.FALSE);
+ mapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+ mapper.configure(DeserializationFeature.WRAP_EXCEPTIONS, Boolean.TRUE);
+
+ Article article;
+ try {
+ ObjectNode node = (ObjectNode)mapper.readTree(serialized);
+ node.remove("tags");
+ node.remove("locations");
+ node.remove("companies");
+ node.remove("topics");
+ node.remove("media");
+ node.remove("outboundUrls");
+ ObjectNode jsonNodes = (ObjectNode) node.get("source").get("feed");
+ jsonNodes.remove("editorialTopics");
+ jsonNodes.remove("tags");
+ jsonNodes.remove("autoTopics");
+ article = mapper.convertValue(node, Article.class);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Unable to deserialize", e);
+ }
+ return MoreoverUtils.convert(article);
+ }
+
+ @Override
+ public List<Activity> deserializeAll(List<String> serializedList) {
+ throw new NotImplementedException("Not currently implemented");
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
new file mode 100644
index 0000000..78d8e9d
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProvider.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.*;
+import com.google.common.util.concurrent.Uninterruptibles;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfiguration;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProvider;
+import org.apache.streams.core.StreamsResultSet;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Streams Provider for the Moreover Metabase API
+ *
+ * To use from command line:
+ *
+ * Supply a configuration file with a "moreover" block listing the apiKeys
+ * (for backward compatibility an "rss" block is read when no "moreover" block exists)
+ *
+ * Launch using:
+ *
+ * mvn exec:java -Dexec.mainClass=org.apache.streams.moreover.MoreoverProvider -Dexec.args="moreover.conf articles.json"
+ */
+public class MoreoverProvider implements StreamsProvider {
+
+    public final static String STREAMS_ID = "MoreoverProvider";
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(MoreoverProvider.class);
+
+    protected volatile Queue<StreamsDatum> providerQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+    private List<MoreoverKeyData> keys;
+
+    private MoreoverConfiguration config;
+
+    private ExecutorService executor;
+
+    /**
+     * Creates a provider polling every API key listed in the configuration.
+     *
+     * @param moreoverConfiguration configuration whose apiKeys are copied into this provider
+     */
+    public MoreoverProvider(MoreoverConfiguration moreoverConfiguration) {
+        this.config = moreoverConfiguration;
+        this.keys = Lists.newArrayList();
+        for (MoreoverKeyData apiKey : config.getApiKeys()) {
+            this.keys.add(apiKey);
+        }
+    }
+
+    @Override
+    public String getId() {
+        return STREAMS_ID;
+    }
+
+    /**
+     * Submits one polling task per API key. {@link #prepare(Object)} must have been called first.
+     */
+    public void startStream() {
+
+        for (MoreoverKeyData key : keys) {
+            MoreoverProviderTask task = new MoreoverProviderTask(key.getId(), key.getKey(), this.providerQueue, key.getStartingSequence());
+            // the task is a Runnable already; wrapping it in a Thread adds nothing
+            executor.submit(task);
+            // log the id rather than the key: the key is a credential
+            LOGGER.info("Started producer for {}", key.getId());
+        }
+
+    }
+
+    /**
+     * Removes and returns everything currently queued by the polling tasks.
+     *
+     * @return a result set holding the drained datums, possibly empty
+     */
+    @Override
+    public synchronized StreamsResultSet readCurrent() {
+
+        LOGGER.debug("readCurrent: {}", providerQueue.size());
+
+        // poll() hands over each datum exactly once, so datums offered by a
+        // producer while we drain are either returned now or kept for the next call
+        Queue<StreamsDatum> drained = Queues.newConcurrentLinkedQueue();
+        StreamsDatum datum;
+        while ((datum = providerQueue.poll()) != null) {
+            drained.add(datum);
+        }
+
+        return new StreamsResultSet(drained);
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    /**
+     * @return true once {@link #prepare(Object)} has run and until the executor is shut down
+     */
+    @Override
+    public boolean isRunning() {
+        return executor != null && !executor.isShutdown() && !executor.isTerminated();
+    }
+
+    /**
+     * Creates the executor that runs the polling tasks.
+     *
+     * @param configurationObject not used
+     */
+    @Override
+    public void prepare(Object configurationObject) {
+        LOGGER.debug("Prepare");
+        // every task polls in an endless loop, so each key needs a thread of its own;
+        // with a single thread only the first key would ever be polled
+        executor = Executors.newFixedThreadPool(Math.max(1, keys.size()));
+    }
+
+    /**
+     * Shuts the executor down and interrupts the polling tasks.
+     */
+    @Override
+    public void cleanUp() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    /**
+     * Command line entry point: polls Moreover and appends every datum as one JSON line to a file.
+     *
+     * @param args args[0] is the configuration file, args[1] the output file
+     * @throws Exception if the configuration cannot be loaded or the output file cannot be opened
+     */
+    public static void main(String[] args) throws Exception {
+
+        Preconditions.checkArgument(args.length >= 2, "usage: MoreoverProvider <config file> <output file>");
+
+        String configfile = args[0];
+        String outfile = args[1];
+
+        Config reference = ConfigFactory.load();
+        File conf_file = new File(configfile);
+        // a real check instead of assert, which is a no-op unless the JVM runs with -ea
+        Preconditions.checkArgument(conf_file.exists(), "configuration file %s does not exist", configfile);
+        Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+
+        Config typesafe = testResourceConfig.withFallback(reference).resolve();
+
+        StreamsConfiguration streamsConfiguration = StreamsConfigurator.detectConfiguration(typesafe);
+        // prefer the "moreover" block; "rss" is still honoured for configurations written for the old path
+        String configPath = typesafe.hasPath("moreover") ? "moreover" : "rss";
+        MoreoverConfiguration config = new ComponentConfigurator<>(MoreoverConfiguration.class).detectConfiguration(typesafe, configPath);
+        MoreoverProvider provider = new MoreoverProvider(config);
+
+        ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+        try (PrintStream outStream = new PrintStream(new BufferedOutputStream(new FileOutputStream(outfile)))) {
+            provider.prepare(config);
+            provider.startStream();
+            try {
+                do {
+                    Uninterruptibles.sleepUninterruptibly(streamsConfiguration.getBatchFrequencyMs(), TimeUnit.MILLISECONDS);
+                    Iterator<StreamsDatum> iterator = provider.readCurrent().iterator();
+                    while (iterator.hasNext()) {
+                        StreamsDatum datum = iterator.next();
+                        String json;
+                        try {
+                            json = mapper.writeValueAsString(datum.getDocument());
+                            outStream.println(json);
+                        } catch (JsonProcessingException e) {
+                            System.err.println(e.getMessage());
+                        }
+                    }
+                } while (provider.isRunning());
+            } finally {
+                provider.cleanUp();
+                outStream.flush();
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
new file mode 100644
index 0000000..ad92d73
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverProviderTask.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Queue;
+
+/**
+ * Task to pull from the Morever API
+ */
+public class MoreoverProviderTask implements Runnable {
+
+ public static final int LATENCY = 10;
+ public static final int REQUIRED_LATENCY = LATENCY * 1000;
+ private static Logger logger = LoggerFactory.getLogger(MoreoverProviderTask.class);
+
+ private String lastSequence;
+ private final String apiKey;
+ private final String apiId;
+ private final Queue<StreamsDatum> results;
+ private final MoreoverClient moClient;
+ private boolean started = false;
+
+ public MoreoverProviderTask(String apiId, String apiKey, Queue<StreamsDatum> results, String lastSequence) {
+ //logger.info("Constructed new task {} for {} {} {}", UUID.randomUUID().toString(), apiId, apiKey, lastSequence);
+ this.apiId = apiId;
+ this.apiKey = apiKey;
+ this.results = results;
+ this.lastSequence = lastSequence;
+ this.moClient = new MoreoverClient(this.apiId, this.apiKey, this.lastSequence);
+ initializeClient(moClient);
+ }
+
+ @Override
+ public void run() {
+ while(true) {
+ try {
+ ensureTime(moClient);
+ MoreoverResult result = moClient.getArticlesAfter(lastSequence, 500);
+ started = true;
+ lastSequence = result.process().toString();
+ for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
+ results.offer(entry);
+ logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, lastSequence);
+
+ } catch (Exception e) {
+ logger.error("Exception while polling moreover", e);
+ }
+ }
+ }
+
+ private void ensureTime(MoreoverClient moClient) {
+ try {
+ long gap = System.currentTimeMillis() - moClient.pullTime;
+ if (gap < REQUIRED_LATENCY)
+ Thread.sleep(REQUIRED_LATENCY - gap);
+ } catch (Exception e) {
+ logger.warn("Error sleeping for latency");
+ }
+ }
+
+ private void initializeClient(MoreoverClient moClient) {
+ try {
+ moClient.getArticlesAfter(this.lastSequence, 2);
+ } catch (Exception e) {
+ logger.error("Failed to start stream, {}", this.apiKey);
+ logger.error("Exception : ", e);
+ throw new IllegalStateException("Unable to initialize stream", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
new file mode 100644
index 0000000..e07084f
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResult.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.fasterxml.aalto.stax.InputFactoryImpl;
+import com.fasterxml.aalto.stax.OutputFactoryImpl;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.xml.JacksonXmlModule;
+import com.fasterxml.jackson.dataformat.xml.XmlFactory;
+import com.fasterxml.jackson.dataformat.xml.XmlMapper;
+import com.google.common.collect.Lists;
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * One response from the Moreover articles API: the raw XML plus, once
+ * {@link #process()} has run, the articles it contained as {@link StreamsDatum}s.
+ *
+ * <p>Until {@link #process()} is called the iterator is empty and
+ * {@link #getMaxSequencedId()} returns zero.
+ */
+public class MoreoverResult implements Iterable<StreamsDatum> {
+
+    private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
+
+    private XmlMapper xmlMapper;
+
+    private String xmlString;
+    private ArticlesResponse resultObject;
+    private ArticlesResponse.Articles articles;
+    private List<Article> articleArray;
+    private long start;
+    private long end;
+    private String clientId;
+    private BigInteger maxSequencedId = BigInteger.ZERO;
+    // set by the first process() call so a second call cannot append the same articles again
+    private boolean processed = false;
+
+    protected ArticlesResponse response;
+    protected List<StreamsDatum> list = Lists.newArrayList();
+
+    /**
+     * @param clientId  id of the client that made the request
+     * @param xmlString raw XML body of the response
+     * @param start     timestamp taken before the request was made
+     * @param end       timestamp taken after the response was read
+     */
+    protected MoreoverResult(String clientId, String xmlString, long start, long end) {
+        this.xmlString = xmlString;
+        this.clientId = clientId;
+        this.start = start;
+        this.end = end;
+
+        XmlFactory f = new XmlFactory(new InputFactoryImpl(), new OutputFactoryImpl());
+        JacksonXmlModule module = new JacksonXmlModule();
+        module.setDefaultUseWrapper(false);
+
+        xmlMapper = new XmlMapper(f, module);
+        xmlMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, Boolean.TRUE);
+        xmlMapper.configure(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT, Boolean.TRUE);
+        xmlMapper.configure(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY, Boolean.TRUE);
+        xmlMapper.configure(DeserializationFeature.READ_ENUMS_USING_TO_STRING, Boolean.TRUE);
+        xmlMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, Boolean.FALSE);
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public long getStart() {
+        return start;
+    }
+
+    public long getEnd() {
+        return end;
+    }
+
+    /**
+     * Parses the XML and collects one {@link StreamsDatum} per article.
+     *
+     * <p>Unparseable documents, FAILURE responses and responses without articles
+     * are logged and leave the result empty. Articles whose sequence id is missing
+     * or not numeric are skipped. Calling this method again returns the value
+     * computed by the first call without re-reading the XML.
+     *
+     * @return the largest sequence id seen, or zero when no article was collected
+     */
+    public BigInteger process() {
+        if (processed) {
+            return this.maxSequencedId;
+        }
+        processed = true;
+
+        try {
+            this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
+        } catch (JsonMappingException e) {
+            // theory is this may not be fatal
+            this.resultObject = recoverPartialResponse(e);
+        } catch (Exception e) {
+            logger.warn("Unable to process document:", e);
+            logger.warn(xmlString);
+        }
+
+        if (this.resultObject == null) {
+            return this.maxSequencedId;
+        }
+
+        if ("FAILURE".equals(this.resultObject.getStatus())) {
+            logger.warn(this.resultObject.getStatus());
+            logger.warn(this.resultObject.getMessageCode());
+            logger.warn(this.resultObject.getUserMessage());
+            logger.warn(this.resultObject.getDeveloperMessage());
+            return this.maxSequencedId;
+        }
+
+        this.articles = resultObject.getArticles();
+        this.articleArray = (articles == null) ? null : articles.getArticle();
+        if (this.articleArray == null) {
+            return this.maxSequencedId;
+        }
+
+        for (Article article : articleArray) {
+            BigInteger sequenceid = parseSequenceId(article);
+            if (sequenceid == null) {
+                continue;
+            }
+            list.add(new StreamsDatum(article, sequenceid));
+            logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+            if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+                this.maxSequencedId = sequenceid;
+            }
+        }
+
+        return this.maxSequencedId;
+    }
+
+    /** Returns the partially bound response carried by a mapping failure, or null when there is none. */
+    private ArticlesResponse recoverPartialResponse(JsonMappingException e) {
+        Object origin = e.getPath().isEmpty() ? null : e.getPath().get(0).getFrom();
+        if (origin instanceof ArticlesResponse) {
+            return (ArticlesResponse) origin;
+        }
+        logger.warn("Unable to process document:", e);
+        logger.warn(xmlString);
+        return null;
+    }
+
+    /** Reads the article's sequence id, returning null (after logging) when it is absent or not a number. */
+    private BigInteger parseSequenceId(Article article) {
+        if (article == null || article.getSequenceId() == null) {
+            logger.warn("Skipping article without sequence id");
+            return null;
+        }
+        try {
+            return new BigInteger(article.getSequenceId());
+        } catch (NumberFormatException e) {
+            logger.warn("Skipping article with invalid sequence id {}", article.getSequenceId());
+            return null;
+        }
+    }
+
+    public String getXmlString() {
+        return this.xmlString;
+    }
+
+    public BigInteger getMaxSequencedId() {
+        return this.maxSequencedId;
+    }
+
+    @Override
+    public Iterator<StreamsDatum> iterator() {
+        return list.iterator();
+    }
+
+    /**
+     * Iterator view that returns the {@code toString()} form of each underlying element.
+     */
+    protected static class JsonStringIterator implements Iterator<Serializable> {
+
+        private Iterator<Serializable> underlying;
+
+        protected JsonStringIterator(Iterator<Serializable> underlying) {
+            this.underlying = underlying;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return underlying.hasNext();
+        }
+
+        @Override
+        public String next() {
+            return underlying.next().toString();
+        }
+
+        @Override
+        public void remove() {
+            underlying.remove();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
new file mode 100644
index 0000000..0a47bd1
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverResultSetWrapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsResultSet;
+
+import java.util.Queue;
+
+/**
+ * Exposes the datums of a {@link MoreoverResult} as a {@link StreamsResultSet}.
+ *
+ * <p>The datums are read through the result's iterator at construction time,
+ * so the result should already have been processed.
+ */
+public class MoreoverResultSetWrapper extends StreamsResultSet {
+
+    /**
+     * @param underlying result whose datums back this result set
+     */
+    public MoreoverResultSetWrapper(MoreoverResult underlying) {
+        super(toQueue(underlying));
+    }
+
+    /**
+     * Returns the result itself when it already is a queue, otherwise copies its
+     * datums into a new queue. MoreoverResult is only Iterable, so a plain cast
+     * to Queue fails with ClassCastException.
+     */
+    @SuppressWarnings("unchecked")
+    private static Queue<StreamsDatum> toQueue(MoreoverResult underlying) {
+        if (underlying instanceof Queue) {
+            return (Queue<StreamsDatum>) underlying;
+        }
+        Queue<StreamsDatum> queue = new java.util.concurrent.ConcurrentLinkedQueue<StreamsDatum>();
+        for (StreamsDatum datum : underlying) {
+            queue.add(datum);
+        }
+        return queue;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
new file mode 100644
index 0000000..45d26df
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverUtils.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.moreover.api.*;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+import org.apache.streams.pojo.json.Actor;
+import org.apache.streams.pojo.json.Provider;
+import org.joda.time.DateTime;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import static org.apache.streams.data.util.ActivityUtil.*;
+
+/**
+ * Provides utilities for Moroever data
+ */
+public class MoreoverUtils {
+ private MoreoverUtils() {
+ }
+
+ public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";
+
+ public static Activity convert(Article article) {
+ Activity activity = new Activity();
+ Source source = article.getSource();
+ activity.setActor(convert(article.getAuthor(), source.getName()));
+ activity.setProvider(convert(source));
+ activity.setTarget(convertTarget(source));
+ activity.setObject(convertObject(article));
+ activity.setPublished(DateTime.parse(article.getPublishedDate()));
+ activity.setContent(article.getContent());
+ activity.setTitle(article.getTitle());
+ activity.setVerb("posted");
+ fixActivityId(activity);
+ addLocationExtension(activity, source);
+ addLanguageExtension(activity, article);
+ activity.setLinks(convertLinks(article));
+ return activity;
+ }
+
+ private static void fixActivityId(Activity activity) {
+ if (activity.getId() != null && activity.getId().matches("\\{[a-z]*\\}")) {
+ activity.setId(null);
+ }
+ }
+
+ private static List convertLinks(Article article) {
+ List list = new LinkedList();
+ Article.OutboundUrls outboundUrls = article.getOutboundUrls();
+ if (outboundUrls != null) {
+ for (String url : outboundUrls.getOutboundUrl()) {
+ list.add(url);
+ }
+ }
+ return list;
+ }
+
+ public static ActivityObject convertTarget(Source source) {
+ ActivityObject object = new ActivityObject();
+ object.setUrl(source.getHomeUrl());
+ object.setDisplayName(source.getName());
+ return object;
+ }
+
+ public static ActivityObject convertObject(Article article) {
+ ActivityObject object = new ActivityObject();
+ object.setContent(article.getContent());
+ object.setSummary(article.getTitle());
+ object.setUrl(article.getOriginalUrl());
+ object.setObjectType(article.getDataFormat());
+ String type = article.getDataFormat().equals("text") ? "article" : article.getDataFormat();
+ object.setId(getObjectId(getProviderID(article.getSource().getFeed()), type, article.getId()));
+ object.setPublished(DateTime.parse(article.getPublishedDate()));
+ return object;
+ }
+
+ public static Provider convert(Source source) {
+ Provider provider = new Provider();
+ Feed feed = source.getFeed();
+ String display = getProviderID(feed);
+ provider.setId(getProviderId(display.trim().toLowerCase().replace(" ", "_")));
+ provider.setDisplayName(display);
+ provider.setUrl(feed.getUrl());
+ return provider;
+ }
+
+ public static Actor convert(Author author, String platformName) {
+ Actor actor = new Actor();
+ AuthorPublishingPlatform platform = author.getPublishingPlatform();
+ String userId = platform.getUserId();
+ if (userId != null) actor.setId(getPersonId(getProviderID(platformName), userId));
+ actor.setDisplayName(author.getName());
+ actor.setUrl(author.getHomeUrl());
+ actor.setSummary(author.getDescription());
+ actor.setAdditionalProperty("email", author.getEmail());
+ return actor;
+ }
+
+ public static void addLocationExtension(Activity activity, Source value) {
+ Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+ String country = value.getLocation().getCountryCode() == null ? value.getLocation().getCountry() : value.getLocation().getCountryCode();
+ if (country != null) {
+ Map<String, Object> location = new HashMap<String, Object>();
+ location.put(LOCATION_EXTENSION_COUNTRY, country);
+ extensions.put(LOCATION_EXTENSION, location);
+ }
+ }
+
+ public static void addLanguageExtension(Activity activity, Article value) {
+ Map<String, Object> extensions = ExtensionUtil.getInstance().ensureExtensions(activity);
+ String language = value.getLanguage();
+ if (language != null) {
+ extensions.put(LANGUAGE_EXTENSION, language);
+ }
+ }
+
+ public static Date parse(String str) {
+ DateFormat fmt = new SimpleDateFormat(DATE_FORMAT);
+ try {
+ return fmt.parse(str);
+ } catch (ParseException e) {
+ throw new IllegalArgumentException("Invalid date format", e);
+ }
+ }
+
+ private static String getProviderID(Feed feed) {
+ return getProviderID(feed.getPublishingPlatform() == null ? feed.getMediaType() : feed.getPublishingPlatform());
+ }
+
+ private static String getProviderID(String feed) {
+ return feed.toLowerCase().replace(" ", "_").trim();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/015f0ae1/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
new file mode 100644
index 0000000..4b7b3b0
--- /dev/null
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverXmlActivitySerializer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.moreover;
+
+import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
+import com.moreover.api.ObjectFactory;
+import org.apache.commons.lang.SerializationException;
+import org.apache.streams.data.ActivitySerializer;
+import org.apache.streams.moreover.MoreoverUtils;
+import org.apache.streams.pojo.json.Activity;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.StringReader;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Deserializes the Moreover Article XML and converts it to an instance of {@link Activity}.
+ * Serializing back to the Moreover format is not supported.
+ */
+public class MoreoverXmlActivitySerializer implements ActivitySerializer<String> {
+
+    // The contexts are created once and reused; a fresh Unmarshaller is created for every
+    // call, so no unmarshaller instance is shared between callers.
+    private final JAXBContext articleContext;
+    private final JAXBContext articlesContext;
+
+    /**
+     * Creates the JAXB contexts used for single articles and for article responses.
+     *
+     * @throws IllegalStateException if a JAXB context cannot be created
+     */
+    public MoreoverXmlActivitySerializer() {
+        articleContext = createContext(Article.class);
+        articlesContext = createContext(ArticlesResponse.class);
+    }
+
+    @Override
+    public String serializationFormat() {
+        return "application/xml+vnd.moreover.com.v1";
+    }
+
+    /**
+     * Always fails: writing the Moreover format is not implemented.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public String serialize(Activity deserialized) {
+        throw new UnsupportedOperationException("Cannot currently serialize to Moreover");
+    }
+
+    /**
+     * Converts the XML of a single Moreover article into an activity.
+     *
+     * @param serialized article XML
+     * @return the converted activity
+     * @throws SerializationException if the XML cannot be unmarshalled
+     */
+    @Override
+    public Activity deserialize(String serialized) {
+        Article article = deserializeMoreover(serialized);
+        return MoreoverUtils.convert(article);
+    }
+
+    /**
+     * Converts every article of every Moreover response document into an activity.
+     *
+     * @param serializedList response documents, each holding zero or more articles
+     * @return the activities in document order; responses without an articles container
+     *         contribute nothing
+     * @throws SerializationException if a document cannot be unmarshalled
+     */
+    @Override
+    public List<Activity> deserializeAll(List<String> serializedList) {
+        List<Activity> activities = new LinkedList<Activity>();
+        for (String item : serializedList) {
+            ArticlesResponse response = deserializeMoreoverResponse(item);
+            if (response == null || response.getArticles() == null) {
+                // Nothing to convert in this document.
+                continue;
+            }
+            for (Article article : response.getArticles().getArticle()) {
+                activities.add(MoreoverUtils.convert(article));
+            }
+        }
+        return activities;
+    }
+
+    private Article deserializeMoreover(String serialized) {
+        return (Article) unmarshal(articleContext, serialized);
+    }
+
+    private ArticlesResponse deserializeMoreoverResponse(String serialized) {
+        return (ArticlesResponse) unmarshal(articlesContext, serialized);
+    }
+
+    /**
+     * Unmarshals {@code serialized} with a new unmarshaller from {@code context}. JAXB may
+     * hand back either the bound object or a {@link JAXBElement} wrapping it; both shapes
+     * are accepted and the bound object is returned.
+     *
+     * NOTE(review): the XML is parsed with the unmarshaller's default parser settings. If
+     * the input can come from an untrusted party, confirm that DTDs / external entities
+     * are disabled (XXE).
+     */
+    private static Object unmarshal(JAXBContext context, String serialized) {
+        try {
+            Unmarshaller unmarshaller = context.createUnmarshaller();
+            Object result = unmarshaller.unmarshal(new StringReader(serialized));
+            return result instanceof JAXBElement ? ((JAXBElement<?>) result).getValue() : result;
+        } catch (JAXBException e) {
+            throw new SerializationException("Unable to deserialize Moreover data", e);
+        }
+    }
+
+    /**
+     * Creates a JAXB context for the package of {@code articleClass}, using the class
+     * loader of the generated {@link ObjectFactory}.
+     */
+    private static JAXBContext createContext(Class<?> articleClass) {
+        try {
+            return JAXBContext.newInstance(articleClass.getPackage().getName(), ObjectFactory.class.getClassLoader());
+        } catch (JAXBException e) {
+            throw new IllegalStateException("Unable to create JAXB Context for Moreover data", e);
+        }
+    }
+}
[3/3] incubator-streams git commit: Merge branch '0.4-moreover'
Posted by sb...@apache.org.
Merge branch '0.4-moreover'
# Conflicts:
# streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/moreover/MoreoverConfigurator.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/67d5cca5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/67d5cca5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/67d5cca5
Branch: refs/heads/master
Commit: 67d5cca51ee250b5dea0012cd9576157b19e57ec
Parents: 9861124 015f0ae
Author: Steve Blackmon @steveblackmon <sb...@apache.org>
Authored: Sat Oct 22 14:44:38 2016 -0500
Committer: Steve Blackmon @steveblackmon <sb...@apache.org>
Committed: Sat Oct 22 14:44:38 2016 -0500
----------------------------------------------------------------------
.../data/MoreoverJsonActivitySerializer.java | 97 ---------
.../data/MoreoverXmlActivitySerializer.java | 105 ----------
.../streams/data/moreover/MoreoverClient.java | 110 ----------
.../streams/data/moreover/MoreoverProvider.java | 114 -----------
.../data/moreover/MoreoverProviderTask.java | 94 ---------
.../streams/data/moreover/MoreoverResult.java | 204 -------------------
.../data/moreover/MoreoverResultSetWrapper.java | 34 ----
.../apache/streams/data/util/MoreoverUtils.java | 155 --------------
.../apache/streams/moreover/MoreoverClient.java | 110 ++++++++++
.../MoreoverJsonActivitySerializer.java | 98 +++++++++
.../streams/moreover/MoreoverProvider.java | 177 ++++++++++++++++
.../streams/moreover/MoreoverProviderTask.java | 91 +++++++++
.../apache/streams/moreover/MoreoverResult.java | 200 ++++++++++++++++++
.../moreover/MoreoverResultSetWrapper.java | 32 +++
.../apache/streams/moreover/MoreoverUtils.java | 155 ++++++++++++++
.../moreover/MoreoverXmlActivitySerializer.java | 106 ++++++++++
.../src/main/resources/moreover.conf | 25 ---
.../data/MoreoverJsonActivitySerializerIT.java | 79 -------
.../data/MoreoverXmlActivitySerializerIT.java | 65 ------
.../streams/data/util/MoreoverTestUtil.java | 43 ----
.../streams/moreover/MoreoverTestUtil.java | 43 ++++
.../test/MoreoverJsonActivitySerializerIT.java | 76 +++++++
.../test/MoreoverXmlActivitySerializerIT.java | 66 ++++++
.../test/provider/MoreoverProviderIT.java | 67 ++++++
.../src/test/resources/moreover.conf | 25 +++
25 files changed, 1246 insertions(+), 1125 deletions(-)
----------------------------------------------------------------------