You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/07 21:53:54 UTC
[1/3] git commit: added support for file: and hdfs: schemes reader
and writer are not internally consistent - removing timestamp read from wrong
field for testing "scheme": "file" works :) will submit another PR for
STREAMS-167 which will allow timestamp
Repository: incubator-streams
Updated Branches:
refs/heads/master 764247592 -> 675ae89c6
Added support for the file: and hdfs: schemes.
The reader and writer are not internally consistent; removed the timestamp read from the wrong field for testing.
"scheme": "file" works :)
Will submit another PR for STREAMS-167 that will allow reading timestamps and metadata from a previously written HDFS stream.
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6016a774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6016a774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6016a774
Branch: refs/heads/master
Commit: 6016a7740ae52b14ed15647efa530c365d789e8d
Parents: 35a8fbf
Author: sblackmon <sb...@apache.org>
Authored: Sat Sep 13 21:56:42 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Sat Sep 13 21:56:42 2014 -0500
----------------------------------------------------------------------
.../org/apache/streams/hdfs/WebHdfsPersistReader.java | 13 ++++++++++++-
.../apache/streams/hdfs/WebHdfsPersistReaderTask.java | 5 ++++-
.../org/apache/streams/hdfs/WebHdfsPersistWriter.java | 9 ++++++++-
.../org/apache/streams/hdfs/HdfsConfiguration.json | 6 ++++++
4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 1baddc3..c0213f9 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -19,6 +19,7 @@
package org.apache.streams.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
import com.google.common.collect.Queues;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -74,7 +75,17 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
this.hdfsConfiguration = hdfsConfiguration;
}
- public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
+ /**
+ * Builds the root URI of the configured filesystem from the scheme, host and port.
+ *
+ * @return {@code scheme://host:port} when host and port are configured,
+ *         {@code scheme://host} when only a host is configured, otherwise
+ *         {@code scheme:///} (e.g. {@code file:///})
+ * @throws URISyntaxException if the assembled string is not a valid URI
+ */
+ public URI getURI() throws URISyntaxException {
+ StringBuilder uriBuilder = new StringBuilder();
+ uriBuilder.append(hdfsConfiguration.getScheme());
+ uriBuilder.append("://");
+ if (!Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+ uriBuilder.append(hdfsConfiguration.getHost());
+ // Without this check an unset port would render as "host:null", which java.net.URI
+ // accepts as a registry-based authority with no host, deferring the failure.
+ if (hdfsConfiguration.getPort() != null) {
+ uriBuilder.append(':').append(hdfsConfiguration.getPort());
+ }
+ } else {
+ // No host configured: address the root path of the scheme's filesystem.
+ uriBuilder.append('/');
+ }
+ return new URI(uriBuilder.toString());
+ }
+
public boolean isConnected() { return (client != null); }
public final synchronized FileSystem getFileSystem()
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index da0cc48..72d5581 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -62,7 +62,10 @@ public class WebHdfsPersistReaderTask implements Runnable {
if( !Strings.isNullOrEmpty(line) ) {
reader.countersCurrent.incrementAttempt();
String[] fields = line.split(Character.toString(reader.DELIMITER));
- StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
+ // Timestamp reads are temporarily disabled so the reader stays compatible with the writer.
+ // This capability will be restored in the PR for STREAMS-169 (the commit message says STREAMS-167; confirm which).
+ //StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
+ StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
write( entry );
reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
}
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 3ab3f29..b016cbe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -78,7 +78,14 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
}
+ /**
+ * Builds the root URI of the configured filesystem from the scheme, host and port.
+ *
+ * @return {@code scheme://host:port} when host and port are configured,
+ *         {@code scheme://host} when only a host is configured, otherwise
+ *         {@code scheme:///} (e.g. {@code file:///})
+ * @throws URISyntaxException if the assembled string is not a valid URI
+ */
public URI getURI() throws URISyntaxException {
- return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+ StringBuilder uriBuilder = new StringBuilder();
+ uriBuilder.append(hdfsConfiguration.getScheme());
+ uriBuilder.append("://");
+ if (!Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+ uriBuilder.append(hdfsConfiguration.getHost());
+ // Without this check an unset port would render as "host:null", which java.net.URI
+ // accepts as a registry-based authority with no host, deferring the failure.
+ if (hdfsConfiguration.getPort() != null) {
+ uriBuilder.append(':').append(hdfsConfiguration.getPort());
+ }
+ } else {
+ // No host configured: address the root path of the scheme's filesystem.
+ uriBuilder.append('/');
+ }
+ return new URI(uriBuilder.toString());
}
public boolean isConnected() {
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index d8f2c38..e33994b 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -5,6 +5,12 @@
"javaType" : "org.apache.streams.hdfs.HdfsConfiguration",
"javaInterfaces": ["java.io.Serializable"],
"properties": {
+ "scheme": {
+ "type": "string",
+ "description": "URI scheme of the target filesystem (file, hdfs, or webhdfs)",
+ "enum" : ["file", "hdfs", "webhdfs"],
+ "default": "webhdfs"
+ },
"host": {
"type": "string",
"description": "WebHdfs host"
[3/3] git commit: Merge commit
'dc68334029dba329b0efa2a60d44820d4b9ed907'
Posted by sb...@apache.org.
Merge commit 'dc68334029dba329b0efa2a60d44820d4b9ed907'
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/675ae89c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/675ae89c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/675ae89c
Branch: refs/heads/master
Commit: 675ae89c6bcbd1b246850a4d3f3840c7b9095e16
Parents: 7642475 dc68334
Author: sblackmon <sb...@apache.org>
Authored: Tue Oct 7 14:25:44 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Oct 7 14:25:44 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-persist-hdfs/pom.xml | 12 ++
.../streams/hdfs/WebHdfsPersistReader.java | 13 +-
.../streams/hdfs/WebHdfsPersistReaderTask.java | 5 +-
.../streams/hdfs/WebHdfsPersistWriter.java | 9 +-
.../apache/streams/hdfs/HdfsConfiguration.json | 6 +
.../hdfs/test/HdfsPersistConfigTest.java | 169 +++++++++++++++++++
6 files changed, 211 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/675ae89c/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/675ae89c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
[2/3] git commit: added test cases as discussed:
https://github.com/apache/incubator-streams/pull/85
Posted by sb...@apache.org.
added test cases as discussed:
https://github.com/apache/incubator-streams/pull/85
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc683340
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc683340
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc683340
Branch: refs/heads/master
Commit: dc68334029dba329b0efa2a60d44820d4b9ed907
Parents: 6016a77
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Oct 7 07:21:02 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Oct 7 07:21:02 2014 -0500
----------------------------------------------------------------------
streams-contrib/streams-persist-hdfs/pom.xml | 12 ++
.../hdfs/test/HdfsPersistConfigTest.java | 169 +++++++++++++++++++
2 files changed, 181 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc683340/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 5fe33b3..8148cd0 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -66,6 +66,18 @@
</dependency>
</dependencies>
<build>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc683340/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
new file mode 100644
index 0000000..819414a
--- /dev/null
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.hdfs.test;
+
+import org.apache.streams.hdfs.*;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests that the reader and writer build the expected filesystem URI from an
+ * {@link HdfsConfiguration} for each supported scheme (file, hdfs, webhdfs).
+ *
+ * Expected shapes: {@code scheme:///} when no host is set, otherwise
+ * {@code scheme://host:port}. Test methods declare {@link URISyntaxException}
+ * so that a malformed URI fails the test with its original cause and stack trace.
+ */
+public class HdfsPersistConfigTest {
+
+ private final static Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
+
+ /**
+ * A "file" scheme with no host maps to the local root URI, and the writer
+ * reports itself connected after prepare().
+ */
+ @Test
+ public void getWriterFileUriTest() throws URISyntaxException
+ {
+ HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+ writerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE);
+ writerConfiguration.setPath("path");
+ writerConfiguration.setWriterPath("writerPath");
+ writerConfiguration.setUser("cloudera");
+
+ WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+ // assertEquals(expected, actual): comparing Strings gives a readable diff on failure
+ assertEquals("file:///", webHdfsPersistWriter.getURI().toString());
+
+ webHdfsPersistWriter.prepare(null);
+ assertTrue(webHdfsPersistWriter.isConnected());
+ }
+
+ /**
+ * An "hdfs" scheme with host and port maps to {@code hdfs://host:port}.
+ */
+ @Test
+ public void getWriterHdfsUriTest() throws URISyntaxException
+ {
+ HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+ writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
+ writerConfiguration.setHost("localhost");
+ writerConfiguration.setPort(9000L);
+ writerConfiguration.setPath("path");
+ writerConfiguration.setWriterPath("writerPath");
+ writerConfiguration.setUser("cloudera");
+
+ WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+ assertEquals("hdfs://localhost:9000", webHdfsPersistWriter.getURI().toString());
+ }
+
+ /**
+ * A "webhdfs" scheme with host and port maps to {@code webhdfs://host:port}.
+ */
+ @Test
+ public void getWriterWebHdfsUriTest() throws URISyntaxException
+ {
+ HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+ writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
+ writerConfiguration.setHost("localhost");
+ writerConfiguration.setPort(57000L);
+ writerConfiguration.setPath("path");
+ writerConfiguration.setWriterPath("writerPath");
+ writerConfiguration.setUser("cloudera");
+
+ WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+ assertEquals("webhdfs://localhost:57000", webHdfsPersistWriter.getURI().toString());
+ }
+
+ /**
+ * A "file" scheme with no host maps to the local root URI.
+ */
+ @Test
+ public void getReaderFileUriTest() throws URISyntaxException
+ {
+ HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+ readerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE);
+ readerConfiguration.setPath("path");
+ readerConfiguration.setReaderPath("readerPath");
+
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+ assertEquals("file:///", webHdfsPersistReader.getURI().toString());
+ }
+
+ /**
+ * An "hdfs" scheme with host and port maps to {@code hdfs://host:port}.
+ */
+ @Test
+ public void getReaderHdfsUriTest() throws URISyntaxException
+ {
+ HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+ readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
+ readerConfiguration.setHost("localhost");
+ readerConfiguration.setPort(9000L);
+ readerConfiguration.setPath("path");
+ readerConfiguration.setReaderPath("readerPath");
+
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+ assertEquals("hdfs://localhost:9000", webHdfsPersistReader.getURI().toString());
+ }
+
+ /**
+ * A "webhdfs" scheme with host and port maps to {@code webhdfs://host:port}.
+ */
+ @Test
+ public void getReaderWebHdfsUriTest() throws URISyntaxException
+ {
+ HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+ readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
+ readerConfiguration.setHost("localhost");
+ readerConfiguration.setPort(57000L);
+ readerConfiguration.setPath("path");
+ readerConfiguration.setReaderPath("readerPath");
+
+ WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+ assertEquals("webhdfs://localhost:57000", webHdfsPersistReader.getURI().toString());
+ }
+
+}