You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/28 08:23:24 UTC
svn commit: r1572843 - in
/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover:
MoreoverClient.java MoreoverProvider.java MoreoverProviderTask.java
MoreoverResult.java
Author: sblackmon
Date: Fri Feb 28 07:23:23 2014
New Revision: 1572843
URL: http://svn.apache.org/r1572843
Log:
Made some tweaks, and now the provider is running single-threaded
Modified:
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java Fri Feb 28 07:23:23 2014
@@ -46,7 +46,7 @@ public class MoreoverClient {
}
public MoreoverResult getNextBatch() throws IOException{
- //logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
+ logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
return getArticlesAfter(this.lastSequenceId.toString(), 500);
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java Fri Feb 28 07:23:23 2014
@@ -1,6 +1,8 @@
package org.apache.streams.data.moreover;
-import com.google.common.collect.Lists;
+import com.google.common.base.Predicates;
+import com.google.common.collect.*;
+import net.jcip.annotations.Immutable;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsProvider;
import org.apache.streams.core.StreamsResultSet;
@@ -11,10 +13,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
import java.util.concurrent.*;
public class MoreoverProvider implements StreamsProvider {
@@ -50,16 +49,21 @@ public class MoreoverProvider implements
}
@Override
- public StreamsResultSet readCurrent() {
+ public synchronized StreamsResultSet readCurrent() {
LOGGER.debug("readCurrent");
LOGGER.info("Providing {} docs", providerQueue.size());
- StreamsResultSet result = new StreamsResultSet(providerQueue);
+ Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+ Iterators.addAll(currentIterator, providerQueue.iterator());
+
+ StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
LOGGER.info("Exiting");
- return result;
+ providerQueue.clear();
+
+ return current;
}
@Override
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java Fri Feb 28 07:23:23 2014
@@ -37,15 +37,18 @@ public class MoreoverProviderTask implem
@Override
public void run() {
- try {
- ensureTime(moClient);
- MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
- started = true;
- for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
- results.offer(entry);
- logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
- } catch (Exception e) {
- logger.error("Exception while polling moreover", e);
+ while(true) {
+ try {
+ ensureTime(moClient);
+ MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+ started = true;
+ result.process();
+ for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
+ results.offer(entry);
+ logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+ } catch (Exception e) {
+ logger.error("Exception while polling moreover", e);
+ }
}
}
Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java Fri Feb 28 07:23:23 2014
@@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
import org.apache.streams.core.StreamsDatum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,21 +26,23 @@ import java.util.List;
public class MoreoverResult implements Iterable<StreamsDatum> {
- private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
+ private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
private ObjectMapper mapper;
private XmlMapper xmlMapper;
private String xmlString;
private String jsonString;
- private ObjectNode resultObject;
- private JsonNode articlesArray;
+ private ArticlesResponse resultObject;
+ private ArticlesResponse.Articles articles;
+ private List<Article> articleArray;
private long start;
private long end;
private String clientId;
private BigInteger maxSequencedId = BigInteger.ZERO;
- private List<StreamsDatum> list = Lists.newArrayList();
+ protected ArticlesResponse response;
+ protected List<StreamsDatum> list = Lists.newArrayList();
protected MoreoverResult(String clientId, String xmlString, long start, long end) {
this.xmlString = xmlString;
@@ -70,6 +73,26 @@ public class MoreoverResult implements I
xmlMapper.configure(
DeserializationFeature.READ_ENUMS_USING_TO_STRING,
Boolean.TRUE);
+ xmlMapper.configure(
+ DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+ Boolean.FALSE);
+
+ mapper = new ObjectMapper();
+
+ mapper
+ .configure(
+ DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+ Boolean.TRUE);
+ mapper.configure(
+ DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+ Boolean.TRUE);
+ mapper
+ .configure(
+ DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+ Boolean.TRUE);
+ mapper.configure(
+ DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+ Boolean.TRUE);
}
@@ -85,23 +108,17 @@ public class MoreoverResult implements I
return end;
}
- public String getJSONString() {
+ public void process() {
- if( this.jsonString != null ) {
- return jsonString;
- }
- else {
- try {
- this.resultObject = xmlMapper.readValue(xmlString, ObjectNode.class);
- this.jsonString = mapper.writeValueAsString(this.resultObject);
- this.articlesArray = (JsonNode)this.resultObject.get("articles");
- } catch (IOException e) {
- e.printStackTrace();
- }
+ try {
+ this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
+ this.articles = resultObject.getArticles();
+ this.articleArray = articles.getArticle();
+ } catch (IOException e) {
+ e.printStackTrace();
}
- for (JsonNode articleNode : ImmutableList.copyOf(articlesArray.elements())) {
- Article article = mapper.convertValue(articleNode, Article.class);
+ for (Article article : articleArray) {
BigInteger sequenceid = new BigInteger(article.getSequenceId());
list.add(new StreamsDatum(article, sequenceid));
logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
@@ -111,7 +128,7 @@ public class MoreoverResult implements I
}
}
- return jsonString;
+
}
public String getXmlString() {