You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/07 21:24:36 UTC
[1/2] git commit: Made linesPerFile configurable; made configurator more sensible
Repository: incubator-streams
Updated Branches:
refs/heads/master 178869164 -> 764247592
Made linesPerFile configurable
Made configurator more sensible
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/669194f5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/669194f5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/669194f5
Branch: refs/heads/master
Commit: 669194f5cd8d59f518d7cdbcab5b66cee9fe56b0
Parents: 3ee5175
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Sun Sep 7 14:13:07 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Sun Sep 7 14:13:07 2014 -0500
----------------------------------------------------------------------
.../apache/streams/hdfs/HdfsConfigurator.java | 38 ++++++--------------
.../streams/hdfs/WebHdfsPersistWriter.java | 5 +--
.../streams/hdfs/HdfsWriterConfiguration.json | 5 +++
3 files changed, 18 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
index b000b85..c4823c3 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
@@ -20,6 +20,7 @@ package org.apache.streams.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigRenderOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,45 +34,26 @@ public class HdfsConfigurator {
private final static ObjectMapper mapper = new ObjectMapper();
public static HdfsConfiguration detectConfiguration(Config hdfs) {
- String host = hdfs.getString("host");
- Long port = hdfs.getLong("port");
- String path = hdfs.getString("path");
- String user = hdfs.getString("user");
- String password = hdfs.getString("password");
- HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
-
- hdfsConfiguration.setHost(host);
- hdfsConfiguration.setPort(port);
- hdfsConfiguration.setPath(path);
- hdfsConfiguration.setUser(user);
- hdfsConfiguration.setPassword(password);
+ HdfsConfiguration hdfsConfiguration = null;
+ try {
+ hdfsConfiguration = mapper.readValue(hdfs.root().render(ConfigRenderOptions.concise()), HdfsConfiguration.class);
+ } catch (Exception e) {
+ e.printStackTrace();
+ LOGGER.warn("Could not parse HdfsConfiguration");
+ }
return hdfsConfiguration;
}
public static HdfsReaderConfiguration detectReaderConfiguration(Config hdfs) {
- HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
- HdfsReaderConfiguration hdfsReaderConfiguration = mapper.convertValue(hdfsConfiguration, HdfsReaderConfiguration.class);
-
- String readerPath = hdfs.getString("readerPath");
-
- hdfsReaderConfiguration.setReaderPath(readerPath);
-
- return hdfsReaderConfiguration;
+ return mapper.convertValue(detectConfiguration(hdfs), HdfsReaderConfiguration.class);
}
public static HdfsWriterConfiguration detectWriterConfiguration(Config hdfs) {
- HdfsConfiguration hdfsConfiguration = detectConfiguration(hdfs);
- HdfsWriterConfiguration hdfsWriterConfiguration = mapper.convertValue(hdfsConfiguration, HdfsWriterConfiguration.class);
-
- String writerPath = hdfs.getString("writerPath");
-
- hdfsWriterConfiguration.setWriterPath(writerPath);
-
- return hdfsWriterConfiguration;
+ return mapper.convertValue(detectConfiguration(hdfs), HdfsWriterConfiguration.class);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 3ab3f29..4b480ad 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -54,7 +54,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
private FileSystem client;
private Path path;
private String filePart = "default";
- private int linesPerFile = 1000;
+ private int linesPerFile;
private int totalRecordsWritten = 0;
private final List<Path> writtenFiles = new ArrayList<Path>();
private int fileLineCounter = 0;
@@ -75,6 +75,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
public WebHdfsPersistWriter(HdfsWriterConfiguration hdfsConfiguration) {
this.hdfsConfiguration = hdfsConfiguration;
+ this.linesPerFile = hdfsConfiguration.getLinesPerFile().intValue();
}
public URI getURI() throws URISyntaxException {
@@ -253,7 +254,7 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
return null;
else
return new StringBuilder()
- .append(entry.getSequenceid())
+ .append(entry.getId())
.append(DELIMITER)
.append(entry.getTimestamp())
.append(DELIMITER)
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/669194f5/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
index 5cadd7d..fedda38 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsWriterConfiguration.json
@@ -13,6 +13,11 @@
"writerFilePrefix": {
"type": "string",
"description": "File Prefix"
+ },
+ "linesPerFile": {
+ "type": "integer",
+ "description": "Lines Per File",
+ "default": 1000
}
}
}
\ No newline at end of file
[2/2] git commit: Merge branch 'STREAMS-163'
Posted by sb...@apache.org.
Merge branch 'STREAMS-163'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/76424759
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/76424759
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/76424759
Branch: refs/heads/master
Commit: 764247592fbe5f3189eff9b06949c54cb68c2f06
Parents: 1788691 669194f
Author: sblackmon <sb...@apache.org>
Authored: Tue Oct 7 13:59:53 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Oct 7 13:59:53 2014 -0500
----------------------------------------------------------------------
.../apache/streams/hdfs/HdfsConfigurator.java | 38 ++++++--------------
.../streams/hdfs/WebHdfsPersistWriter.java | 5 +--
.../streams/hdfs/HdfsWriterConfiguration.json | 5 +++
3 files changed, 18 insertions(+), 30 deletions(-)
----------------------------------------------------------------------