You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/02/28 08:23:24 UTC

svn commit: r1572843 - in /incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover: MoreoverClient.java MoreoverProvider.java MoreoverProviderTask.java MoreoverResult.java

Author: sblackmon
Date: Fri Feb 28 07:23:23 2014
New Revision: 1572843

URL: http://svn.apache.org/r1572843
Log:
Made some tweaks; the provider now runs single-threaded.

Modified:
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
    incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java Fri Feb 28 07:23:23 2014
@@ -46,7 +46,7 @@ public class MoreoverClient {
     }
 
     public MoreoverResult getNextBatch() throws IOException{
-        //logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
+        logger.debug("Getting next results for {} {} {}", this.id, this.apiKey, this.lastSequenceId);
         return getArticlesAfter(this.lastSequenceId.toString(), 500);
     }
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProvider.java Fri Feb 28 07:23:23 2014
@@ -1,6 +1,8 @@
 package org.apache.streams.data.moreover;
 
-import com.google.common.collect.Lists;
+import com.google.common.base.Predicates;
+import com.google.common.collect.*;
+import net.jcip.annotations.Immutable;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
 import org.apache.streams.core.StreamsResultSet;
@@ -11,10 +13,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
-import java.util.Arrays;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
+import java.util.*;
 import java.util.concurrent.*;
 
 public class MoreoverProvider implements StreamsProvider {
@@ -50,16 +49,21 @@ public class MoreoverProvider implements
     }
 
     @Override
-    public StreamsResultSet readCurrent() {
+    public synchronized StreamsResultSet readCurrent() {
         LOGGER.debug("readCurrent");
 
         LOGGER.info("Providing {} docs", providerQueue.size());
 
-        StreamsResultSet result =  new StreamsResultSet(providerQueue);
+        Collection<StreamsDatum> currentIterator = Lists.newArrayList();
+        Iterators.addAll(currentIterator, providerQueue.iterator());
+
+        StreamsResultSet current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(currentIterator));
 
         LOGGER.info("Exiting");
 
-        return result;
+        providerQueue.clear();
+
+        return current;
     }
 
     @Override

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverProviderTask.java Fri Feb 28 07:23:23 2014
@@ -37,15 +37,18 @@ public class MoreoverProviderTask implem
 
     @Override
     public void run() {
-        try {
-            ensureTime(moClient);
-            MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
-            started = true;
-            for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
-                results.offer(entry);
-            logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
-        } catch (Exception e) {
-            logger.error("Exception while polling moreover", e);
+        while(true) {
+            try {
+                ensureTime(moClient);
+                MoreoverResult result = started ? moClient.getNextBatch() : moClient.getArticlesAfter(lastSequence, 500);
+                started = true;
+                result.process();
+                for(StreamsDatum entry : ImmutableSet.copyOf(result.iterator()))
+                    results.offer(entry);
+                logger.info("ApiKey={}\tlastSequenceid={}", this.apiKey, result.getMaxSequencedId());
+            } catch (Exception e) {
+                logger.error("Exception while polling moreover", e);
+            }
         }
     }
 

Modified: incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
URL: http://svn.apache.org/viewvc/incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java?rev=1572843&r1=1572842&r2=1572843&view=diff
==============================================================================
--- incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java (original)
+++ incubator/streams/branches/STREAMS-26/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java Fri Feb 28 07:23:23 2014
@@ -12,6 +12,7 @@ import com.fasterxml.jackson.dataformat.
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.moreover.api.Article;
+import com.moreover.api.ArticlesResponse;
 import org.apache.streams.core.StreamsDatum;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,21 +26,23 @@ import java.util.List;
 
 public class MoreoverResult implements Iterable<StreamsDatum> {
 
-    private static final Logger logger = LoggerFactory.getLogger(MoreoverClient.class);
+    private static final Logger logger = LoggerFactory.getLogger(MoreoverResult.class);
 
     private ObjectMapper mapper;
     private XmlMapper xmlMapper;
 
     private String xmlString;
     private String jsonString;
-    private ObjectNode resultObject;
-    private JsonNode articlesArray;
+    private ArticlesResponse resultObject;
+    private ArticlesResponse.Articles articles;
+    private List<Article> articleArray;
     private long start;
     private long end;
     private String clientId;
     private BigInteger maxSequencedId = BigInteger.ZERO;
 
-    private List<StreamsDatum> list = Lists.newArrayList();
+    protected ArticlesResponse response;
+    protected List<StreamsDatum> list = Lists.newArrayList();
 
     protected MoreoverResult(String clientId, String xmlString, long start, long end) {
         this.xmlString = xmlString;
@@ -70,6 +73,26 @@ public class MoreoverResult implements I
         xmlMapper.configure(
                 DeserializationFeature.READ_ENUMS_USING_TO_STRING,
                 Boolean.TRUE);
+        xmlMapper.configure(
+                DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+                Boolean.FALSE);
+
+        mapper = new ObjectMapper();
+
+        mapper
+                .configure(
+                        DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY,
+                        Boolean.TRUE);
+        mapper.configure(
+                DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT,
+                Boolean.TRUE);
+        mapper
+                .configure(
+                        DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY,
+                        Boolean.TRUE);
+        mapper.configure(
+                DeserializationFeature.READ_ENUMS_USING_TO_STRING,
+                Boolean.TRUE);
 
     }
 
@@ -85,23 +108,17 @@ public class MoreoverResult implements I
         return end;
     }
 
-    public String getJSONString() {
+    public void process() {
 
-        if( this.jsonString != null ) {
-            return jsonString;
-        }
-        else {
-            try {
-                this.resultObject = xmlMapper.readValue(xmlString, ObjectNode.class);
-                this.jsonString = mapper.writeValueAsString(this.resultObject);
-                this.articlesArray = (JsonNode)this.resultObject.get("articles");
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
+        try {
+            this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
+            this.articles = resultObject.getArticles();
+            this.articleArray = articles.getArticle();
+        } catch (IOException e) {
+            e.printStackTrace();
         }
 
-        for (JsonNode articleNode : ImmutableList.copyOf(articlesArray.elements())) {
-            Article article = mapper.convertValue(articleNode, Article.class);
+        for (Article article : articleArray) {
             BigInteger sequenceid = new BigInteger(article.getSequenceId());
             list.add(new StreamsDatum(article, sequenceid));
             logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
@@ -111,7 +128,7 @@ public class MoreoverResult implements I
             }
 
         }
-        return jsonString;
+
     }
 
     public String getXmlString() {