You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streams.apache.org by sb...@apache.org on 2014/10/07 21:53:54 UTC

[1/3] git commit: added support for file: and hdfs: schemes reader and writer are not internally consistent - removing timestamp read from wrong field for testing "scheme": "file" works :) will submit another PR for STREAMS-167 which will allow timestamp

Repository: incubator-streams
Updated Branches:
  refs/heads/master 764247592 -> 675ae89c6


Added support for the file: and hdfs: schemes.
The reader and writer are not internally consistent, so the timestamp read (which used the wrong field) is removed for testing.
"scheme": "file" works :)
Will submit another PR for STREAMS-167, which will allow reading timestamps and metadata from a previously written HDFS stream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6016a774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6016a774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6016a774

Branch: refs/heads/master
Commit: 6016a7740ae52b14ed15647efa530c365d789e8d
Parents: 35a8fbf
Author: sblackmon <sb...@apache.org>
Authored: Sat Sep 13 21:56:42 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Sat Sep 13 21:56:42 2014 -0500

----------------------------------------------------------------------
 .../org/apache/streams/hdfs/WebHdfsPersistReader.java  | 13 ++++++++++++-
 .../apache/streams/hdfs/WebHdfsPersistReaderTask.java  |  5 ++++-
 .../org/apache/streams/hdfs/WebHdfsPersistWriter.java  |  9 ++++++++-
 .../org/apache/streams/hdfs/HdfsConfiguration.json     |  6 ++++++
 4 files changed, 30 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 1baddc3..c0213f9 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -19,6 +19,7 @@
 package org.apache.streams.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -74,7 +75,17 @@ public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCo
         this.hdfsConfiguration = hdfsConfiguration;
     }
 
-    public URI getURI() throws URISyntaxException { return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort()); }
+    public URI getURI() throws URISyntaxException {
+        StringBuilder uriBuilder = new StringBuilder();
+        uriBuilder.append(hdfsConfiguration.getScheme());
+        uriBuilder.append("://");
+        if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost()))
+            uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+        else
+            uriBuilder.append("/");
+        return new URI(uriBuilder.toString());
+    }
+
     public boolean isConnected() 		                { return (client != null); }
 
     public final synchronized FileSystem getFileSystem()

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index da0cc48..72d5581 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -62,7 +62,10 @@ public class WebHdfsPersistReaderTask implements Runnable {
                         if( !Strings.isNullOrEmpty(line) ) {
                             reader.countersCurrent.incrementAttempt();
                             String[] fields = line.split(Character.toString(reader.DELIMITER));
-                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
+                            // Temporarily disabling timestamp reads to make reader and writer compatible
+                            // This capability will be restored in PR for STREAMS-169
+                            //StreamsDatum entry = new StreamsDatum(fields[3], fields[0], new DateTime(Long.parseLong(fields[2])));
+                            StreamsDatum entry = new StreamsDatum(fields[3], fields[0]);
                             write( entry );
                             reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
                         }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
index 3ab3f29..b016cbe 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
@@ -78,7 +78,14 @@ public class WebHdfsPersistWriter implements StreamsPersistWriter, Flushable, Cl
     }
 
     public URI getURI() throws URISyntaxException {
-        return new URI(WebHdfsFileSystem.SCHEME + "://" + hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+        StringBuilder uriBuilder = new StringBuilder();
+        uriBuilder.append(hdfsConfiguration.getScheme());
+        uriBuilder.append("://");
+        if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost()))
+            uriBuilder.append(hdfsConfiguration.getHost() + ":" + hdfsConfiguration.getPort());
+        else
+            uriBuilder.append("/");
+        return new URI(uriBuilder.toString());
     }
 
     public boolean isConnected() {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6016a774/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
index d8f2c38..e33994b 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
+++ b/streams-contrib/streams-persist-hdfs/src/main/jsonschema/org/apache/streams/hdfs/HdfsConfiguration.json
@@ -5,6 +5,12 @@
     "javaType" : "org.apache.streams.hdfs.HdfsConfiguration",
     "javaInterfaces": ["java.io.Serializable"],
     "properties": {
+        "scheme": {
+            "type": "string",
+            "description": "scheme",
+            "enum" : ["file", "hdfs", "webhdfs"],
+            "default": "webhdfs"
+        },
         "host": {
             "type": "string",
             "description": "WebHdfs host"


[3/3] git commit: Merge commit 'dc68334029dba329b0efa2a60d44820d4b9ed907'

Posted by sb...@apache.org.
Merge commit 'dc68334029dba329b0efa2a60d44820d4b9ed907'


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/675ae89c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/675ae89c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/675ae89c

Branch: refs/heads/master
Commit: 675ae89c6bcbd1b246850a4d3f3840c7b9095e16
Parents: 7642475 dc68334
Author: sblackmon <sb...@apache.org>
Authored: Tue Oct 7 14:25:44 2014 -0500
Committer: sblackmon <sb...@apache.org>
Committed: Tue Oct 7 14:25:44 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-hdfs/pom.xml    |  12 ++
 .../streams/hdfs/WebHdfsPersistReader.java      |  13 +-
 .../streams/hdfs/WebHdfsPersistReaderTask.java  |   5 +-
 .../streams/hdfs/WebHdfsPersistWriter.java      |   9 +-
 .../apache/streams/hdfs/HdfsConfiguration.json  |   6 +
 .../hdfs/test/HdfsPersistConfigTest.java        | 169 +++++++++++++++++++
 6 files changed, 211 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/675ae89c/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/675ae89c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistWriter.java
----------------------------------------------------------------------


[2/3] git commit: added test cases as discussed: https://github.com/apache/incubator-streams/pull/85

Posted by sb...@apache.org.
added test cases as discussed:
https://github.com/apache/incubator-streams/pull/85


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/dc683340
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/dc683340
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/dc683340

Branch: refs/heads/master
Commit: dc68334029dba329b0efa2a60d44820d4b9ed907
Parents: 6016a77
Author: Steve Blackmon <sb...@w2odigital.com>
Authored: Tue Oct 7 07:21:02 2014 -0500
Committer: Steve Blackmon <sb...@w2odigital.com>
Committed: Tue Oct 7 07:21:02 2014 -0500

----------------------------------------------------------------------
 streams-contrib/streams-persist-hdfs/pom.xml    |  12 ++
 .../hdfs/test/HdfsPersistConfigTest.java        | 169 +++++++++++++++++++
 2 files changed, 181 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc683340/streams-contrib/streams-persist-hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/pom.xml b/streams-contrib/streams-persist-hdfs/pom.xml
index 5fe33b3..8148cd0 100644
--- a/streams-contrib/streams-persist-hdfs/pom.xml
+++ b/streams-contrib/streams-persist-hdfs/pom.xml
@@ -66,6 +66,18 @@
         </dependency>
     </dependencies>
     <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
         <plugins>
             <plugin>
                 <groupId>org.codehaus.mojo</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/dc683340/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
new file mode 100644
index 0000000..819414a
--- /dev/null
+++ b/streams-contrib/streams-persist-hdfs/src/test/java/org/apache/streams/hdfs/test/HdfsPersistConfigTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.streams.hdfs.test;
+
+import org.apache.streams.hdfs.*;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URISyntaxException;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test for checking that strings append to FS paths as expected
+ */
+public class HdfsPersistConfigTest {
+
+    private final static Logger LOGGER = LoggerFactory.getLogger(HdfsPersistConfigTest.class);
+
+    @Test
+    public void getWriterFileUriTest()
+    {
+        HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+        writerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE);
+        writerConfiguration.setPath("path");
+        writerConfiguration.setWriterPath("writerPath");
+        writerConfiguration.setUser("cloudera");
+
+        WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistWriter.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray());
+        webHdfsPersistWriter.prepare(null);
+        assertTrue(webHdfsPersistWriter.isConnected());
+    }
+
+    @Test
+    public void getWriterHdfsUriTest()
+    {
+        HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+        writerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
+        writerConfiguration.setHost("localhost");
+        writerConfiguration.setPort(9000l);
+        writerConfiguration.setPath("path");
+        writerConfiguration.setWriterPath("writerPath");
+        writerConfiguration.setUser("cloudera");
+
+        WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistWriter.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray());
+
+    }
+
+    @Test
+    public void getWriterWebHdfsUriTest()
+    {
+        HdfsWriterConfiguration writerConfiguration = new HdfsWriterConfiguration();
+        writerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
+        writerConfiguration.setHost("localhost");
+        writerConfiguration.setPort(57000l);
+        writerConfiguration.setPath("path");
+        writerConfiguration.setWriterPath("writerPath");
+        writerConfiguration.setUser("cloudera");
+
+        WebHdfsPersistWriter webHdfsPersistWriter = new WebHdfsPersistWriter(writerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistWriter.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray());
+
+    }
+
+    @Test
+    public void getReaderFileUriTest()
+    {
+        HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+        readerConfiguration.setScheme(HdfsConfiguration.Scheme.FILE);
+        readerConfiguration.setPath("path");
+        readerConfiguration.setReaderPath("readerPath");
+
+        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistReader.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("file:///").toCharArray());
+    }
+
+    @Test
+    public void getReaderHdfsUriTest()
+    {
+        HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+        readerConfiguration.setScheme(HdfsConfiguration.Scheme.HDFS);
+        readerConfiguration.setHost("localhost");
+        readerConfiguration.setPort(9000l);
+        readerConfiguration.setPath("path");
+        readerConfiguration.setReaderPath("readerPath");
+
+        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistReader.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("hdfs://localhost:9000").toCharArray());
+
+    }
+
+    @Test
+    public void getReaderWebHdfsUriTest()
+    {
+        HdfsReaderConfiguration readerConfiguration = new HdfsReaderConfiguration();
+        readerConfiguration.setScheme(HdfsConfiguration.Scheme.WEBHDFS);
+        readerConfiguration.setHost("localhost");
+        readerConfiguration.setPort(57000l);
+        readerConfiguration.setPath("path");
+        readerConfiguration.setReaderPath("readerPath");
+
+        WebHdfsPersistReader webHdfsPersistReader = new WebHdfsPersistReader(readerConfiguration);
+
+        String uri = null;
+        try {
+            uri = webHdfsPersistReader.getURI().toString();
+        } catch (URISyntaxException e) {
+            fail("URI Syntax");
+        }
+        assertArrayEquals(uri.toCharArray(), ("webhdfs://localhost:57000").toCharArray());
+
+    }
+
+}