You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2015/07/01 22:48:58 UTC

[2/4] incubator-streams git commit: improves fix for STREAMS-337

improves fix for STREAMS-337


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/c5ebb6b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/c5ebb6b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/c5ebb6b5

Branch: refs/heads/master
Commit: c5ebb6b5d5f89b60fa8c291f179375f76323b17d
Parents: abcd973
Author: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Authored: Fri Jun 12 11:13:53 2015 -0500
Committer: Steve Blackmon (@steveblackmon) <sb...@apache.org>
Committed: Fri Jun 12 12:16:26 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/streams/hdfs/WebHdfsPersistReader.java     | 6 ++++--
 .../java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java | 5 ++---
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c5ebb6b5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 08f78cf..cd13c60 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -158,7 +158,7 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
                 LOGGER.error("Neither file nor directory, wtf");
             }
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("IOException", e);
         }
         persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000));
         //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
@@ -216,7 +216,9 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
 
     @Override
     public boolean isRunning() {
-        return !task.isDone() && !task.isCancelled();
+        if( task != null)
+            return !task.isDone() && !task.isCancelled();
+        else return true;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/c5ebb6b5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index a75e680..65bacda 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -53,12 +53,11 @@ public class WebHdfsPersistReaderTask implements Runnable {
             BufferedReader bufferedReader;
             LOGGER.info("Found " + fileStatus.getPath().getName());
             if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
-                LOGGER.info("Started Processing " + fileStatus.getPath().getName());
+                LOGGER.info("Started Processing " + fileStatus.getPath().getName() + " expecting " + reader.hdfsConfiguration.getEncoding());
                 try {
                     bufferedReader = new BufferedReader(new InputStreamReader(reader.client.open(fileStatus.getPath()), reader.hdfsConfiguration.getEncoding()));
                 } catch (Exception e) {
-                    e.printStackTrace();
-                    LOGGER.error(e.getMessage());
+                    LOGGER.error("Exception Opening " + fileStatus.getPath(), e.getMessage());
                     return;
                 }