You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this plain text copy.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/30 05:05:24 UTC

[1/4] git commit: Updated formatting

Repository: incubator-streams
Updated Branches:
  refs/heads/STREAMS-104 [created] d4d95ccf0


Updated formatting


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e67d9adb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e67d9adb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e67d9adb

Branch: refs/heads/STREAMS-104
Commit: e67d9adb050bd7fe1c28433c856797aa814ba5f2
Parents: f4749b9
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:01:22 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:01:22 2014 -0400

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistWriter.java      | 78 +++++++++-----------
 1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e67d9adb/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 12f9a5e..4c083fc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -25,14 +25,13 @@ import java.util.Date;
 import java.util.List;
 import java.util.Queue;
 
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable
-{
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
     public final static String STREAMS_ID = "WebHdfsPersistWriter";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
 
     private final static char DELIMITER = '\t';
-    private final static int  DEFAULT_LINES_PER_FILE = 50000;
+    private final static int DEFAULT_LINES_PER_FILE = 50000;
 
     private FileSystem client;
     private Path path;
@@ -43,10 +42,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     private int fileLineCounter = 0;
     private OutputStreamWriter currentWriter = null;
 
-    private static final int  BYTES_IN_MB = 1024*1024;
-    private static final int  BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
-    private volatile int  totalByteCount = 0;
-    private volatile int  byteCount = 0;
+    private static final int BYTES_IN_MB = 1024 * 1024;
+    private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
+    private volatile int totalByteCount = 0;
+    private volatile int byteCount = 0;
 
     public boolean terminate = false;
 
@@ -60,21 +59,23 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         this.hdfsConfiguration = hdfsConfiguration;
     }
 
-    public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
-    public boolean isConnected() 		                { return (client != null); }
+    public URI getURI() throws URISyntaxException {
+        return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+    }
+
+    public boolean isConnected() {
+        return (client != null);
+    }
 
-    public final synchronized FileSystem getFileSystem()
-    {
+    public final synchronized FileSystem getFileSystem() {
         // Check to see if we are connected.
-        if(!isConnected())
+        if (!isConnected())
             connectToWebHDFS();
         return this.client;
     }
 
-    private synchronized void connectToWebHDFS()
-    {
-        try
-        {
+    private synchronized void connectToWebHDFS() {
+        try {
             LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
             UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
             ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
@@ -108,19 +109,16 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                     return null;
                 }
             });
-        }
-        catch (Exception e)
-        {
+        } catch (Exception e) {
             LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
             e.printStackTrace();
         }
     }
-    
+
     @Override
     public void write(StreamsDatum streamsDatum) {
 
-        synchronized (this)
-        {
+        synchronized (this) {
             // Check to see if we need to reset the file that we are currently working with
             if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
                 try {
@@ -141,7 +139,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             totalByteCount += bytesInLine;
             byteCount += bytesInLine;
 
-            if(byteCount > BYTES_BEFORE_FLUSH)
+            if (byteCount > BYTES_BEFORE_FLUSH)
                 try {
                     flush();
                 } catch (IOException e) {
@@ -152,24 +150,20 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         }
     }
 
-    public void flush() throws IOException
-    {
-        if(this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH)
-        {
+    public void flush() throws IOException {
+        if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
             this.currentWriter.flush();
             byteCount = 0;
         }
     }
 
-    private synchronized void resetFile() throws Exception
-    {
+    private synchronized void resetFile() throws Exception {
         // this will keep it thread safe, so we don't create too many files
-        if(this.fileLineCounter == 0 && this.currentWriter != null)
+        if (this.fileLineCounter == 0 && this.currentWriter != null)
             return;
 
         // if there is a current writer, we must close it first.
-        if (this.currentWriter != null)
-        {
+        if (this.currentWriter != null) {
             flush();
             close();
         }
@@ -179,10 +173,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         // Create the path for where the file is going to live.
         Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
 
-        try
-        {
+        try {
             // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
-            if(client.exists(filePath))
+            if (client.exists(filePath))
                 throw new RuntimeException("Unable to create file: " + filePath);
 
             this.currentWriter = new OutputStreamWriter(client.create(filePath));
@@ -191,19 +184,15 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             writtenFiles.add(filePath);
 
             LOGGER.info("File Created: {}", filePath);
-        }
-        catch (Exception e)
-        {
+        } catch (Exception e) {
             LOGGER.error("COULD NOT CreateFile: {}", filePath);
             LOGGER.error(e.getMessage());
             throw e;
         }
     }
 
-    public synchronized void close() throws IOException
-    {
-        if(this.currentWriter != null)
-        {
+    public synchronized void close() throws IOException {
+        if (this.currentWriter != null) {
             this.currentWriter.flush();
             this.currentWriter.close();
             this.currentWriter = null;
@@ -211,8 +200,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         }
     }
 
-    private String convertResultToString(StreamsDatum entry)
-    {
+    private String convertResultToString(StreamsDatum entry) {
         String metadata = null;
         try {
             metadata = mapper.writeValueAsString(entry.getMetadata());
@@ -227,7 +215,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
             e.printStackTrace();
         }
 
-        if(Strings.isNullOrEmpty(documentJson))
+        if (Strings.isNullOrEmpty(documentJson))
             return null;
         else
             return new StringBuilder()


[2/4] git commit: Replaced e.printStackTrace() with log statements

Posted by mf...@apache.org.
Replaced e.printStackTrace() with log statements


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cb0c8797
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cb0c8797
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cb0c8797

Branch: refs/heads/STREAMS-104
Commit: cb0c879779ebf6140b81780ca9ef298298e9144e
Parents: e67d9ad
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:09:04 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:09:04 2014 -0400

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistWriter.java       | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb0c8797/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 4c083fc..8fae4a6 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -110,8 +110,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 }
             });
         } catch (Exception e) {
-            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
-            e.printStackTrace();
+            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again",e);
         }
     }
 
@@ -124,14 +123,16 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 try {
                     resetFile();
                 } catch (Exception e) {
-                    e.printStackTrace();
+                    LOGGER.error("Unable to reset file. Shutting down and propagating error", e);
+                    this.cleanUp();
+                    throw new RuntimeException(e);
                 }
 
             String line = convertResultToString(streamsDatum);
             try {
                 this.currentWriter.write(line);
             } catch (IOException e) {
-                e.printStackTrace();
+                LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", e);
             }
             int bytesInLine = line.getBytes().length;
 
@@ -143,7 +144,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 try {
                     flush();
                 } catch (IOException e) {
-                    e.printStackTrace();
+                    LOGGER.error("Error flushing to HDFS", e);
                 }
 
             this.fileLineCounter++;
@@ -205,14 +206,14 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         try {
             metadata = mapper.writeValueAsString(entry.getMetadata());
         } catch (JsonProcessingException e) {
-            e.printStackTrace();
+            LOGGER.warn("Error converting metadata to a string", e);
         }
 
         String documentJson = null;
         try {
             documentJson = mapper.writeValueAsString(entry.getDocument());
         } catch (JsonProcessingException e) {
-            e.printStackTrace();
+            LOGGER.warn("Error converting document to string", e);
         }
 
         if (Strings.isNullOrEmpty(documentJson))
@@ -241,12 +242,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         try {
             flush();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error flushing on cleanup", e);
         }
         try {
             close();
         } catch (IOException e) {
-            e.printStackTrace();
+            LOGGER.error("Error closing on cleanup", e);
         }
     }
 


[4/4] git commit: adjusted error message

Posted by mf...@apache.org.
adjusted error message


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4d95ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4d95ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4d95ccf

Branch: refs/heads/STREAMS-104
Commit: d4d95ccf0b68f010ecfa4d5c978317239a1a1408
Parents: 5c15bd8
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 23:05:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 23:05:00 2014 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4d95ccf/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index d0164f5..162461b 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -157,7 +157,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 try {
                     this.currentWriter.write(line);
                 } catch (IOException ex) {
-                    LOGGER.error("Failed to write to HDFS after reconnecting client", ex);
+                    LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", ex);
                     throw new RuntimeException(e);
                 }
             }


[3/4] git commit: Added error handler with retry

Posted by mf...@apache.org.
Added error handler with retry


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5c15bd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5c15bd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5c15bd8d

Branch: refs/heads/STREAMS-104
Commit: 5c15bd8d738844ef0df30ebe89f075ddd9fb38e1
Parents: cb0c879
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:40:47 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:40:47 2014 -0400

----------------------------------------------------------------------
 .../streams/hdfs/WebHdfsPersistWriter.java      | 63 ++++++++++++--------
 1 file changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5c15bd8d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 8fae4a6..d0164f5 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -110,7 +110,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 }
             });
         } catch (Exception e) {
-            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again",e);
+            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
+            throw new RuntimeException(e);
         }
     }
 
@@ -120,20 +121,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         synchronized (this) {
             // Check to see if we need to reset the file that we are currently working with
             if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
-                try {
-                    resetFile();
-                } catch (Exception e) {
-                    LOGGER.error("Unable to reset file. Shutting down and propagating error", e);
-                    this.cleanUp();
-                    throw new RuntimeException(e);
-                }
+                resetFile();
 
             String line = convertResultToString(streamsDatum);
-            try {
-                this.currentWriter.write(line);
-            } catch (IOException e) {
-                LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", e);
-            }
+            writeInternal(line);
             int bytesInLine = line.getBytes().length;
 
             totalRecordsWritten++;
@@ -144,13 +135,36 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
                 try {
                     flush();
                 } catch (IOException e) {
-                    LOGGER.error("Error flushing to HDFS", e);
+                    LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution.  WARNING: There could be data loss.", e);
                 }
 
             this.fileLineCounter++;
         }
     }
 
+    private void writeInternal(String line) {
+        try {
+            this.currentWriter.write(line);
+        } catch (IOException e) {
+            LOGGER.warn("Error writing to HDFS.  Attempting to try a new file", e);
+            try{
+                resetFile();
+                this.currentWriter.write(line);
+            } catch (IOException io) {
+                LOGGER.warn("Failed to write even after creating a new file.  Attempting to reconnect", io);
+                connectToWebHDFS();
+                resetFile();
+                try {
+                    this.currentWriter.write(line);
+                } catch (IOException ex) {
+                    LOGGER.error("Failed to write to HDFS after reconnecting client", ex);
+                    throw new RuntimeException(e);
+                }
+            }
+
+        }
+    }
+
     public void flush() throws IOException {
         if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
             this.currentWriter.flush();
@@ -158,23 +172,24 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         }
     }
 
-    private synchronized void resetFile() throws Exception {
+    private synchronized void resetFile() {
         // this will keep it thread safe, so we don't create too many files
         if (this.fileLineCounter == 0 && this.currentWriter != null)
             return;
 
-        // if there is a current writer, we must close it first.
-        if (this.currentWriter != null) {
-            flush();
-            close();
-        }
-
-        this.fileLineCounter = 0;
-
         // Create the path for where the file is going to live.
         Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
 
         try {
+
+            // if there is a current writer, we must close it first.
+            if (this.currentWriter != null) {
+                flush();
+                close();
+            }
+
+            this.fileLineCounter = 0;
+
             // Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
             if (client.exists(filePath))
                 throw new RuntimeException("Unable to create file: " + filePath);
@@ -188,7 +203,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
         } catch (Exception e) {
             LOGGER.error("COULD NOT CreateFile: {}", filePath);
             LOGGER.error(e.getMessage());
-            throw e;
+            throw new RuntimeException(e);
         }
     }