You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/17 20:52:05 UTC

git commit: found and fixed failure condition in moreoverclient; made HDFS id (column 1) the default ES id

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-26 d764d7c96 -> 34990168b


found and fixed failure condition in moreoverclient
made HDFS id (column 1) the default ES id


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34990168
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34990168
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34990168

Branch: refs/heads/STREAMS-26
Commit: 34990168bf9d01540d9100162ef5d04a48d35a32
Parents: d764d7c
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Mar 17 14:51:40 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Mar 17 14:51:40 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             |  6 ++--
 .../streams/hdfs/WebHdfsPersistReader.java      |  1 +
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 10 ++++--
 .../streams/data/moreover/MoreoverClient.java   | 10 +++++-
 .../streams/data/moreover/MoreoverResult.java   | 35 +++++++++++++++-----
 .../org/apache/streams/core/StreamsDatum.java   |  3 +-
 6 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c4fd9f0..595011b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -133,9 +133,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
             add(config.getIndex(), config.getType(), id, json);
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
-
+        } catch (Exception e) {
+            LOGGER.warn("{} {}", e.getMessage());
+            e.printStackTrace();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index cf4c146..659c517 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -2,6 +2,7 @@ package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index e8c1695..6cd1e79 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -1,13 +1,16 @@
 package org.apache.streams.hdfs;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Calendar;
 import java.util.Random;
 
 public class WebHdfsPersistReaderTask implements Runnable {
@@ -34,9 +37,12 @@ public class WebHdfsPersistReaderTask implements Runnable {
                     do{
                         try {
                             line = bufferedReader.readLine();
-                            if( line != null ) {
+                            if( !Strings.isNullOrEmpty(line) ) {
                                 String[] fields = line.split(Character.toString(reader.DELIMITER));
-                                reader.persistQueue.offer(new StreamsDatum(fields[3]));
+                                Calendar cal = Calendar.getInstance();
+                                cal.setTimeInMillis(new Long(fields[2]));
+                                StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime()));
+                                reader.persistQueue.offer(entry);
                             }
                         } catch (Exception e) {
                             LOGGER.warn("Failed processing " + line);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index a43c4d8..b5888c3 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -40,9 +40,13 @@ public class MoreoverClient {
         logger.debug("Making call to {}", urlString);
         long start = System.nanoTime();
         MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
-        logger.debug("Maximum sequence from last call {}", result.getMaxSequencedId());
         if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
+        {
             this.lastSequenceId = result.getMaxSequencedId();
+            logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
+        }
+        else
+            logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
         return result;
     }
 
@@ -79,6 +83,10 @@ public class MoreoverClient {
         IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
         writer.flush();
         pullTime = new Date().getTime();
+
+        // added after seeing java.net.SocketException: Too many open files
+        cn.disconnect();
+
         return writer.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
index 50e3f66..0ef49c5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
@@ -3,6 +3,7 @@ package org.apache.streams.data.moreover;
 import com.fasterxml.aalto.stax.InputFactoryImpl;
 import com.fasterxml.aalto.stax.OutputFactoryImpl;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -112,18 +113,34 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
 
         try {
             this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
-            this.articles = resultObject.getArticles();
-            this.articleArray = articles.getArticle();
-        } catch (IOException e) {
+        } catch (JsonMappingException e) {
+            // theory is this may not be fatal
+            this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
+        } catch (Exception e) {
             e.printStackTrace();
+            logger.warn("Unable to process document:");
+            logger.warn(xmlString);
+        }
+
+        if( this.resultObject.getStatus().equals("FAILURE"))
+        {
+            logger.warn(this.resultObject.getStatus());
+            logger.warn(this.resultObject.getMessageCode());
+            logger.warn(this.resultObject.getUserMessage());
+            logger.warn(this.resultObject.getDeveloperMessage());
         }
+        else
+        {
+            this.articles = resultObject.getArticles();
+            this.articleArray = articles.getArticle();
 
-        for (Article article : articleArray) {
-            BigInteger sequenceid = new BigInteger(article.getSequenceId());
-            list.add(new StreamsDatum(article, sequenceid));
-            logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
-            if (sequenceid.compareTo(this.maxSequencedId) > 0) {
-                this.maxSequencedId = sequenceid;
+            for (Article article : articleArray) {
+                BigInteger sequenceid = new BigInteger(article.getSequenceId());
+                list.add(new StreamsDatum(article, sequenceid));
+                logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+                if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+                    this.maxSequencedId = sequenceid;
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 7e7e553..78623b0 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -136,6 +136,7 @@ public class StreamsDatum implements Serializable {
 
     @Override
     public String toString() {
-        return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
+        return this.id+"\tDocument="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
     }
+
 }