Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/22 00:20:32 UTC

[54/71] [abbrv] git commit: found and fixed a failure condition in MoreoverClient; made the HDFS id (column 1) the default ES id

found and fixed a failure condition in MoreoverClient
made the HDFS id (column 1) the default ES id
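
A quick illustration of the second change: the WebHDFS reader now carries the first delimited column through as the datum id, which downstream writers such as the Elasticsearch persister can use as the document id. A minimal sketch of the assumed line layout (the tab delimiter, the sample values, the class name, and the unused second column are illustrative; the real code in WebHdfsPersistReaderTask splits on reader.DELIMITER and converts the timestamp via a Calendar):

    import org.apache.streams.core.StreamsDatum;
    import org.joda.time.DateTime;

    public class HdfsLineToDatumSketch {
        public static void main(String[] args) {
            // hypothetical delimited line; column layout inferred from WebHdfsPersistReaderTask:
            //   0: id    1: (unused here)    2: timestamp in millis    3: document JSON
            String line = "datum-123\textra\t1395088300000\t{\"verb\":\"post\"}";
            String[] fields = line.split("\t");

            StreamsDatum entry = new StreamsDatum(
                    fields[3],                                // document
                    fields[0],                                // id (column 1), available as the ES document id
                    new DateTime(Long.parseLong(fields[2]))); // timestamp

            System.out.println(entry);                        // uses the updated StreamsDatum.toString()
        }
    }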


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34990168
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34990168
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34990168

Branch: refs/heads/master
Commit: 34990168bf9d01540d9100162ef5d04a48d35a32
Parents: d764d7c
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Mar 17 14:51:40 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Mar 17 14:51:40 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchPersistWriter.java             |  6 ++--
 .../streams/hdfs/WebHdfsPersistReader.java      |  1 +
 .../streams/hdfs/WebHdfsPersistReaderTask.java  | 10 ++++--
 .../streams/data/moreover/MoreoverClient.java   | 10 +++++-
 .../streams/data/moreover/MoreoverResult.java   | 35 +++++++++++++++-----
 .../org/apache/streams/core/StreamsDatum.java   |  3 +-
 6 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c4fd9f0..595011b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -133,9 +133,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
 
             add(config.getIndex(), config.getType(), id, json);
 
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
-
+        } catch (Exception e) {
+            LOGGER.warn("{} {}", e.getMessage());
+            e.printStackTrace();
         }
     }
 

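A note on the logging in the hunk above: the new warn call supplies two "{}" placeholders but only one argument, so the second placeholder comes out literally, and the stack trace goes to stderr via printStackTrace() rather than through the logger. If the intent is to keep everything in SLF4J, a sketch of an equivalent catch block (not what this commit does; the message text is illustrative) would be:

    } catch (Exception e) {
        // SLF4J prints the stack trace when the final argument is a Throwable
        LOGGER.warn("Failed to add document to bulk request: {}", e.getMessage(), e);
    }
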
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index cf4c146..659c517 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -2,6 +2,7 @@ package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Queues;

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index e8c1695..6cd1e79 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -1,13 +1,16 @@
 package org.apache.streams.hdfs;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.Calendar;
 import java.util.Random;
 
 public class WebHdfsPersistReaderTask implements Runnable {
@@ -34,9 +37,12 @@ public class WebHdfsPersistReaderTask implements Runnable {
                     do{
                         try {
                             line = bufferedReader.readLine();
-                            if( line != null ) {
+                            if( !Strings.isNullOrEmpty(line) ) {
                                 String[] fields = line.split(Character.toString(reader.DELIMITER));
-                                reader.persistQueue.offer(new StreamsDatum(fields[3]));
+                                Calendar cal = Calendar.getInstance();
+                                cal.setTimeInMillis(new Long(fields[2]));
+                                StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime()));
+                                reader.persistQueue.offer(entry);
                             }
                         } catch (Exception e) {
                             LOGGER.warn("Failed processing " + line);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index a43c4d8..b5888c3 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -40,9 +40,13 @@ public class MoreoverClient {
         logger.debug("Making call to {}", urlString);
         long start = System.nanoTime();
         MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
-        logger.debug("Maximum sequence from last call {}", result.getMaxSequencedId());
         if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
+        {
             this.lastSequenceId = result.getMaxSequencedId();
+            logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
+        }
+        else
+            logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
         return result;
     }
 
@@ -79,6 +83,10 @@ public class MoreoverClient {
         IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
         writer.flush();
         pullTime = new Date().getTime();
+
+        // added after seeing java.net.SocketException: Too many open files
+        cn.disconnect();
+
         return writer.toString();
     }
 }
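
Regarding the "Too many open files" fix above: cn.disconnect() runs only on the success path, so a failure inside IOUtils.copy still leaks the connection. A defensive sketch of the same idea (assuming cn is the HttpURLConnection and writer the buffer set up earlier in the method; the rest of the method is unchanged):

    // assuming cn (HttpURLConnection) and writer are set up as earlier in the method
    try {
        IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
        writer.flush();
    } finally {
        cn.disconnect();   // release the socket even when the copy or flush throws
    }
    pullTime = new Date().getTime();
    return writer.toString();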

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
index 50e3f66..0ef49c5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
@@ -3,6 +3,7 @@ package org.apache.streams.data.moreover;
 import com.fasterxml.aalto.stax.InputFactoryImpl;
 import com.fasterxml.aalto.stax.OutputFactoryImpl;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -112,18 +113,34 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
 
         try {
             this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
-            this.articles = resultObject.getArticles();
-            this.articleArray = articles.getArticle();
-        } catch (IOException e) {
+        } catch (JsonMappingException e) {
+            // theory is this may not be fatal
+            this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
+        } catch (Exception e) {
             e.printStackTrace();
+            logger.warn("Unable to process document:");
+            logger.warn(xmlString);
+        }
+
+        if( this.resultObject.getStatus().equals("FAILURE"))
+        {
+            logger.warn(this.resultObject.getStatus());
+            logger.warn(this.resultObject.getMessageCode());
+            logger.warn(this.resultObject.getUserMessage());
+            logger.warn(this.resultObject.getDeveloperMessage());
         }
+        else
+        {
+            this.articles = resultObject.getArticles();
+            this.articleArray = articles.getArticle();
 
-        for (Article article : articleArray) {
-            BigInteger sequenceid = new BigInteger(article.getSequenceId());
-            list.add(new StreamsDatum(article, sequenceid));
-            logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
-            if (sequenceid.compareTo(this.maxSequencedId) > 0) {
-                this.maxSequencedId = sequenceid;
+            for (Article article : articleArray) {
+                BigInteger sequenceid = new BigInteger(article.getSequenceId());
+                list.add(new StreamsDatum(article, sequenceid));
+                logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+                if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+                    this.maxSequencedId = sequenceid;
+                }
             }
         }
 
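On the "theory is this may not be fatal" recovery above: JsonMappingException.getPath() can be empty, and Reference.getFrom() is typed Object, so the unguarded cast can itself fail. A more defensive sketch of the same recovery (names follow the hunk; this is not part of the commit):

    } catch (JsonMappingException e) {
        // the partially-bound root object is usually the first path reference,
        // but guard against an empty path or an unexpected type
        Object from = e.getPath().isEmpty() ? null : e.getPath().get(0).getFrom();
        if (from instanceof ArticlesResponse) {
            this.resultObject = (ArticlesResponse) from;
        } else {
            logger.warn("Unable to recover partial result: {}", e.getMessage());
        }
    }
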

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 7e7e553..78623b0 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -136,6 +136,7 @@ public class StreamsDatum implements Serializable {
 
     @Override
     public String toString() {
-        return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
+        return this.id+"\tDocument="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
     }
+
 }