You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on "here" in the original page) was not preserved in this text.
Posted to commits@streams.apache.org by mf...@apache.org on 2014/05/30 05:05:24 UTC
[1/4] git commit: Updated formatting
Repository: incubator-streams
Updated Branches:
refs/heads/STREAMS-104 [created] d4d95ccf0
Updated formatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e67d9adb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e67d9adb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e67d9adb
Branch: refs/heads/STREAMS-104
Commit: e67d9adb050bd7fe1c28433c856797aa814ba5f2
Parents: f4749b9
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:01:22 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:01:22 2014 -0400
----------------------------------------------------------------------
.../streams/hdfs/WebHdfsPersistWriter.java | 78 +++++++++-----------
1 file changed, 33 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e67d9adb/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 12f9a5e..4c083fc 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -25,14 +25,13 @@ import java.util.Date;
import java.util.List;
import java.util.Queue;
-public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable
-{
+public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Closeable, DatumStatusCountable {
public final static String STREAMS_ID = "WebHdfsPersistWriter";
private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistWriter.class);
private final static char DELIMITER = '\t';
- private final static int DEFAULT_LINES_PER_FILE = 50000;
+ private final static int DEFAULT_LINES_PER_FILE = 50000;
private FileSystem client;
private Path path;
@@ -43,10 +42,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
private int fileLineCounter = 0;
private OutputStreamWriter currentWriter = null;
- private static final int BYTES_IN_MB = 1024*1024;
- private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
- private volatile int totalByteCount = 0;
- private volatile int byteCount = 0;
+ private static final int BYTES_IN_MB = 1024 * 1024;
+ private static final int BYTES_BEFORE_FLUSH = 64 * BYTES_IN_MB;
+ private volatile int totalByteCount = 0;
+ private volatile int byteCount = 0;
public boolean terminate = false;
@@ -60,21 +59,23 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
this.hdfsConfiguration = hdfsConfiguration;
}
- public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
- public boolean isConnected() { return (client != null); }
+ public URI getURI() throws URISyntaxException {
+ return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+ }
+
+ public boolean isConnected() {
+ return (client != null);
+ }
- public final synchronized FileSystem getFileSystem()
- {
+ public final synchronized FileSystem getFileSystem() {
// Check to see if we are connected.
- if(!isConnected())
+ if (!isConnected())
connectToWebHDFS();
return this.client;
}
- private synchronized void connectToWebHDFS()
- {
- try
- {
+ private synchronized void connectToWebHDFS() {
+ try {
LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
@@ -108,19 +109,16 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
return null;
}
});
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
e.printStackTrace();
}
}
-
+
@Override
public void write(StreamsDatum streamsDatum) {
- synchronized (this)
- {
+ synchronized (this) {
// Check to see if we need to reset the file that we are currently working with
if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
try {
@@ -141,7 +139,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
totalByteCount += bytesInLine;
byteCount += bytesInLine;
- if(byteCount > BYTES_BEFORE_FLUSH)
+ if (byteCount > BYTES_BEFORE_FLUSH)
try {
flush();
} catch (IOException e) {
@@ -152,24 +150,20 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
}
- public void flush() throws IOException
- {
- if(this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH)
- {
+ public void flush() throws IOException {
+ if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
this.currentWriter.flush();
byteCount = 0;
}
}
- private synchronized void resetFile() throws Exception
- {
+ private synchronized void resetFile() throws Exception {
// this will keep it thread safe, so we don't create too many files
- if(this.fileLineCounter == 0 && this.currentWriter != null)
+ if (this.fileLineCounter == 0 && this.currentWriter != null)
return;
// if there is a current writer, we must close it first.
- if (this.currentWriter != null)
- {
+ if (this.currentWriter != null) {
flush();
close();
}
@@ -179,10 +173,9 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
// Create the path for where the file is going to live.
Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
- try
- {
+ try {
// Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
- if(client.exists(filePath))
+ if (client.exists(filePath))
throw new RuntimeException("Unable to create file: " + filePath);
this.currentWriter = new OutputStreamWriter(client.create(filePath));
@@ -191,19 +184,15 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
writtenFiles.add(filePath);
LOGGER.info("File Created: {}", filePath);
- }
- catch (Exception e)
- {
+ } catch (Exception e) {
LOGGER.error("COULD NOT CreateFile: {}", filePath);
LOGGER.error(e.getMessage());
throw e;
}
}
- public synchronized void close() throws IOException
- {
- if(this.currentWriter != null)
- {
+ public synchronized void close() throws IOException {
+ if (this.currentWriter != null) {
this.currentWriter.flush();
this.currentWriter.close();
this.currentWriter = null;
@@ -211,8 +200,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
}
- private String convertResultToString(StreamsDatum entry)
- {
+ private String convertResultToString(StreamsDatum entry) {
String metadata = null;
try {
metadata = mapper.writeValueAsString(entry.getMetadata());
@@ -227,7 +215,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
e.printStackTrace();
}
- if(Strings.isNullOrEmpty(documentJson))
+ if (Strings.isNullOrEmpty(documentJson))
return null;
else
return new StringBuilder()
[2/4] git commit: Replaced e.printStackTrace() with log statements
Posted by mf...@apache.org.
Replaced e.printStackTrace() with log statements
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/cb0c8797
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/cb0c8797
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/cb0c8797
Branch: refs/heads/STREAMS-104
Commit: cb0c879779ebf6140b81780ca9ef298298e9144e
Parents: e67d9ad
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:09:04 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:09:04 2014 -0400
----------------------------------------------------------------------
.../streams/hdfs/WebHdfsPersistWriter.java | 19 ++++++++++---------
1 file changed, 10 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/cb0c8797/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 4c083fc..8fae4a6 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -110,8 +110,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
});
} catch (Exception e) {
- LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
- e.printStackTrace();
+ LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again",e);
}
}
@@ -124,14 +123,16 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
resetFile();
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("Unable to reset file. Shutting down and propagating error", e);
+ this.cleanUp();
+ throw new RuntimeException(e);
}
String line = convertResultToString(streamsDatum);
try {
this.currentWriter.write(line);
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.warn("Error writing to HDFS. Attempting to try a new file", e);
}
int bytesInLine = line.getBytes().length;
@@ -143,7 +144,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
flush();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error flushing to HDFS", e);
}
this.fileLineCounter++;
@@ -205,14 +206,14 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
metadata = mapper.writeValueAsString(entry.getMetadata());
} catch (JsonProcessingException e) {
- e.printStackTrace();
+ LOGGER.warn("Error converting metadata to a string", e);
}
String documentJson = null;
try {
documentJson = mapper.writeValueAsString(entry.getDocument());
} catch (JsonProcessingException e) {
- e.printStackTrace();
+ LOGGER.warn("Error converting document to string", e);
}
if (Strings.isNullOrEmpty(documentJson))
@@ -241,12 +242,12 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
flush();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error flushing on cleanup", e);
}
try {
close();
} catch (IOException e) {
- e.printStackTrace();
+ LOGGER.error("Error closing on cleanup", e);
}
}
[4/4] git commit: adjusted error message
Posted by mf...@apache.org.
adjusted error message
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/d4d95ccf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/d4d95ccf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/d4d95ccf
Branch: refs/heads/STREAMS-104
Commit: d4d95ccf0b68f010ecfa4d5c978317239a1a1408
Parents: 5c15bd8
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 23:05:00 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 23:05:00 2014 -0400
----------------------------------------------------------------------
.../main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/d4d95ccf/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index d0164f5..162461b 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -157,7 +157,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
this.currentWriter.write(line);
} catch (IOException ex) {
- LOGGER.error("Failed to write to HDFS after reconnecting client", ex);
+ LOGGER.error("Failed to write to HDFS after reconnecting client. Terminating writer.", ex);
throw new RuntimeException(e);
}
}
[3/4] git commit: Added error handler with retry
Posted by mf...@apache.org.
Added error handler with retry
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/5c15bd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/5c15bd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/5c15bd8d
Branch: refs/heads/STREAMS-104
Commit: 5c15bd8d738844ef0df30ebe89f075ddd9fb38e1
Parents: cb0c879
Author: mfranklin <mf...@apache.org>
Authored: Thu May 29 21:40:47 2014 -0400
Committer: mfranklin <mf...@apache.org>
Committed: Thu May 29 21:40:47 2014 -0400
----------------------------------------------------------------------
.../streams/hdfs/WebHdfsPersistWriter.java | 63 ++++++++++++--------
1 file changed, 39 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5c15bd8d/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 8fae4a6..d0164f5 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -110,7 +110,8 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
});
} catch (Exception e) {
- LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again",e);
+ LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again", e);
+ throw new RuntimeException(e);
}
}
@@ -120,20 +121,10 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
synchronized (this) {
// Check to see if we need to reset the file that we are currently working with
if (this.currentWriter == null || (this.fileLineCounter > this.linesPerFile))
- try {
- resetFile();
- } catch (Exception e) {
- LOGGER.error("Unable to reset file. Shutting down and propagating error", e);
- this.cleanUp();
- throw new RuntimeException(e);
- }
+ resetFile();
String line = convertResultToString(streamsDatum);
- try {
- this.currentWriter.write(line);
- } catch (IOException e) {
- LOGGER.warn("Error writing to HDFS. Attempting to try a new file", e);
- }
+ writeInternal(line);
int bytesInLine = line.getBytes().length;
totalRecordsWritten++;
@@ -144,13 +135,36 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
try {
flush();
} catch (IOException e) {
- LOGGER.error("Error flushing to HDFS", e);
+ LOGGER.warn("Error flushing to HDFS. Creating a new file and continuing execution. WARNING: There could be data loss.", e);
}
this.fileLineCounter++;
}
}
+ private void writeInternal(String line) {
+ try {
+ this.currentWriter.write(line);
+ } catch (IOException e) {
+ LOGGER.warn("Error writing to HDFS. Attempting to try a new file", e);
+ try{
+ resetFile();
+ this.currentWriter.write(line);
+ } catch (IOException io) {
+ LOGGER.warn("Failed to write even after creating a new file. Attempting to reconnect", io);
+ connectToWebHDFS();
+ resetFile();
+ try {
+ this.currentWriter.write(line);
+ } catch (IOException ex) {
+ LOGGER.error("Failed to write to HDFS after reconnecting client", ex);
+ throw new RuntimeException(e);
+ }
+ }
+
+ }
+ }
+
public void flush() throws IOException {
if (this.currentWriter != null && byteCount > BYTES_BEFORE_FLUSH) {
this.currentWriter.flush();
@@ -158,23 +172,24 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
}
- private synchronized void resetFile() throws Exception {
+ private synchronized void resetFile() {
// this will keep it thread safe, so we don't create too many files
if (this.fileLineCounter == 0 && this.currentWriter != null)
return;
- // if there is a current writer, we must close it first.
- if (this.currentWriter != null) {
- flush();
- close();
- }
-
- this.fileLineCounter = 0;
-
// Create the path for where the file is going to live.
Path filePath = this.path.suffix("/" + hdfsConfiguration.getWriterFilePrefix() + "-" + new Date().getTime() + ".tsv");
try {
+
+ // if there is a current writer, we must close it first.
+ if (this.currentWriter != null) {
+ flush();
+ close();
+ }
+
+ this.fileLineCounter = 0;
+
// Check to see if a file of the same name exists, if it does, then we are not going to be able to proceed.
if (client.exists(filePath))
throw new RuntimeException("Unable to create file: " + filePath);
@@ -188,7 +203,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
} catch (Exception e) {
LOGGER.error("COULD NOT CreateFile: {}", filePath);
LOGGER.error(e.getMessage());
- throw e;
+ throw new RuntimeException(e);
}
}