Posted to commits@streams.apache.org by sb...@apache.org on 2015/07/01 22:48:59 UTC
[3/4] incubator-streams git commit: improves fix for STREAMS-337
improves fix for STREAMS-337
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/28de9ba6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/28de9ba6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/28de9ba6
Branch: refs/heads/master
Commit: 28de9ba686052b7cacab813e3a799ebc8b455d61
Parents: c5ebb6b
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Jul 1 14:10:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Wed Jul 1 14:10:23 2015 -0500
----------------------------------------------------------------------
.../streams/hdfs/WebHdfsPersistReader.java | 87 ++++++++++++++++++++
1 file changed, 87 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/28de9ba6/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index cd13c60..2cddbc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -18,6 +18,7 @@
package org.apache.streams.hdfs;
+import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Queues;
@@ -28,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.*;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
@@ -39,7 +42,9 @@ import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
+import java.util.Map;
import java.util.Queue;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -72,6 +77,10 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
private Future<?> task;
+ public WebHdfsPersistReader() {
+ this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+ }
+
public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
}
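
The new no-argument constructor makes the reader usable without wiring a configuration object by hand: it resolves an HdfsReaderConfiguration from the "hdfs" path of the application's Typesafe config through ComponentConfigurator. A rough usage sketch, assuming such an "hdfs" block exists in the resolved config (the call sequence below follows the generic StreamsPersistReader lifecycle and is not part of this commit):

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.core.StreamsResultSet;
    import org.apache.streams.hdfs.WebHdfsPersistReader;

    public class WebHdfsReadSketch {
        public static void main(String[] args) {
            // The no-arg constructor resolves HdfsReaderConfiguration from the
            // "hdfs" path of the application config via ComponentConfigurator.
            WebHdfsPersistReader reader = new WebHdfsPersistReader();
            reader.prepare(null);
            StreamsResultSet resultSet = reader.readAll();
            for (StreamsDatum datum : resultSet.getQueue()) {
                // process each datum
            }
            reader.cleanUp();
        }
    }
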
@@ -204,6 +213,84 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
return current;
}
+ public StreamsDatum processLine(String line) {
+
+ List<String> expectedFields = hdfsConfiguration.getFields();
+ String[] parsedFields = line.split(hdfsConfiguration.getFieldDelimiter());
+
+ if( parsedFields.length == 0)
+ return null;
+
+ String id = null;
+ DateTime ts = null;
+ Map<String, Object> metadata = null;
+ String json = null;
+
+ if( expectedFields.contains( HdfsConstants.DOC )
+ && parsedFields.length > expectedFields.indexOf(HdfsConstants.DOC)) {
+ json = parsedFields[expectedFields.indexOf(HdfsConstants.DOC)];
+ }
+
+ if( expectedFields.contains( HdfsConstants.ID )
+ && parsedFields.length > expectedFields.indexOf(HdfsConstants.ID)) {
+ id = parsedFields[expectedFields.indexOf(HdfsConstants.ID)];
+ }
+ if( expectedFields.contains( HdfsConstants.TS )
+ && parsedFields.length > expectedFields.indexOf(HdfsConstants.TS)) {
+ ts = parseTs(parsedFields[expectedFields.indexOf(HdfsConstants.TS)]);
+ }
+ if( expectedFields.contains( HdfsConstants.META )
+ && parsedFields.length > expectedFields.indexOf(HdfsConstants.META)) {
+ metadata = parseMap(parsedFields[expectedFields.indexOf(HdfsConstants.META)]);
+ }
+
+ StreamsDatum datum = new StreamsDatum(json);
+ datum.setId(id);
+ datum.setTimestamp(ts);
+ datum.setMetadata(metadata);
+
+ return datum;
+
+ }
+
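
processLine turns one delimited line of a WebHDFS file back into a StreamsDatum, using the field names and delimiter from HdfsReaderConfiguration to locate the id, timestamp, metadata and document columns. A hypothetical example, assuming a tab delimiter and a field order of ID, TS, META, DOC (both are configuration assumptions, not values set by this commit):

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.hdfs.WebHdfsPersistReader;

    class ProcessLineSketch {
        // Assumes 'reader' has already been configured and prepared.
        static StreamsDatum sketch(WebHdfsPersistReader reader) {
            // Hypothetical line: id, timestamp, metadata JSON and document JSON,
            // separated by the configured field delimiter (a tab is assumed here).
            String line = "datum-1\t1435783823000\t{\"source\":\"test\"}\t{\"id\":\"datum-1\"}";
            StreamsDatum datum = reader.processLine(line);
            // datum.getId()        -> "datum-1"
            // datum.getTimestamp() -> DateTime built from the epoch-millis field
            // datum.getMetadata()  -> Map parsed from the metadata JSON
            // datum.getDocument()  -> the raw document JSON string
            return datum;
        }
    }
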
+ public DateTime parseTs(String field) {
+
+ DateTime timestamp = null;
+ try {
+ long longts = Long.parseLong(field);
+ timestamp = new DateTime(longts);
+ } catch ( Exception e ) {}
+ try {
+ timestamp = mapper.readValue(field, DateTime.class);
+ } catch ( Exception e ) {}
+
+ return timestamp;
+ }
+
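
parseTs first tries the field as epoch milliseconds and, failing that, falls back to Jackson/Joda deserialization via the StreamsJacksonMapper, returning whichever attempt produced a DateTime. A small sketch of the epoch-millis case, assuming an already prepared reader:

    import org.joda.time.DateTime;
    import org.apache.streams.hdfs.WebHdfsPersistReader;

    class ParseTsSketch {
        // Assumes 'reader' has already been configured and prepared.
        static DateTime sketch(WebHdfsPersistReader reader) {
            // Epoch-millis timestamps are handled by the first branch of parseTs;
            // anything else falls through to the Jackson/Joda fallback.
            return reader.parseTs("1435783823000");
        }
    }
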
+ public Map<String, Object> parseMap(String field) {
+
+ Map<String, Object> metadata = null;
+
+ try {
+ JsonNode jsonNode = mapper.readValue(field, JsonNode.class);
+ metadata = mapper.convertValue(jsonNode, Map.class);
+ } catch (IOException e) {
+ LOGGER.warn("failed in parseMap: " + e.getMessage());
+ }
+ return metadata;
+ }
+
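
parseMap reads a JSON metadata column into a JsonNode and converts it to a Map<String, Object>, logging a warning and returning null if the field is not valid JSON. A small sketch with an illustrative metadata payload, again assuming an already prepared reader:

    import java.util.Map;
    import org.apache.streams.hdfs.WebHdfsPersistReader;

    class ParseMapSketch {
        // Assumes 'reader' has already been configured and prepared.
        static Map<String, Object> sketch(WebHdfsPersistReader reader) {
            // A JSON metadata column becomes a Map<String, Object> via an intermediate JsonNode.
            Map<String, Object> metadata = reader.parseMap("{\"source\":\"twitter\",\"retweets\":2}");
            // metadata.get("source")   -> "twitter"
            // metadata.get("retweets") -> 2
            return metadata;
        }
    }
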
+ protected void write( StreamsDatum entry ) {
+ boolean success;
+ do {
+ synchronized( WebHdfsPersistReader.class ) {
+ success = persistQueue.offer(entry);
+ }
+ Thread.yield();
+ }
+ while( !success );
+ }
+
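
write keeps retrying persistQueue.offer with a Thread.yield between attempts, so a bounded queue can push back on the file-reading task instead of dropping datums. The intent is close to a blocking put; a minimal analogy on a standard BlockingQueue (not code from this commit, capacity chosen only for illustration):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import org.apache.streams.core.StreamsDatum;

    class BackPressureSketch {
        private final BlockingQueue<StreamsDatum> queue = new LinkedBlockingQueue<>(10000);

        // Same intent as write(): never drop the datum, wait until the consumer makes room.
        void handoff(StreamsDatum datum) throws InterruptedException {
            queue.put(datum);
        }
    }
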
@Override
public StreamsResultSet readNew(BigInteger sequence) {
return null;