You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/07/01 22:48:57 UTC

[1/4] incubator-streams git commit: resolves STREAMS-337 #337

Repository: incubator-streams
Updated Branches:
  refs/heads/master d982a877b -> 200e761e4


resolves STREAMS-337 #337


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/abcd9737
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/abcd9737
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/abcd9737

Branch: refs/heads/master
Commit: abcd97374342b8c7bfc111ce4b293f8d1b2f2e41
Parents: 51b9622
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Tue Jun 9 14:54:58 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Tue Jun 9 14:54:58 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java   | 2 +-
 .../jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json    | 4 ++++
 2 files changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/abcd9737/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index 4d1c43d..a75e680 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -55,7 +55,7 @@ public class WebHdfsPersistReaderTask implements Runnable {
             if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
                 LOGGER.info("Started Processing " + fileStatus.getPath().getName());
                 try {
-                    bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath())));
+                    bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath()), reader.hdfsConfiguration.getEncoding()));
                 } catch (Exception e) {
                     e.printStackTrace();
                     LOGGER.error(e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/abcd9737/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index e6e1e4c..61245c4 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -53,6 +53,10 @@
         "line_delimiter": {
           "type": "string",
           "default": "\n"
+        },
+        "encoding": {
+          "type": "string",
+          "default": "UTF-8"
         }
     }
 }
\ No newline at end of file


[2/4] incubator-streams git commit: improves fix for STREAMS-337

Posted by sb...@apache.org.
improves fix for STREAMS-337


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c5ebb6b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c5ebb6b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c5ebb6b5

Branch: refs/heads/master
Commit: c5ebb6b5d5f89b60fa8c291f179375f76323b17d
Parents: abcd973
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Jun 12 11:13:53 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Jun 12 12:16:26 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReader.java     | 6 ++++--
 .../java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c5ebb6b5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 08f78cf..cd13c60 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -158,7 +158,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
                 LOGGER.error("Neither file nor directory, wtf");
             }
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("IOException", e);
         }
         persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
         //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
@@ -216,7 +216,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     @Override
     public boolean isRunning() {
-        return !task.isDone() && !task.isCancelled();
+        if( task != null)
+            return !task.isDone() && !task.isCancelled();
+        else return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c5ebb6b5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index a75e680..65bacda 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -53,12 +53,11 @@ public class WebHdfsPersistReaderTask implements Runnable {
             BufferedReader bufferedReader;
             LOGGER.info("Found " + fileStatus.getPath().getName());
             if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
-                LOGGER.info("Started Processing " + fileStatus.getPath().getName());
+                LOGGER.info("Started Processing " + fileStatus.getPath().getName() + " expecting " + reader.hdfsConfiguration.getEncoding());
                 try {
                     bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath()), reader.hdfsConfiguration.getEncoding()));
                 } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.error(e.getMessage());
+                    LOGGER.error("Exception Opening " + fileStatus.getPath(), e.getMessage());
                     return;
                 }
 


[4/4] incubator-streams git commit: Merge branch 'STREAMS-337'

Posted by sb...@apache.org.
Merge branch 'STREAMS-337'

* STREAMS-337:
  improves fix for STREAMS-337
  improves fix for STREAMS-337
  resolves STREAMS-337 #337

Conflicts:
	streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
	streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/200e761e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/200e761e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/200e761e

Branch: refs/heads/master
Commit: 200e761e4e2c01a29dcaede842d2648263476e02
Parents: d982a87 28de9ba
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Jul 1 15:48:35 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Wed Jul 1 15:48:35 2015 -0500

----------------------------------------------------------------------
 .../org/apache/streams/hdfs/WebHdfsPersistReader.java    | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[3/4] incubator-streams git commit: improves fix for STREAMS-337

Posted by sb...@apache.org.
improves fix for STREAMS-337


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/28de9ba6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/28de9ba6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/28de9ba6

Branch: refs/heads/master
Commit: 28de9ba686052b7cacab813e3a799ebc8b455d61
Parents: c5ebb6b
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Wed Jul 1 14:10:23 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Wed Jul 1 14:10:23 2015 -0500

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistReader.java      | 87 ++++++++++++++++++++
 1 file changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/28de9ba6/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index cd13c60..2cddbc3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.streams.hdfs;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
@@ -28,6 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.*;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.joda.time.DateTime;
@@ -39,7 +42,9 @@ import java.math.BigInteger;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Map;
 import java.util.Queue;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -72,6 +77,10 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
     protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
     private Future<?> task;
 
+    public WebHdfsPersistReader() {
+        this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+    }
+
     public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
         this.hdfsConfiguration = hdfsConfiguration;
     }
@@ -204,6 +213,84 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
         return current;
     }
 
+    public StreamsDatum processLine(String line) {
+
+        List<String> expectedFields = hdfsConfiguration.getFields();
+        String[] parsedFields = line.split(hdfsConfiguration.getFieldDelimiter());
+
+        if( parsedFields.length == 0)
+            return null;
+
+        String id = null;
+        DateTime ts = null;
+        Map<String, Object> metadata = null;
+        String json = null;
+
+        if( expectedFields.contains( HdfsConstants.DOC )
+                && parsedFields.length > expectedFields.indexOf(HdfsConstants.DOC)) {
+            json = parsedFields[expectedFields.indexOf(HdfsConstants.DOC)];
+        }
+
+        if( expectedFields.contains( HdfsConstants.ID )
+                && parsedFields.length > expectedFields.indexOf(HdfsConstants.ID)) {
+            id = parsedFields[expectedFields.indexOf(HdfsConstants.ID)];
+        }
+        if( expectedFields.contains( HdfsConstants.TS )
+                && parsedFields.length > expectedFields.indexOf(HdfsConstants.TS)) {
+            ts = parseTs(parsedFields[expectedFields.indexOf(HdfsConstants.TS)]);
+        }
+        if( expectedFields.contains( HdfsConstants.META )
+                && parsedFields.length > expectedFields.indexOf(HdfsConstants.META)) {
+            metadata = parseMap(parsedFields[expectedFields.indexOf(HdfsConstants.META)]);
+        }
+
+        StreamsDatum datum = new StreamsDatum(json);
+        datum.setId(id);
+        datum.setTimestamp(ts);
+        datum.setMetadata(metadata);
+
+        return datum;
+
+    }
+
+    public DateTime parseTs(String field) {
+
+        DateTime timestamp = null;
+        try {
+            long longts = Long.parseLong(field);
+            timestamp = new DateTime(longts);
+        } catch ( Exception e ) {}
+        try {
+            timestamp = mapper.readValue(field, DateTime.class);
+        } catch ( Exception e ) {}
+
+        return timestamp;
+    }
+
+    public Map<String, Object> parseMap(String field) {
+
+        Map<String, Object> metadata = null;
+
+        try {
+            JsonNode jsonNode = mapper.readValue(field, JsonNode.class);
+            metadata = mapper.convertValue(jsonNode, Map.class);
+        } catch (IOException e) {
+            LOGGER.warn("failed in parseMap: " + e.getMessage());
+        }
+        return metadata;
+    }
+
+    protected void write( StreamsDatum entry ) {
+        boolean success;
+        do {
+            synchronized( WebHdfsPersistReader.class ) {
+                success = persistQueue.offer(entry);
+            }
+            Thread.yield();
+        }
+        while( !success );
+    }
+
     @Override
     public StreamsResultSet readNew(BigInteger sequence) {
         return null;