You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/03/17 20:52:05 UTC
git commit: found and fixed failure condition in MoreoverClient;
made HDFS id (column 1) the default ES id
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-26 d764d7c96 -> 34990168b
Found and fixed a failure condition in MoreoverClient.
Made the HDFS id (column 1) the default Elasticsearch (ES) document id.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/34990168
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/34990168
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/34990168
Branch: refs/heads/STREAMS-26
Commit: 34990168bf9d01540d9100162ef5d04a48d35a32
Parents: d764d7c
Author: sblackmon <sb...@w2odigital.com>
Authored: Mon Mar 17 14:51:40 2014 -0500
Committer: sblackmon <sb...@w2odigital.com>
Committed: Mon Mar 17 14:51:40 2014 -0500
----------------------------------------------------------------------
.../ElasticsearchPersistWriter.java | 6 ++--
.../streams/hdfs/WebHdfsPersistReader.java | 1 +
.../streams/hdfs/WebHdfsPersistReaderTask.java | 10 ++++--
.../streams/data/moreover/MoreoverClient.java | 10 +++++-
.../streams/data/moreover/MoreoverResult.java | 35 +++++++++++++++-----
.../org/apache/streams/core/StreamsDatum.java | 3 +-
6 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index c4fd9f0..595011b 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -133,9 +133,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Flushab
add(config.getIndex(), config.getType(), id, json);
- } catch (JsonProcessingException e) {
- LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
-
+ } catch (Exception e) {
+ LOGGER.warn("{} {}", e.getMessage());
+ e.printStackTrace();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index cf4c146..659c517 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -2,6 +2,7 @@ package org.apache.streams.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index e8c1695..6cd1e79 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -1,13 +1,16 @@
package org.apache.streams.hdfs;
+import com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.StreamsDatum;
+import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.Calendar;
import java.util.Random;
public class WebHdfsPersistReaderTask implements Runnable {
@@ -34,9 +37,12 @@ public class WebHdfsPersistReaderTask implements Runnable {
do{
try {
line = bufferedReader.readLine();
- if( line != null ) {
+ if( !Strings.isNullOrEmpty(line) ) {
String[] fields = line.split(Character.toString(reader.DELIMITER));
- reader.persistQueue.offer(new StreamsDatum(fields[3]));
+ Calendar cal = Calendar.getInstance();
+ cal.setTimeInMillis(new Long(fields[2]));
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(cal.getTime()));
+ reader.persistQueue.offer(entry);
}
} catch (Exception e) {
LOGGER.warn("Failed processing " + line);
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
index a43c4d8..b5888c3 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverClient.java
@@ -40,9 +40,13 @@ public class MoreoverClient {
logger.debug("Making call to {}", urlString);
long start = System.nanoTime();
MoreoverResult result = new MoreoverResult(id, getArticles(new URL(urlString)), start, System.nanoTime());
- logger.debug("Maximum sequence from last call {}", result.getMaxSequencedId());
if(!result.getMaxSequencedId().equals(BigInteger.ZERO))
+ {
this.lastSequenceId = result.getMaxSequencedId();
+ logger.debug("Maximum sequence from last call {}", this.lastSequenceId);
+ }
+ else
+ logger.debug("No maximum sequence returned in last call {}", this.lastSequenceId);
return result;
}
@@ -79,6 +83,10 @@ public class MoreoverClient {
IOUtils.copy(new InputStreamReader(cn.getInputStream(), Charset.forName("UTF-8")), writer);
writer.flush();
pullTime = new Date().getTime();
+
+ // added after seeing java.net.SocketException: Too many open files
+ cn.disconnect();
+
return writer.toString();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
index 50e3f66..0ef49c5 100644
--- a/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
+++ b/streams-contrib/streams-provider-moreover/src/main/java/org/apache/streams/data/moreover/MoreoverResult.java
@@ -3,6 +3,7 @@ package org.apache.streams.data.moreover;
import com.fasterxml.aalto.stax.InputFactoryImpl;
import com.fasterxml.aalto.stax.OutputFactoryImpl;
import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -112,18 +113,34 @@ public class MoreoverResult implements Iterable<StreamsDatum> {
try {
this.resultObject = xmlMapper.readValue(xmlString, ArticlesResponse.class);
- this.articles = resultObject.getArticles();
- this.articleArray = articles.getArticle();
- } catch (IOException e) {
+ } catch (JsonMappingException e) {
+ // theory is this may not be fatal
+ this.resultObject = (ArticlesResponse) e.getPath().get(0).getFrom();
+ } catch (Exception e) {
e.printStackTrace();
+ logger.warn("Unable to process document:");
+ logger.warn(xmlString);
+ }
+
+ if( this.resultObject.getStatus().equals("FAILURE"))
+ {
+ logger.warn(this.resultObject.getStatus());
+ logger.warn(this.resultObject.getMessageCode());
+ logger.warn(this.resultObject.getUserMessage());
+ logger.warn(this.resultObject.getDeveloperMessage());
}
+ else
+ {
+ this.articles = resultObject.getArticles();
+ this.articleArray = articles.getArticle();
- for (Article article : articleArray) {
- BigInteger sequenceid = new BigInteger(article.getSequenceId());
- list.add(new StreamsDatum(article, sequenceid));
- logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
- if (sequenceid.compareTo(this.maxSequencedId) > 0) {
- this.maxSequencedId = sequenceid;
+ for (Article article : articleArray) {
+ BigInteger sequenceid = new BigInteger(article.getSequenceId());
+ list.add(new StreamsDatum(article, sequenceid));
+ logger.trace("Prior max sequence Id {} current candidate {}", this.maxSequencedId, sequenceid);
+ if (sequenceid.compareTo(this.maxSequencedId) > 0) {
+ this.maxSequencedId = sequenceid;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/34990168/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
----------------------------------------------------------------------
diff --git a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
index 7e7e553..78623b0 100644
--- a/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
+++ b/streams-core/src/main/java/org/apache/streams/core/StreamsDatum.java
@@ -136,6 +136,7 @@ public class StreamsDatum implements Serializable {
@Override
public String toString() {
- return "Document="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
+ return this.id+"\tDocument="+this.document+"\ttimestamp="+this.timestamp+"\tsequence="+this.sequenceid;
}
+
}