You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/06 18:06:35 UTC

[incubator-pulsar] branch master updated: Added HDFS Sink (#2409)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 261eab1  Added HDFS Sink (#2409)
261eab1 is described below

commit 261eab12789c8c06b64cb7a5688ab0c867d609b1
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Thu Sep 6 11:06:33 2018 -0700

    Added HDFS Sink (#2409)
    
    * Added HDFS Sink
    
    * Fixed issues identified during PR review
    
    * Fixed comment
    
    * Added HDFS Container to externalServices
    
    * Ignoring HdfsSink test for now
    
    * Removed HDFS Container to externalServices
    
    * Fixed ASL licensing
    
    * Fixed compile errors
    
    * Added HDFS to SinkType Enum
---
 pom.xml                                            |  50 +++-
 pulsar-io/hdfs/pom.xml                             |  69 ++++++
 .../apache/pulsar/io/hdfs/AbstractHdfsConfig.java  |  86 +++++++
 .../pulsar/io/hdfs/AbstractHdfsConnector.java      | 258 +++++++++++++++++++++
 .../org/apache/pulsar/io/hdfs/Compression.java     |  26 +++
 .../org/apache/pulsar/io/hdfs/HdfsResources.java   |  51 ++++
 .../org/apache/pulsar/io/hdfs/SecurityUtil.java    |  91 ++++++++
 .../org/apache/pulsar/io/hdfs/package-info.java    |  19 ++
 .../pulsar/io/hdfs/sink/HdfsAbstractSink.java      | 121 ++++++++++
 .../apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java | 108 +++++++++
 .../apache/pulsar/io/hdfs/sink/HdfsSyncThread.java |  80 +++++++
 .../apache/pulsar/io/hdfs/sink/package-info.java   |  19 ++
 .../sink/seq/HdfsAbstractSequenceFileSink.java     |  96 ++++++++
 .../io/hdfs/sink/seq/HdfsSequentialTextSink.java   |  71 ++++++
 .../pulsar/io/hdfs/sink/seq/HdfsTextSink.java      |  54 +++++
 .../pulsar/io/hdfs/sink/seq/package-info.java      |  19 ++
 .../hdfs/sink/text/HdfsAbstractTextFileSink.java   |  79 +++++++
 .../pulsar/io/hdfs/sink/text/HdfsStringSink.java   |  34 +++
 .../pulsar/io/hdfs/sink/text/package-info.java     |  19 ++
 .../resources/META-INF/services/pulsar-io.yaml     |  22 ++
 .../pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java  | 119 ++++++++++
 .../pulsar/io/hdfs/sink/HdfsSinkConfigTests.java   | 154 ++++++++++++
 .../io/hdfs/sink/seq/HdfsSequentialSinkTests.java  |  94 ++++++++
 .../pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java | 104 +++++++++
 .../io/hdfs/sink/text/HdfsStringSinkTests.java     | 103 ++++++++
 .../src/test/resources/hadoop/core-site.xml}       |  38 +--
 .../src/test/resources/hadoop/hdfs-site.xml}       |  40 ++--
 pulsar-io/hdfs/src/test/resources/sinkConfig.yaml  |  25 ++
 pulsar-io/pom.xml                                  |   1 +
 .../integration/containers/HdfsContainer.java      |  51 ++++
 .../integration/functions/PulsarFunctionsTest.java |   7 +-
 .../tests/integration/io/HdfsSinkTester.java       |  65 ++++++
 .../pulsar/tests/integration/io/SinkTester.java    |   3 +-
 .../tests/integration/suites/PulsarTestSuite.java  |   1 +
 34 files changed, 2117 insertions(+), 60 deletions(-)

diff --git a/pom.xml b/pom.xml
index 2352d5e..b349925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,12 +496,6 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
-        <groupId>commons-io</groupId>
-        <artifactId>commons-io</artifactId>
-        <version>2.5</version>
-      </dependency>
-
-      <dependency>
         <groupId>commons-codec</groupId>
         <artifactId>commons-codec</artifactId>
         <version>1.10</version>
@@ -1350,6 +1344,50 @@ flexible messaging model and an intuitive client API.</description>
     <profile>
       <id>docker</id>
     </profile>
+    
+    <profile>
+      <!-- Checks style and licensing requirements. This is a good
+           idea to run for contributions and for the release process. While it would
+           be nice to always run them, these plugins can considerably slow the build and have
+           proven to create unstable builds in our multi-module project and when building
+           using multiple threads. The stability issues seen with Checkstyle in multi-module
+           builds include false-positives and false negatives. -->
+      <id>contrib-check</id>
+         <build>
+            <plugins>
+                <plugin>
+                    <groupId>org.apache.rat</groupId>
+                    <artifactId>apache-rat-plugin</artifactId>
+                    <executions>
+                      <execution>
+                          <goals>
+                              <goal>check</goal>
+                          </goals>
+                          <phase>verify</phase>
+                      </execution>
+                   </executions>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-checkstyle-plugin</artifactId>
+                    <executions>
+                       <execution>
+                          <id>check-style</id>
+                          <phase>verify</phase>
+                          <configuration>
+         					  <configLocation>./buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+         					  <suppressionsLocation>/buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+                              <encoding>UTF-8</encoding>
+                          </configuration>
+                          <goals>
+                             <goal>check</goal>
+                          </goals>
+                       </execution>
+                    </executions>
+                </plugin>
+             </plugins>
+         </build>
+    </profile>
   </profiles>
 
   <repositories>
diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs/pom.xml
new file mode 100644
index 0000000..0d55207
--- /dev/null
+++ b/pulsar-io/hdfs/pom.xml
@@ -0,0 +1,69 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-io</artifactId>
+    <version>2.2.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>pulsar-io-hdfs</artifactId>
+  
+  <dependencies>
+     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-io-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
+    
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-client</artifactId>
+  		<version>3.1.1</version>
+  	</dependency>
+
+  	<dependency>
+  		<groupId>org.testng</groupId>
+  		<artifactId>testng</artifactId>
+  		<scope>test</scope>
+  	</dependency>
+  </dependencies>
+  
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-maven-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  
+</project>
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
new file mode 100644
index 0000000..529c350
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A file or comma separated list of files which contains the Hadoop file system configuration,
+     * e.g. 'core-site.xml', 'hdfs-site.xml'.
+     */
+    private String hdfsConfigResources;
+
+    /**
+     * The HDFS directory from which files should be read from or written to.
+     */
+    private String directory;
+
+    /**
+     * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+     */
+    private String encoding;
+
+    /**
+     * The compression codec used to compress/de-compress the files on HDFS.
+     */
+    private Compression compression;
+
+    /**
+     * The Kerberos user principal account to use for authentication.
+     */
+    private String kerberosUserPrincipal;
+
+    /**
+     * The full pathname to the Kerberos keytab file to use for authentication.
+     */
+    private String keytab;
+
+    public void validate() {
+        if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
+           throw new IllegalArgumentException("Required property not set.");
+        }
+
+        if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab))
+            || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) {
+          throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required.");
+        }
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
new file mode 100644
index 0000000..0eccd93
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
+
+/**
+ * A Simple abstract class for HDFS connectors.
+ * Provides methods for connecting to HDFS.
+ */
+public abstract class AbstractHdfsConnector {
+
+    private static final Object RESOURCES_LOCK = new Object();
+
+    // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
+    protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+    protected AbstractHdfsConfig connectorConfig;
+    protected CompressionCodecFactory compressionCodecFactory;
+
+    public AbstractHdfsConnector() {
+       hdfsResources.set(new HdfsResources(null, null, null));
+    }
+
+    /*
+     * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
+     */
+    protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
+        Configuration config = new ExtendedConfiguration();
+        config.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+        getConfig(config, connectorConfig.getHdfsConfigResources());
+
+        // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
+        checkHdfsUriForTimeout(config);
+
+        /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
+         * the processor without a complete restart
+         */
+        String disableCacheName = String.format("fs.%s.impl.disable.cache",
+                FileSystem.getDefaultUri(config).getScheme());
+        config.set(disableCacheName, "true");
+
+        // If kerberos is enabled, create the file system as the kerberos principal
+        // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+        FileSystem fs;
+        UserGroupInformation ugi;
+        synchronized (RESOURCES_LOCK) {
+            if (SecurityUtil.isSecurityEnabled(config)) {
+                ugi = SecurityUtil.loginKerberos(config,
+                        connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
+                fs = getFileSystemAsUser(config, ugi);
+            } else {
+                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+                config.set("hadoop.security.authentication", "simple");
+                ugi = SecurityUtil.loginSimple(config);
+                fs = getFileSystemAsUser(config, ugi);
+            }
+        }
+        return new HdfsResources(config, fs, ugi);
+    }
+
+    private static Configuration getConfig(final Configuration config, String res) throws IOException {
+        boolean foundResources = false;
+        if (null != res) {
+            String[] resources = res.split(",");
+            for (String resource : resources) {
+                config.addResource(new Path(resource.trim()));
+                foundResources = true;
+            }
+        }
+
+        if (!foundResources) {
+            // check that at least 1 non-default resource is available on the classpath
+            String configStr = config.toString();
+            for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+                if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+                    foundResources = true;
+                    break;
+                }
+            }
+        }
+
+        if (!foundResources) {
+            throw new IOException("Could not find any of the " + res + " on the classpath");
+        }
+        return config;
+    }
+
+    /*
+     * Reduce the timeout of a socket connection from the default in FileSystem.get()
+     */
+    protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
+        URI hdfsUri = FileSystem.getDefaultUri(config);
+        String address = hdfsUri.getAuthority();
+        int port = hdfsUri.getPort();
+        if (address == null || address.isEmpty() || port < 0) {
+            return;
+        }
+        InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
+        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
+        Socket socket = null;
+        try {
+            socket = socketFactory.createSocket();
+            NetUtils.connect(socket, namenode, 1000); // 1 second timeout
+        } finally {
+            IOUtils.closeQuietly(socket);
+        }
+    }
+
+    /**
+     * This exists in order to allow unit tests to override it so that they don't take several
+     * minutes waiting for UDP packets to be received.
+     *
+     * @param config
+     *            the configuration to use
+     * @return the FileSystem that is created for the given Configuration
+     * @throws IOException
+     *             if unable to create the FileSystem
+     */
+    protected FileSystem getFileSystem(final Configuration config) throws IOException {
+        return FileSystem.get(config);
+    }
+
+    protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+        try {
+            return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+                @Override
+                public FileSystem run() throws Exception {
+                    return FileSystem.get(config);
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException("Unable to create file system: " + e.getMessage());
+        }
+    }
+
+    protected Configuration getConfiguration() {
+        return hdfsResources.get().getConfiguration();
+    }
+
+    protected FileSystem getFileSystem() {
+        return hdfsResources.get().getFileSystem();
+    }
+
+    protected UserGroupInformation getUserGroupInformation() {
+        return hdfsResources.get().getUserGroupInformation();
+    }
+
+    protected String getEncoding() {
+        return StringUtils.isNotBlank(connectorConfig.getEncoding())
+                   ? connectorConfig.getEncoding() : Charset.defaultCharset().name();
+    }
+
+    protected CompressionCodec getCompressionCodec() {
+       if (connectorConfig.getCompression() == null) {
+           return null;
+       }
+
+       CompressionCodec codec = getCompressionCodecFactory()
+               .getCodecByName(connectorConfig.getCompression().name());
+
+       return (codec != null) ? codec : new DefaultCodec();
+    }
+
+    protected CompressionCodecFactory getCompressionCodecFactory() {
+        if (compressionCodecFactory == null) {
+            compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
+        }
+
+        return compressionCodecFactory;
+    }
+
+    /**
+     * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
+     * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
+     * something that was previously not found, but might now be available.
+     * Reference the original getClassByNameOrNull from Configuration.
+     */
+    static class ExtendedConfiguration extends Configuration {
+
+        private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();
+
+        @Override
+        public Class<?> getClassByNameOrNull(String name) {
+            final ClassLoader classLoader = getClassLoader();
+
+            Map<String, WeakReference<Class<?>>> map;
+            synchronized (cacheClasses) {
+                map = cacheClasses.get(classLoader);
+                if (map == null) {
+                    map = Collections.synchronizedMap(new WeakHashMap<>());
+                    cacheClasses.put(classLoader, map);
+                }
+            }
+
+            Class<?> clazz = null;
+            WeakReference<Class<?>> ref = map.get(name);
+            if (ref != null) {
+                clazz = ref.get();
+            }
+
+            if (clazz == null) {
+                try {
+                    clazz = Class.forName(name, true, classLoader);
+                } catch (ClassNotFoundException e) {
+                    return null;
+                }
+                // two putters can race here, but they'll put the same class
+                map.put(name, new WeakReference<>(clazz));
+                return clazz;
+            } else {
+                // cache hit
+                return clazz;
+            }
+        }
+
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
new file mode 100644
index 0000000..97dba53
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+/**
+ * An enumeration of compression codecs available for HDFS.
+ */
+public enum Compression {
+    BZIP2, DEFLATE, GZIP, LZ4, SNAPPY
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
new file mode 100644
index 0000000..1d04c6c
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A wrapper class for HDFS resources.
+ */
+public class HdfsResources {
+
+    private final Configuration configuration;
+    private final FileSystem fileSystem;
+    private final UserGroupInformation userGroupInformation;
+
+    public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) {
+        this.configuration = config;
+        this.fileSystem = fs;
+        this.userGroupInformation = ugi;
+    }
+
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    public FileSystem getFileSystem() {
+        return fileSystem;
+    }
+
+    public UserGroupInformation getUserGroupInformation() {
+        return userGroupInformation;
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
new file mode 100644
index 0000000..c5462d3
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
+ * interfering with each other.
+ */
+public class SecurityUtil {
+    public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+    public static final String KERBEROS = "kerberos";
+
+    /**
+     *  Initializes UserGroupInformation with the given Configuration and performs the login for the
+     *  given principal and keytab. All logins should happen through this class to ensure other threads
+     *  are not concurrently modifying UserGroupInformation.
+     * <p>
+     * @param config the configuration instance
+     * @param principal the principal to authenticate as
+     * @param keyTab the keytab to authenticate with
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginKerberos(final Configuration config,
+            final String principal, final String keyTab) throws IOException {
+        Validate.notNull(config);
+        Validate.notNull(principal);
+        Validate.notNull(keyTab);
+
+        UserGroupInformation.setConfiguration(config);
+        UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
+        return UserGroupInformation.getCurrentUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and
+     * returns UserGroupInformation.getLoginUser(). All logins should happen
+     * through this class to ensure other threads are not concurrently
+     * modifying UserGroupInformation.
+     *
+     * @param config the configuration instance
+     *
+     * @return the UGI for the given principal
+     *
+     * @throws IOException if login failed
+     */
+    public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException {
+        Validate.notNull(config);
+        UserGroupInformation.setConfiguration(config);
+        return UserGroupInformation.getLoginUser();
+    }
+
+    /**
+     * Initializes UserGroupInformation with the given Configuration and returns
+     * UserGroupInformation.isSecurityEnabled().
+     * All checks for isSecurityEnabled() should happen through this method.
+     *
+     * @param config the given configuration
+     *
+     * @return true if kerberos is enabled on the given configuration, false otherwise
+     *
+     */
+    public static boolean isSecurityEnabled(final Configuration config) {
+        Validate.notNull(config);
+        return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
new file mode 100644
index 0000000..4294852
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
new file mode 100644
index 0000000..18184e2
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConnector;
+import org.apache.pulsar.io.hdfs.HdfsResources;
+
+/**
+ * A Simple abstract class for HDFS sink.
+ * Users need to implement extractKeyValue function to use this sink.
+ */
+public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
+
+    protected HdfsSinkConfig hdfsSinkConfig;
+    protected BlockingQueue<Record<V>> unackedRecords;
+    protected HdfsSyncThread<V> syncThread;
+    private Path path;
+    private FSDataOutputStream hdfsStream;
+
+    public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
+    protected abstract void createWriter() throws IOException;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+       hdfsSinkConfig = HdfsSinkConfig.load(config);
+       hdfsSinkConfig.validate();
+       connectorConfig = hdfsSinkConfig;
+       unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
+       connectToHdfs();
+       createWriter();
+       launchSyncThread();
+    }
+
+    @Override
+    public void close() throws Exception {
+       syncThread.halt();
+       syncThread.join(0);
+    }
+
+    protected final void connectToHdfs() throws IOException {
+       try {
+           HdfsResources resources = hdfsResources.get();
+
+           if (resources.getConfiguration() == null) {
+               resources = this.resetHDFSResources(hdfsSinkConfig);
+               hdfsResources.set(resources);
+           }
+       } catch (IOException ex) {
+          hdfsResources.set(new HdfsResources(null, null, null));
+          throw ex;
+       }
+    }
+
+    @SuppressWarnings("rawtypes")
+    protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
+        Path path = getPath();
+        FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
+        FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) :  fs.createFile(path);
+        return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    }
+
+    protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
+        if (hdfsStream == null) {
+            hdfsStream = getOutputStreamBuilder().build();
+        }
+        return hdfsStream;
+    }
+
+    protected final Path getPath() {
+        if (path == null) {
+            String ext = "";
+            if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
+                ext = hdfsSinkConfig.getFileExtension();
+            } else if (getCompressionCodec() != null) {
+                ext = getCompressionCodec().getDefaultExtension();
+            }
+
+            path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+                    hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+        }
+        return path;
+    }
+
+    protected final void launchSyncThread() throws IOException {
+        syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval());
+        syncThread.start();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
new file mode 100644
index 0000000..e9f4cae
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
+
+/**
+ * Configuration object for all HDFS Sink components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * The prefix of the files to create inside the HDFS directory, i.e. a value of "topicA"
+     * will result in files named topicA-, topicA-, etc being produced
+     */
+    private String filenamePrefix;
+
+    /**
+     * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc.
+     */
+    private String fileExtension;
+
+    /**
+     * The character to use to separate records in a text file. If no value is provided
+     * then the content from all of the records will be concatenated together in one continuous
+     * byte array.
+     */
+    private char separator;
+
+    /**
+     * The interval (in milliseconds) between calls to flush data to HDFS disk.
+     */
+    private long syncInterval;
+
+    /**
+     * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE.
+     * Setting this value to one, results in every record being sent to disk before the record is acked,
+     * while setting it to a higher values allows us to buffer records before flushing them all to disk.
+     */
+    private int maxPendingRecords = Integer.MAX_VALUE;
+
+    public static HdfsSinkConfig load(String yamlFile) throws IOException {
+       ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+       return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
+    }
+
+    public static HdfsSinkConfig load(Map<String, Object> map) throws IOException {
+       ObjectMapper mapper = new ObjectMapper();
+       return mapper.readValue(new ObjectMapper().writeValueAsString(map), HdfsSinkConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        if ((StringUtils.isEmpty(fileExtension) && getCompression() == null)
+            || StringUtils.isEmpty(filenamePrefix)) {
+           throw new IllegalArgumentException("Required property not set.");
+        }
+
+        if (syncInterval < 0) {
+          throw new IllegalArgumentException("Sync Interval cannot be negative");
+        }
+
+        if (maxPendingRecords < 1) {
+          throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+        }
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
new file mode 100644
index 0000000..3c19ed8
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.pulsar.functions.api.Record;
+
+/**
+ * A thread that runs in the background and acknowledges Records
+ * after they have been written to disk.
+ *
+ * @param <V>
+ */
+public class HdfsSyncThread<V> extends Thread {
+
+    private final Syncable stream;
+    private final BlockingQueue<Record<V>> unackedRecords;
+    private final long syncInterval;
+    private boolean keepRunning = true;
+
+    public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> unackedRecords, long syncInterval) {
+      this.stream = stream;
+      this.unackedRecords = unackedRecords;
+      this.syncInterval = syncInterval;
+    }
+
+    @Override
+    public void run() {
+       while (keepRunning) {
+         try {
+            Thread.sleep(syncInterval);
+            ackRecords();
+         } catch (InterruptedException e) {
+            return;
+         } catch (IOException e) {
+            e.printStackTrace();
+         }
+       }
+    }
+
+    public final void halt() throws IOException, InterruptedException {
+       keepRunning = false;
+       ackRecords();
+    }
+
+    private void ackRecords() throws IOException, InterruptedException {
+
+        if (CollectionUtils.isEmpty(unackedRecords)) {
+           return;
+        }
+
+        synchronized (stream) {
+          stream.hsync();
+        }
+
+        while (!unackedRecords.isEmpty()) {
+          unackedRecords.take().ack();
+        }
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
new file mode 100644
index 0000000..c6506d9
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
new file mode 100644
index 0000000..7c61c20
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HDFS Sink that writes it contents to HDFS as Sequence Files.
+ *
+ * @param <K> - The incoming Key type
+ * @param <V> - The incoming Value type
+ * @param <HdfsK> - The HDFS Key type
+ * @param <HdfsV> - The HDFS Value type
+ */
+public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
+    extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);
+
+    protected AtomicLong counter;
+    protected FSDataOutputStream hdfsStream;
+    protected Writer writer = null;
+
+    public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);
+
+    @Override
+    public void close() throws Exception {
+       writer.close();
+       super.close();
+    }
+
+    @Override
+    protected void createWriter() throws IOException {
+       writer = getWriter();
+    }
+
+    @Override
+    public void write(Record<V> record) {
+       try {
+            KeyValue<K, V> kv = extractKeyValue(record);
+            KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
+            writer.append(keyValue.getKey(), keyValue.getValue());
+            unackedRecords.put(record);
+        } catch (IOException | InterruptedException e) {
+            LOG.error("Unable to write to file " + getPath(), e);
+            record.fail();
+        }
+    }
+
+    protected Writer getWriter() throws IOException {
+        counter = new AtomicLong(0);
+        List<Option> options = getOptions();
+        return SequenceFile.createWriter(getConfiguration(),
+                options.toArray(new Option[options.size()]));
+     }
+
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> list = new ArrayList<Option>();
+        list.add(Writer.stream(getHdfsStream()));
+
+        if (getCompressionCodec() != null) {
+            list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
+        }
+        return list;
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
new file mode 100644
index 0000000..84ce09f
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * This Sink should be used when the records are originating from a sequential source,
+ * and we want to retain the record sequence.This class uses the record's sequence id as
+ * the sequence id in the HDFS Sequence File if it is available, if not a sequence id is
+ * auto-generated for each new record.
+ */
+public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> {
+
+    private AtomicLong counter;
+
+    @Override
+    public Writer getWriter() throws IOException {
+       counter = new AtomicLong(0);
+
+       return SequenceFile
+                .createWriter(
+                   getConfiguration(),
+                   getOptions().toArray(new Option[getOptions().size()]));
+    }
+
+    @Override
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> opts = super.getOptions();
+        opts.add(Writer.keyClass(LongWritable.class));
+        opts.add(Writer.valueClass(Text.class));
+        return opts;
+    }
+
+    @Override
+    public KeyValue<Long, String> extractKeyValue(Record<String> record) {
+       Long sequence = record.getRecordSequence().orElseGet(() -> new Long(counter.incrementAndGet()));
+       return new KeyValue<>(sequence, new String(record.getValue()));
+    }
+
+    @Override
+    public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
+       return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue()));
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
new file mode 100644
index 0000000..84ebc07
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * A Simple Sink class for Hdfs Sequence File.
+ */
+public class HdfsTextSink extends
+     HdfsAbstractSequenceFileSink<String, String, Text, Text> {
+
+    @Override
+    protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+        List<Option> opts = super.getOptions();
+        opts.add(Writer.keyClass(Text.class));
+        opts.add(Writer.valueClass(Text.class));
+        return opts;
+    }
+
+    @Override
+    public KeyValue<String, String> extractKeyValue(Record<String> record) {
+       String key = record.getKey().orElseGet(() -> new String(record.getValue()));
+       return new KeyValue<>(key, new String(record.getValue()));
+    }
+
+    @Override
+    public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
+       return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
new file mode 100644
index 0000000..025311a
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
new file mode 100644
index 0000000..6fb50a5
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for HDFS Sinks that writes there contents to HDFS as Text Files.
+ *
+ * @param <K>
+ * @param <V>
+ */
+public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);
+
+    protected OutputStreamWriter writer;
+
+    @Override
+    protected void createWriter() throws IOException {
+        writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding());
+    }
+
+    @Override
+    public void close() throws Exception {
+        writer.close();
+        super.close();
+    }
+
+    @Override
+    public void write(Record<V> record) {
+       try {
+           KeyValue<K, V> kv = extractKeyValue(record);
+           writer.write(kv.getValue().toString());
+
+           if (hdfsSinkConfig.getSeparator() != '\u0000') {
+              writer.write(hdfsSinkConfig.getSeparator());
+           }
+           unackedRecords.put(record);
+        } catch (IOException | InterruptedException e) {
+            LOG.error("Unable to write to file " + getPath(), e);
+            record.fail();
+        }
+    }
+
+    private OutputStream openHdfsStream() throws IOException {
+       if (hdfsSinkConfig.getCompression() != null) {
+          return getCompressionCodec().createOutputStream(getHdfsStream());
+       } else {
+          return getHdfsStream();
+       }
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
new file mode 100644
index 0000000..355c6df
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+
+/**
+ * A Simple Sink class for Hdfs Text File.
+ */
+public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> implements Sink<String> {
+    @Override
+    public KeyValue<String, String> extractKeyValue(Record<String> record) {
+       String key = record.getKey().orElseGet(() -> new String(record.getValue()));
+       return new KeyValue<>(key, new String(record.getValue()));
+    }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
new file mode 100644
index 0000000..9ade570
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..f2a2b55
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: hdfs
+description: Writes data into HDFS
+sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
new file mode 100644
index 0000000..32085dd
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Simple base class for all the HDFS sink test cases.
+ * Provides utility methods for sending records to the sink, either a fixed
+ * number synchronously or continuously from a background producer thread.
+ *
+ */
+public abstract class AbstractHdfsSinkTest<K, V> {
+
+    @Mock
+    protected SinkContext mockSinkContext;
+
+    @Mock
+    protected Record<V> mockRecord;
+
+    protected Map<String, Object> map;
+    protected HdfsAbstractSink<K, V> sink;
+
+    @SuppressWarnings("unchecked")
+    @BeforeMethod
+    public final void setUp() throws Exception {
+        map = new HashMap<String, Object>();
+        map.put("hdfsConfigResources", "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml,"
+                + "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml");
+        map.put("directory", "/tmp/testing");
+        map.put("filenamePrefix", "prefix");
+
+        mockSinkContext = mock(SinkContext.class);
+
+        // Each stubbed method keeps its own counter so successive invocations
+        // yield increasing sequence ids and unique keys/values.
+        mockRecord = mock(Record.class);
+        when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() {
+            long sequenceCounter = 0;
+            public Optional<Long> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of(sequenceCounter++);
+            }});
+
+        when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+            long sequenceCounter = 0;
+            public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+                return Optional.of("key-" + sequenceCounter++);
+            }});
+
+        when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+            long sequenceCounter = 0;
+            public String answer(InvocationOnMock invocation) throws Throwable {
+                // The concatenation already produces a new String; no new String(...) needed.
+                return "value-" + sequenceCounter++ + "-" + UUID.randomUUID().toString();
+            }});
+
+        createSink();
+    }
+
+    /** Instantiates the concrete sink under test and assigns it to {@link #sink}. */
+    protected abstract void createSink();
+
+    /** Writes {@code numRecords} records synchronously to the sink. */
+    protected final void send(int numRecords) throws Exception {
+        for (int idx = 0; idx < numRecords; idx++) {
+            sink.write(mockRecord);
+        }
+    }
+
+    /** Writes records continuously from a background thread for {@code numSeconds} seconds. */
+    protected final void runFor(int numSeconds) throws InterruptedException {
+        Producer producer = new Producer();
+        producer.start();
+        Thread.sleep(numSeconds * 1000); // Run for N seconds
+        producer.halt();
+        producer.join(2000);
+    }
+
+    /** Background thread that writes records to the sink until {@link #halt()} is called. */
+    protected final class Producer extends Thread {
+        // volatile: written by the test thread in halt() and polled by this thread's
+        // run() loop; without it the JMM does not guarantee the loop ever observes
+        // the update and the producer could spin forever.
+        public volatile boolean keepRunning = true;
+
+        @Override
+        public void run() {
+            while (keepRunning) {
+                try {
+                    sink.write(mockRecord);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+
+        public void halt() {
+            keepRunning = false;
+        }
+    }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
new file mode 100644
index 0000000..2f0b3f3
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pulsar.io.hdfs.Compression;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.exc.InvalidFormatException;
+
+
+/**
+ * Unit tests for HdfsSinkConfig, covering loading from a YAML file and from a
+ * Map, plus validate() checks for required properties, the compression codec,
+ * the sync interval, the pending-record limit, and Kerberos settings.
+ *
+ * NOTE(review): TestNG's assertEquals signature is (actual, expected); the
+ * argument order used below appears reversed.  That only affects failure
+ * messages, not pass/fail outcomes — confirm and normalize.
+ */
+public class HdfsSinkConfigTests {
+	
+	@Test
+	public final void loadFromYamlFileTest() throws IOException {
+		File yamlFile = getFile("sinkConfig.yaml");
+		HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath());
+		assertNotNull(config);
+		assertEquals("core-site.xml", config.getHdfsConfigResources());
+		assertEquals("/foo/bar", config.getDirectory());
+		assertEquals("prefix", config.getFilenamePrefix());
+		assertEquals(Compression.SNAPPY, config.getCompression());
+	}
+	
+	@Test
+	public final void loadFromMapTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("compression", "SNAPPY");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		assertNotNull(config);
+		assertEquals("core-site.xml", config.getHdfsConfigResources());
+		assertEquals("/foo/bar", config.getDirectory());
+		assertEquals("prefix", config.getFilenamePrefix());
+		assertEquals(Compression.SNAPPY, config.getCompression());
+	}
+	
+	@Test
+	public final void validValidateTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("fileExtension", ".txt");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = IllegalArgumentException.class, 
+			expectedExceptionsMessageRegExp = "Required property not set.")
+	public final void missingDirectoryValidateTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = IllegalArgumentException.class, 
+		  expectedExceptionsMessageRegExp = "Required property not set.")
+	public final void missingHdfsConfigsValidateTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("directory", "/foo/bar");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = InvalidFormatException.class)
+	public final void invalidCodecValidateTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("fileExtension", ".txt");
+		map.put("compression", "bad value");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = IllegalArgumentException.class, 
+		  expectedExceptionsMessageRegExp = "Sync Interval cannot be negative")
+	public final void invalidSyncIntervalTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("fileExtension", ".txt");
+		map.put("syncInterval", -1);
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = IllegalArgumentException.class, 
+		  expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer")
+	public final void invalidMaxPendingRecordsTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("fileExtension", ".txt");
+		map.put("maxPendingRecords", 0);
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	@Test(expectedExceptions = IllegalArgumentException.class, 
+		  expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.")
+	public final void kerberosValidateTest() throws IOException {
+		Map<String, Object> map = new HashMap<String, Object> ();
+		map.put("hdfsConfigResources", "core-site.xml");
+		map.put("directory", "/foo/bar");
+		map.put("filenamePrefix", "prefix");
+		map.put("keytab", "/etc/keytab/hdfs.client.ktab");
+		
+		HdfsSinkConfig config = HdfsSinkConfig.load(map);
+		config.validate();
+	}
+	
+	// Resolves a test resource from the classpath to a File.
+	private File getFile(String name) {
+		ClassLoader classLoader = getClass().getClassLoader();
+		return new File(classLoader.getResource(name).getFile());
+	}
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
new file mode 100644
index 0000000..d54e5ec
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the HDFS sequence-file sink (HdfsSequentialTextSink).
+ * All test methods are disabled (enabled = false); they appear to require a
+ * live HDFS instance reachable via the hadoop config resources wired up in
+ * AbstractHdfsSinkTest — TODO confirm and re-enable once an HDFS test
+ * container is available.
+ */
+public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
+	
+    @Override
+    protected void createSink() {
+        sink = new HdfsSequentialTextSink();
+    }
+    
+    @Test(enabled = false)
+	public final void write100Test() throws Exception {
+		map.put("filenamePrefix", "write100Test-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
+		sink.open(map, mockSinkContext);
+		
+		assertNotNull(sink);
+		send(100);
+		
+		Thread.sleep(2000);
+		verify(mockRecord, times(100)).ack();
+		sink.close();
+	}
+	
+	@Test(enabled = false)
+	public final void write5000Test() throws Exception {
+		map.put("filenamePrefix", "write5000Test-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
+		sink.open(map, mockSinkContext);
+		
+		assertNotNull(sink);
+		send(5000);
+		
+		Thread.sleep(2000);
+		verify(mockRecord, times(5000)).ack();
+		sink.close();
+	}
+	
+	@Test(enabled = false)
+	public final void tenSecondTest() throws Exception {
+		map.put("filenamePrefix", "tenSecondTest-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
+		sink.open(map, mockSinkContext);
+		runFor(10);	
+		sink.close();
+	}
+	
+	@Test(enabled = false)
+	public final void bzip2CompressionTest() throws Exception {
+		map.put("filenamePrefix", "bzip2CompressionTest-seq");
+		map.put("compression", "BZIP2");
+		map.remove("fileExtension");
+		sink.open(map, mockSinkContext);
+		send(5000);
+		verify(mockRecord, times(5000)).ack();
+	}
+	
+	@Test(enabled = false)
+	public final void deflateCompressionTest() throws Exception {
+		map.put("filenamePrefix", "deflateCompressionTest-seq");
+		map.put("compression", "DEFLATE");
+		map.remove("fileExtension");
+		sink.open(map, mockSinkContext);
+		send(5000);
+		verify(mockRecord, times(5000)).ack();
+	}
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
new file mode 100644
index 0000000..bb720fa
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the sequence-file based HdfsTextSink.
+ * All test methods are disabled (enabled = false); they appear to require a
+ * live HDFS instance reachable via the hadoop config resources wired up in
+ * AbstractHdfsSinkTest — TODO confirm and re-enable once an HDFS test
+ * container is available.
+ */
+public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
+    
+    @Override
+    protected void createSink() {
+        sink = new HdfsTextSink();
+    }
+
+    @Test(enabled = false)
+    public final void write100Test() throws Exception {
+        map.put("filenamePrefix", "write100TestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(sink);
+        assertNotNull(mockRecord);
+        send(100);
+        
+        Thread.sleep(2000);
+        verify(mockRecord, times(100)).ack();
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void write5000Test() throws Exception {
+        map.put("filenamePrefix", "write5000TestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(sink);
+        assertNotNull(mockRecord);
+        send(5000);
+        
+        Thread.sleep(2000);
+        verify(mockRecord, times(5000)).ack();
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void tenSecondTest() throws Exception {
+        map.put("filenamePrefix", "tenSecondTestText-seq");
+        map.put("fileExtension", ".seq");
+        map.put("syncInterval", 1000);
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        
+        runFor(10); 
+        sink.close();
+    }
+    
+    @Test(enabled = false)
+    public final void bzip2CompressionTest() throws Exception {
+        map.put("filenamePrefix", "bzip2CompressionTestText-seq");
+        map.put("compression", "BZIP2");
+        map.remove("fileExtension");
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        
+        send(5000);
+        verify(mockRecord, times(5000)).ack();
+    }
+    
+    @Test(enabled = false)
+    public final void deflateCompressionTest() throws Exception {
+        map.put("filenamePrefix", "deflateCompressionTestText-seq");
+        map.put("compression", "DEFLATE");
+        map.remove("fileExtension");
+        sink.open(map, mockSinkContext);
+        
+        assertNotNull(mockRecord);
+        send(5000);
+        verify(mockRecord, times(5000)).ack();
+    }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
new file mode 100644
index 0000000..98b8a61
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Tests for the text-file based HdfsStringSink.
+ * All test methods are disabled (enabled = false); they appear to require a
+ * live HDFS instance reachable via the hadoop config resources wired up in
+ * AbstractHdfsSinkTest — TODO confirm and re-enable once an HDFS test
+ * container is available.
+ */
+public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+	
+    @Override
+    protected void createSink() {
+        sink = new HdfsStringSink();
+    }
+    
+    @Test(enabled = false)
+	public final void write5000Test() throws Exception {
+		map.put("filenamePrefix", "write5000Test");
+		map.put("fileExtension", ".txt");
+		map.put("separator", '\n');
+		sink.open(map, mockSinkContext);
+		send(5000);
+		sink.close();
+		verify(mockRecord, times(5000)).ack();
+	}
+	
+    @Test(enabled = false)
+	public final void fiveByTwoThousandTest() throws Exception {
+		map.put("filenamePrefix", "fiveByTwoThousandTest");
+		map.put("fileExtension", ".txt");
+		map.put("separator", '\n');
+		sink.open(map, mockSinkContext);
+		
+		for (int idx = 1; idx < 6; idx++) {
+			send(2000);
+		}
+		sink.close();
+		verify(mockRecord, times(2000 * 5)).ack();
+	}
+	
+    @Test(enabled = false)
+	public final void tenSecondTest() throws Exception {
+		map.put("filenamePrefix", "tenSecondTest");
+		map.put("fileExtension", ".txt");
+		map.put("separator", '\n');
+		sink.open(map, mockSinkContext);
+		runFor(10);	
+		sink.close();
+	}
+	
+    @Test(enabled = false)
+	public final void maxPendingRecordsTest() throws Exception {
+		map.put("filenamePrefix", "maxPendingRecordsTest");
+		map.put("fileExtension", ".txt");
+		map.put("separator", '\n');
+		map.put("maxPendingRecords", 500);
+		sink.open(map, mockSinkContext);
+		runFor(10);	
+		sink.close();
+	}
+	
+    @Test(enabled = false)
+	public final void bzip2CompressionTest() throws Exception {
+		map.put("filenamePrefix", "bzip2CompressionTest");
+		map.put("compression", "BZIP2");
+		map.remove("fileExtension");
+		map.put("separator", '\n');
+		sink.open(map, mockSinkContext);
+		send(5000);
+		sink.close();
+		verify(mockRecord, times(5000)).ack();
+	}
+	
+    @Test(enabled = false)
+	public final void deflateCompressionTest() throws Exception {
+		map.put("filenamePrefix", "deflateCompressionTest");
+		map.put("compression", "DEFLATE");
+		map.put("fileExtension", ".deflate");
+		map.put("separator", '\n');
+		sink.open(map, mockSinkContext);
+		send(50000);
+		sink.close();
+		verify(mockRecord, times(50000)).ack();
+	}
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
index 92f2186..31d1e98 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
@@ -18,29 +18,15 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
-    <version>2.2.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-io</artifactId>
-  <name>Pulsar IO :: Parent</name>
-
-  <modules>
-    <module>core</module>
-    <module>twitter</module>
-    <module>cassandra</module>
-    <module>aerospike</module>
-    <module>kafka</module>
-    <module>rabbitmq</module>
-    <module>kinesis</module>
-    <module>jdbc</module>
-    <module>data-genenator</module>
-  </modules>
-
-</project>
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>hdfs://0.0.0.0:8020</value>
+    </property>
+    <property>
+        <name>io.compression.codecs</name>
+        <value>org.apache.hadoop.io.compress.GzipCodec,
+               org.apache.hadoop.io.compress.DefaultCodec,
+               org.apache.hadoop.io.compress.SnappyCodec</value>
+    </property>
+</configuration>
diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
index 92f2186..bb722f1 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
@@ -18,29 +18,17 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <packaging>pom</packaging>
-  <parent>
-    <groupId>org.apache.pulsar</groupId>
-    <artifactId>pulsar</artifactId>
-    <version>2.2.0-incubating-SNAPSHOT</version>
-  </parent>
-
-  <artifactId>pulsar-io</artifactId>
-  <name>Pulsar IO :: Parent</name>
-
-  <modules>
-    <module>core</module>
-    <module>twitter</module>
-    <module>cassandra</module>
-    <module>aerospike</module>
-    <module>kafka</module>
-    <module>rabbitmq</module>
-    <module>kinesis</module>
-    <module>jdbc</module>
-    <module>data-genenator</module>
-  </modules>
-
-</project>
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+    </property>
+    <property>
+        <name>dfs.client.use.datanode.hostname</name>
+        <value>true</value>
+    </property>
+    <property>
+        <name>dfs.support.append</name>
+        <value>true</value>
+    </property>
+</configuration>
diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..5a19ee0
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"hdfsConfigResources": "core-site.xml",
+"directory": "/foo/bar",
+"filenamePrefix": "prefix",
+"compression": "SNAPPY"
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 92f2186..5c03370 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -39,6 +39,7 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
+    <module>hdfs</module>
     <module>jdbc</module>
     <module>data-genenator</module>
   </modules>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
new file mode 100644
index 0000000..58f781f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Testcontainers wrapper around a single-node Hadoop image, used by the HDFS
+ * sink integration tests.  Exposes the standard NameNode/DataNode/YARN ports
+ * and waits for them to accept connections before the container is considered
+ * started.
+ */
+public class HdfsContainer extends ChaosContainer<HdfsContainer> {
+
+    public static final String NAME = "HDFS";
+
+    // NOTE(review): the original list contained 50070 twice; the duplicate was
+    // removed — confirm whether 50075 (DataNode HTTP) was intended instead.
+    static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50090 };
+
+    private static final String IMAGE_NAME = "harisekhon/hadoop:latest";
+
+    public HdfsContainer(String clusterName) {
+        super(clusterName, IMAGE_NAME);
+    }
+
+    @Override
+    public String getContainerName() {
+        return clusterName;
+    }
+
+    @Override
+    protected void configure() {
+        super.configure();
+        this.withNetworkAliases(NAME)
+            .withExposedPorts(PORTS)
+            .withCreateContainerCmdModifier(createContainerCmd -> {
+                createContainerCmd.withHostName(NAME);
+                createContainerCmd.withName(clusterName + "-" + NAME);
+            })
+            // Ready once every exposed port accepts a TCP connection.
+            .waitingFor(new HostPortWaitStrategy());
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 7b3cd4f..17634a1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -72,7 +72,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     public void testCassandraArchiveSink() throws Exception {
         testSink(new CassandraSinkArchiveTester(), false);
     }
-
+    
+    @Test(enabled = false)
+    public void testHdfsSink() throws Exception {
+        testSink(new HdfsSinkTester(), false);
+    }
+    
     @Test
     public void testJdbcSink() throws Exception {
         testSink(new JdbcSinkTester(), true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
new file mode 100644
index 0000000..46c5f24
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
+import org.testcontainers.containers.GenericContainer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Integration tester for the HDFS sink connector.  Locates the HDFS service
+ * container in the cluster, prepares a writable test directory, and validates
+ * the records written by the sink.
+ */
+public class HdfsSinkTester extends SinkTester {
+
+    private static final String NAME = "HDFS";
+
+    private HdfsContainer hdfsCluster;
+
+    public HdfsSinkTester() {
+        super(SinkType.HDFS);
+
+        // TODO How do I get the core-site.xml, and hdfs-site.xml files from the container?
+        sinkConfig.put("hdfsConfigResources", "");
+        // Must match the directory created and chowned in prepareSink() below
+        // (the original value "/testing/test" did not match either command).
+        sinkConfig.put("directory", "/tmp/testing");
+    }
+
+    @Override
+    public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+        GenericContainer<?> container = containers.get(NAME);
+        checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster");
+        this.hdfsCluster = (HdfsContainer) container;
+    }
+
+    @Override
+    public void prepareSink() throws Exception {
+        // Create the test directory (with parents) and make it writable by the
+        // test user.  Both commands must go through the "dfs" shell subcommand;
+        // the original chown invocation omitted it and was a no-op.
+        hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-mkdir", "-p", "/tmp/testing");
+        hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-chown", "tester:testing", "/tmp/testing");
+
+        // NOTE(review): each execInContainer() runs in its own process, so this
+        // export does NOT persist to later commands.  HADOOP_USER_NAME must be
+        // set via the container environment or prefixed onto each invocation.
+        hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+    }
+
+    @Override
+    public void validateSinkResult(Map<String, String> kvs) {
+        // TODO Auto-generated method stub
+
+    }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 098b8bf..7f4b2d9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -33,7 +33,8 @@ public abstract class SinkTester {
         UNDEFINED,
         CASSANDRA,
         KAFKA,
-        JDBC
+        JDBC,
+        HDFS
     }
 
     protected final SinkType sinkType;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 147f273..438c96e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.suites;
 
 import java.util.Map;
 import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
 import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
 import org.testcontainers.containers.GenericContainer;