You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/12/16 01:37:39 UTC

[pulsar] branch master updated: Broke Pulsar-IO::HDFS into 3.x and 2.x versions (#2966)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ccf7ff  Broke Pulsar-IO::HDFS into 3.x and 2.x versions (#2966)
2ccf7ff is described below

commit 2ccf7ff2a6c16f1b16342aa4529d5197195bad9d
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Sat Dec 15 17:37:35 2018 -0800

    Broke Pulsar-IO::HDFS into 3.x and 2.x versions (#2966)
    
    ### Motivation
    
    A user was attempting to use the existing HDFS connector to connect to a 2.x version of HDFS, but the current connector only supported 3.x version of HDFS.
    
    ### Modifications
    
    To address this issue, we renamed the current HDFS connector to HDFS3, and created a new 2.x compatible connector named HDFS2.
    
    The code in both of these are nearly identical with 2 notable exceptions. First and foremost, they both use different versions of the Hadoop-client library. And secondly, the HDFS2 version creates the FSDataOutputStream object directly, whereas the HDFS3 version leverages the FSDataOutputStreamBuilder class for this purpose, as it is the preferred method going forward.
    
    ### Result
    
    There will be be support for connecting to both 2.x and 3.x version of HDFS. However, there MAY BE some library conflicts in the released jar due to the different versions of the same library in the different modules. Hopefully the NAR packaging will address this.
---
 pulsar-io/{hdfs => hdfs2}/pom.xml                  |  24 ++---
 .../pulsar/io/hdfs2}/AbstractHdfsConfig.java       |   2 +-
 .../pulsar/io/hdfs2}/AbstractHdfsConnector.java    |   4 +-
 .../org/apache/pulsar/io/hdfs2}/Compression.java   |   2 +-
 .../org/apache/pulsar/io/hdfs2}/HdfsResources.java |   2 +-
 .../org/apache/pulsar/io/hdfs2}/SecurityUtil.java  |   2 +-
 .../org/apache/pulsar/io/hdfs2}/package-info.java  |   2 +-
 .../pulsar/io/hdfs2}/sink/HdfsAbstractSink.java    |  21 ++---
 .../pulsar/io/hdfs2}/sink/HdfsSinkConfig.java      |   4 +-
 .../pulsar/io/hdfs2}/sink/HdfsSyncThread.java      |   2 +-
 .../apache/pulsar/io/hdfs2/sink}/package-info.java |   2 +-
 .../sink/seq/HdfsAbstractSequenceFileSink.java     |   4 +-
 .../io/hdfs2}/sink/seq/HdfsSequentialTextSink.java |   2 +-
 .../pulsar/io/hdfs2}/sink/seq/HdfsTextSink.java    |   2 +-
 .../pulsar/io/hdfs2/sink/seq}/package-info.java    |   2 +-
 .../hdfs2}/sink/text/HdfsAbstractTextFileSink.java |   4 +-
 .../pulsar/io/hdfs2}/sink/text/HdfsStringSink.java |   2 +-
 .../pulsar/io/hdfs2/sink/text}/package-info.java   |   2 +-
 .../resources/META-INF/services/pulsar-io.yaml     |   6 +-
 .../io/hdfs2}/sink/AbstractHdfsSinkTest.java       |   7 +-
 .../pulsar/io/hdfs2}/sink/HdfsSinkConfigTests.java |   5 +-
 .../hdfs2}/sink/seq/HdfsSequentialSinkTests.java   |   5 +-
 .../io/hdfs2}/sink/seq/HdfsTextSinkTests.java      |   5 +-
 .../io/hdfs2/sink/text/HdfsStringSinkTests.java}   |  98 ++++++++++----------
 .../src/test/resources/hadoop/core-site.xml        |   0
 .../src/test/resources/hadoop/hdfs-site.xml        |   0
 .../src/test/resources/sinkConfig.yaml             |   0
 pulsar-io/{hdfs => hdfs3}/pom.xml                  |   4 +-
 .../pulsar/io/hdfs3}/AbstractHdfsConfig.java       |   2 +-
 .../pulsar/io/hdfs3}/AbstractHdfsConnector.java    |   4 +-
 .../org/apache/pulsar/io/hdfs3}/Compression.java   |   2 +-
 .../org/apache/pulsar/io/hdfs3}/HdfsResources.java |   2 +-
 .../org/apache/pulsar/io/hdfs3}/SecurityUtil.java  |   2 +-
 .../org/apache/pulsar/io/hdfs3}/package-info.java  |   2 +-
 .../pulsar/io/hdfs3}/sink/HdfsAbstractSink.java    |   6 +-
 .../pulsar/io/hdfs3}/sink/HdfsSinkConfig.java      |   4 +-
 .../pulsar/io/hdfs3}/sink/HdfsSyncThread.java      |   2 +-
 .../apache/pulsar/io/hdfs3/sink}/package-info.java |   2 +-
 .../sink/seq/HdfsAbstractSequenceFileSink.java     |   4 +-
 .../io/hdfs3}/sink/seq/HdfsSequentialTextSink.java |   2 +-
 .../pulsar/io/hdfs3}/sink/seq/HdfsTextSink.java    |   2 +-
 .../pulsar/io/hdfs3}/sink/seq/package-info.java    |   2 +-
 .../hdfs3}/sink/text/HdfsAbstractTextFileSink.java |   4 +-
 .../pulsar/io/hdfs3}/sink/text/HdfsStringSink.java |   2 +-
 .../pulsar/io/hdfs3/sink/text}/package-info.java   |   2 +-
 .../resources/META-INF/services/pulsar-io.yaml     |   6 +-
 .../io/hdfs3}/sink/AbstractHdfsSinkTest.java       |   3 +-
 .../pulsar/io/hdfs3}/sink/HdfsSinkConfigTests.java |   5 +-
 .../hdfs3/sink/seq/HdfsSequentialSinkTests.java}   |  82 ++++++++---------
 .../io/hdfs3}/sink/seq/HdfsTextSinkTests.java      |   5 +-
 .../io/hdfs3/sink/text/HdfsStringSinkTests.java}   | 100 ++++++++++-----------
 .../src/test/resources/hadoop/core-site.xml        |   0
 .../src/test/resources/hadoop/hdfs-site.xml        |   0
 .../src/test/resources/sinkConfig.yaml             |   0
 pulsar-io/pom.xml                                  |   3 +-
 55 files changed, 230 insertions(+), 235 deletions(-)

diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs2/pom.xml
similarity index 87%
copy from pulsar-io/hdfs/pom.xml
copy to pulsar-io/hdfs2/pom.xml
index 57f04ee..49859a7 100644
--- a/pulsar-io/hdfs/pom.xml
+++ b/pulsar-io/hdfs2/pom.xml
@@ -25,10 +25,10 @@
     <artifactId>pulsar-io</artifactId>
     <version>2.3.0-SNAPSHOT</version>
   </parent>
-  <artifactId>pulsar-io-hdfs</artifactId>
-  <name>Pulsar IO :: Hdfs</name>
+  <artifactId>pulsar-io-hdfs2</artifactId>
+  <name>Pulsar IO :: Hdfs2</name>
   
-  <dependencies>
+    <dependencies>
      <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-core</artifactId>
@@ -46,17 +46,21 @@
     </dependency>
     
   	<dependency>
-  		<groupId>org.apache.hadoop</groupId>
-  		<artifactId>hadoop-client</artifactId>
-  		<version>3.1.1</version>
-  	</dependency>
-
-  	<dependency>
   		<groupId>org.testng</groupId>
   		<artifactId>testng</artifactId>
   		<scope>test</scope>
   	</dependency>
-  </dependencies>
+  	<dependency>
+  		<groupId>org.apache.hadoop</groupId>
+  		<artifactId>hadoop-client</artifactId>
+  		<version>2.8.2</version>
+  	</dependency>
+    <dependency>
+       <groupId>org.apache.commons</groupId>
+  	   <artifactId>commons-lang3</artifactId>
+  	   <version>3.4</version>
+    </dependency> 	
+ </dependencies>
   
   <build>
     <plugins>
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
similarity index 98%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
index 529c350..d7bdef1 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs2;
 
 import java.io.Serializable;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
similarity index 99%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
index 0eccd93..163fe22 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/AbstractHdfsConnector.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs2;
 
 import java.io.IOException;
 import java.lang.ref.WeakReference;
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
+import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
 
 /**
  * A Simple abstract class for HDFS connectors.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
similarity index 96%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
index 97dba53..e698b32 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/Compression.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs2;
 
 /**
  * An enumeration of compression codecs available for HDFS.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
similarity index 97%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
index 1d04c6c..e7575c0 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/HdfsResources.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs2;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
similarity index 99%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
index c5462d3..8c5c693 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/SecurityUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs2;
 
 import java.io.IOException;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
similarity index 95%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
index 4294852..ece38d0 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs2;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
similarity index 82%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
index 18184e2..dbc5881 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsAbstractSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs2.sink;
 
 import java.io.IOException;
 import java.util.Map;
@@ -26,17 +26,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs.AbstractHdfsConnector;
-import org.apache.pulsar.io.hdfs.HdfsResources;
+import org.apache.pulsar.io.hdfs2.AbstractHdfsConnector;
+import org.apache.pulsar.io.hdfs2.HdfsResources;
 
 /**
  * A Simple abstract class for HDFS sink.
@@ -84,17 +81,11 @@ public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector imple
        }
     }
 
-    @SuppressWarnings("rawtypes")
-    protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
-        Path path = getPath();
-        FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
-        FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) :  fs.createFile(path);
-        return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    }
-
     protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
         if (hdfsStream == null) {
-            hdfsStream = getOutputStreamBuilder().build();
+            Path path = getPath();
+            FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
+            hdfsStream = fs.exists(path) ? fs.append(path) : fs.create(path);
         }
         return hdfsStream;
     }
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
similarity index 97%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
index e9f4cae..aafb63f 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs2.sink;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -34,7 +34,7 @@ import lombok.ToString;
 import lombok.experimental.Accessors;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
+import org.apache.pulsar.io.hdfs2.AbstractHdfsConfig;
 
 /**
  * Configuration object for all HDFS Sink components.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
similarity index 98%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
index 3c19ed8..135c59b 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/HdfsSyncThread.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs2.sink;
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
similarity index 94%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
index 025311a..650ccb7 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs2.sink;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
similarity index 96%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
index 7c61c20..9bdc003 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsAbstractSequenceFileSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.seq;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.SequenceFile.Writer.Option;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
similarity index 98%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
index 84ce09f..67ad414 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialTextSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.seq;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
similarity index 97%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
index 84ebc07..8718389 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.seq;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
similarity index 94%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
rename to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
index 9ade570..31eba30 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/seq/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs2.sink.seq;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
similarity index 96%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
index 6fb50a5..df35a7a 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsAbstractTextFileSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
+package org.apache.pulsar.io.hdfs2.sink.text;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -26,7 +26,7 @@ import java.io.OutputStreamWriter;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
similarity index 96%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
index 355c6df..54b8d34 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
+package org.apache.pulsar.io.hdfs2.sink.text;
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
similarity index 94%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
copy to pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
index 025311a..34b76d3 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
+++ b/pulsar-io/hdfs2/src/main/java/org/apache/pulsar/io/hdfs2/sink/text/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs2.sink.text;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
similarity index 87%
copy from pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
copy to pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
index f2a2b55..92dd642 100644
--- a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs2/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: hdfs
-description: Writes data into HDFS
-sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
+name: hdfs2
+description: Writes data into HDFS 2.x
+sinkClass: org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
similarity index 94%
copy from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
copy to pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
index 5d40138..6f630be 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/AbstractHdfsSinkTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs2.sink;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -28,6 +28,7 @@ import java.util.UUID;
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hdfs2.sink.HdfsAbstractSink;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -53,8 +54,8 @@ public abstract class AbstractHdfsSinkTest<K, V> {
     @BeforeMethod
     public final void setUp() throws Exception {
         map = new HashMap<String, Object> ();
-        map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml,"
-                + "../pulsar/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml");
+        map.put("hdfsConfigResources", "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml,"
+                + "../pulsar/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml");
         map.put("directory", "/tmp/testing");
         map.put("filenamePrefix", "prefix");
         
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
similarity index 97%
copy from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
copy to pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
index 2f0b3f3..d4e1f03 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/HdfsSinkConfigTests.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs2.sink;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -26,7 +26,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.pulsar.io.hdfs.Compression;
+import org.apache.pulsar.io.hdfs2.Compression;
+import org.apache.pulsar.io.hdfs2.sink.HdfsSinkConfig;
 import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.databind.exc.InvalidFormatException;
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
similarity index 94%
rename from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
rename to pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
index d54e5ec..60f2ba5 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsSequentialSinkTests.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.seq;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.seq.HdfsSequentialTextSink;
 import org.testng.annotations.Test;
 
 public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
similarity index 95%
copy from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
copy to pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
index bb720fa..c0a051d 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/seq/HdfsTextSinkTests.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.seq;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.seq.HdfsTextSink;
 import org.testng.annotations.Test;
 
 public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
similarity index 54%
copy from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
copy to pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
index bb720fa..3e9f403 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
+++ b/pulsar-io/hdfs2/src/test/java/org/apache/pulsar/io/hdfs2/sink/text/HdfsStringSinkTests.java
@@ -16,89 +16,89 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs2.sink.text;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs2.sink.text.HdfsStringSink;
 import org.testng.annotations.Test;
 
-public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
-    
+public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+	
     @Override
     protected void createSink() {
-        sink = new HdfsTextSink();
+        sink = new HdfsStringSink();
     }
-
+    
     @Test(enabled = false)
-    public final void write100Test() throws Exception {
-        map.put("filenamePrefix", "write100TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
+    public final void write5000Test() throws Exception {
+        map.put("filenamePrefix", "write5000Test");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
-        send(100);
-        
-        Thread.sleep(2000);
-        verify(mockRecord, times(100)).ack();
+        send(5000);
         sink.close();
+        verify(mockRecord, times(5000)).ack();
     }
-    
+
     @Test(enabled = false)
-    public final void write5000Test() throws Exception {
-        map.put("filenamePrefix", "write5000TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
+    public final void fiveByTwoThousandTest() throws Exception {
+        map.put("filenamePrefix", "fiveByTwoThousandTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
         
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
-        send(5000);
-        
-        Thread.sleep(2000);
-        verify(mockRecord, times(5000)).ack();
+        for (int idx = 1; idx < 6; idx++) {
+        	send(2000);
+        }
         sink.close();
+        verify(mockRecord, times(2000 * 5)).ack();
     }
-    
+
     @Test(enabled = false)
     public final void tenSecondTest() throws Exception {
-        map.put("filenamePrefix", "tenSecondTestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
+        map.put("filenamePrefix", "tenSecondTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        
-        runFor(10); 
+        runFor(10);	
         sink.close();
     }
-    
+
+    @Test(enabled = false)
+    public final void maxPendingRecordsTest() throws Exception {
+        map.put("filenamePrefix", "maxPendingRecordsTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
+        map.put("maxPendingRecords", 500);
+        sink.open(map, mockSinkContext);
+        runFor(10);	
+        sink.close();
+    }
+
     @Test(enabled = false)
     public final void bzip2CompressionTest() throws Exception {
-        map.put("filenamePrefix", "bzip2CompressionTestText-seq");
+        map.put("filenamePrefix", "bzip2CompressionTest");
         map.put("compression", "BZIP2");
         map.remove("fileExtension");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        
         send(5000);
+        sink.close();
         verify(mockRecord, times(5000)).ack();
     }
-    
+
     @Test(enabled = false)
     public final void deflateCompressionTest() throws Exception {
-        map.put("filenamePrefix", "deflateCompressionTestText-seq");
+        map.put("filenamePrefix", "deflateCompressionTest");
         map.put("compression", "DEFLATE");
-        map.remove("fileExtension");
+        map.put("fileExtension", ".deflate");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        send(5000);
-        verify(mockRecord, times(5000)).ack();
+        send(50000);
+        sink.close();
+        verify(mockRecord, times(50000)).ack();
     }
 }
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
similarity index 100%
copy from pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
copy to pulsar-io/hdfs2/src/test/resources/hadoop/core-site.xml
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml b/pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
similarity index 100%
copy from pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
copy to pulsar-io/hdfs2/src/test/resources/hadoop/hdfs-site.xml
diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
similarity index 100%
copy from pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
copy to pulsar-io/hdfs2/src/test/resources/sinkConfig.yaml
diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs3/pom.xml
similarity index 96%
rename from pulsar-io/hdfs/pom.xml
rename to pulsar-io/hdfs3/pom.xml
index 57f04ee..a59b597 100644
--- a/pulsar-io/hdfs/pom.xml
+++ b/pulsar-io/hdfs3/pom.xml
@@ -25,8 +25,8 @@
     <artifactId>pulsar-io</artifactId>
     <version>2.3.0-SNAPSHOT</version>
   </parent>
-  <artifactId>pulsar-io-hdfs</artifactId>
-  <name>Pulsar IO :: Hdfs</name>
+  <artifactId>pulsar-io-hdfs3</artifactId>
+  <name>Pulsar IO :: Hdfs3</name>
   
   <dependencies>
      <dependency>
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConfig.java
similarity index 98%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConfig.java
index 529c350..19505a4 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs3;
 
 import java.io.Serializable;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
similarity index 99%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
index 0eccd93..aa22b42 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/AbstractHdfsConnector.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs3;
 
 import java.io.IOException;
 import java.lang.ref.WeakReference;
@@ -42,7 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
+import org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig;
 
 /**
  * A Simple abstract class for HDFS connectors.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/Compression.java
similarity index 96%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/Compression.java
index 97dba53..81a96b0 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/Compression.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs3;
 
 /**
  * An enumeration of compression codecs available for HDFS.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/HdfsResources.java
similarity index 97%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/HdfsResources.java
index 1d04c6c..29ce81a 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/HdfsResources.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs3;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/SecurityUtil.java
similarity index 99%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/SecurityUtil.java
index c5462d3..ebc8d53 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/SecurityUtil.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
+package org.apache.pulsar.io.hdfs3;
 
 import java.io.IOException;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/package-info.java
similarity index 95%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/package-info.java
index c6506d9..86fa537 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs3;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsAbstractSink.java
similarity index 96%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsAbstractSink.java
index 18184e2..642c07d 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsAbstractSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs3.sink;
 
 import java.io.IOException;
 import java.util.Map;
@@ -35,8 +35,8 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.hdfs.AbstractHdfsConnector;
-import org.apache.pulsar.io.hdfs.HdfsResources;
+import org.apache.pulsar.io.hdfs3.AbstractHdfsConnector;
+import org.apache.pulsar.io.hdfs3.HdfsResources;
 
 /**
  * A Simple abstract class for HDFS sink.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfig.java
similarity index 97%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfig.java
index e9f4cae..1585d08 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs3.sink;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -34,7 +34,7 @@ import lombok.ToString;
 import lombok.experimental.Accessors;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
+import org.apache.pulsar.io.hdfs3.AbstractHdfsConfig;
 
 /**
  * Configuration object for all HDFS Sink components.
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSyncThread.java
similarity index 98%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSyncThread.java
index 3c19ed8..f585558 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/HdfsSyncThread.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs3.sink;
 
 import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/package-info.java
similarity index 95%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/package-info.java
index 4294852..c419cbd 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs3.sink;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsAbstractSequenceFileSink.java
similarity index 96%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsAbstractSequenceFileSink.java
index 7c61c20..6bf0e1c 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsAbstractSequenceFileSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs3.sink.seq;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.SequenceFile.Writer.Option;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
similarity index 98%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
index 84ce09f..d071556 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialTextSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs3.sink.seq;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSink.java
similarity index 97%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSink.java
index 84ebc07..44fd979 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs3.sink.seq;
 
 import java.io.IOException;
 import java.util.List;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/package-info.java
similarity index 94%
copy from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
copy to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/package-info.java
index 025311a..2a30ad3 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/seq/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs3.sink.seq;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsAbstractTextFileSink.java
similarity index 96%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsAbstractTextFileSink.java
index 6fb50a5..2485ed4 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsAbstractTextFileSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
+package org.apache.pulsar.io.hdfs3.sink.text;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
@@ -26,7 +26,7 @@ import java.io.OutputStreamWriter;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSink.java
similarity index 96%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSink.java
index 355c6df..47316c6 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSink.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
+package org.apache.pulsar.io.hdfs3.sink.text;
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.KeyValue;
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/package-info.java
similarity index 94%
rename from pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
rename to pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/package-info.java
index 025311a..7d7f1ab 100644
--- a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
+++ b/pulsar-io/hdfs3/src/main/java/org/apache/pulsar/io/hdfs3/sink/text/package-info.java
@@ -16,4 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
+package org.apache.pulsar.io.hdfs3.sink.text;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
similarity index 87%
rename from pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
rename to pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
index f2a2b55..bb672b3 100644
--- a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/hdfs3/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,6 +17,6 @@
 # under the License.
 #
 
-name: hdfs
-description: Writes data into HDFS
-sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
+name: hdfs3
+description: Writes data into HDFS 3.x
+sinkClass: org.apache.pulsar.io.hdfs3.sink.text.HdfsStringSink
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
similarity index 97%
rename from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
rename to pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
index 5d40138..c8d343e 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/AbstractHdfsSinkTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs3.sink;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -28,6 +28,7 @@ import java.util.UUID;
 
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hdfs3.sink.HdfsAbstractSink;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfigTests.java
similarity index 97%
rename from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
rename to pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfigTests.java
index 2f0b3f3..2706795 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/HdfsSinkConfigTests.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink;
+package org.apache.pulsar.io.hdfs3.sink;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
@@ -26,7 +26,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.pulsar.io.hdfs.Compression;
+import org.apache.pulsar.io.hdfs3.Compression;
+import org.apache.pulsar.io.hdfs3.sink.HdfsSinkConfig;
 import org.testng.annotations.Test;
 
 import com.fasterxml.jackson.databind.exc.InvalidFormatException;
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialSinkTests.java
similarity index 54%
rename from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
rename to pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialSinkTests.java
index 98b8a61..7c89f57 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsSequentialSinkTests.java
@@ -16,88 +16,80 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.text;
+package org.apache.pulsar.io.hdfs3.sink.seq;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.seq.HdfsSequentialTextSink;
 import org.testng.annotations.Test;
 
-public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
 	
     @Override
     protected void createSink() {
-        sink = new HdfsStringSink();
+        sink = new HdfsSequentialTextSink();
     }
     
     @Test(enabled = false)
-	public final void write5000Test() throws Exception {
-		map.put("filenamePrefix", "write5000Test");
-		map.put("fileExtension", ".txt");
-		map.put("separator", '\n');
+	public final void write100Test() throws Exception {
+		map.put("filenamePrefix", "write100Test-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
 		sink.open(map, mockSinkContext);
-		send(5000);
+		
+		assertNotNull(sink);
+		send(100);
+		
+		Thread.sleep(2000);
+		verify(mockRecord, times(100)).ack();
 		sink.close();
-		verify(mockRecord, times(5000)).ack();
 	}
 	
-    @Test(enabled = false)
-	public final void fiveByTwoThousandTest() throws Exception {
-		map.put("filenamePrefix", "fiveByTwoThousandTest");
-		map.put("fileExtension", ".txt");
-		map.put("separator", '\n');
+	@Test(enabled = false)
+	public final void write5000Test() throws Exception {
+		map.put("filenamePrefix", "write5000Test-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
 		sink.open(map, mockSinkContext);
 		
-		for (int idx = 1; idx < 6; idx++) {
-			send(2000);
-		}
+		assertNotNull(sink);
+		send(5000);
+		
+		Thread.sleep(2000);
+		verify(mockRecord, times(5000)).ack();
 		sink.close();
-		verify(mockRecord, times(2000 * 5)).ack();
 	}
 	
-    @Test(enabled = false)
+	@Test(enabled = false)
 	public final void tenSecondTest() throws Exception {
-		map.put("filenamePrefix", "tenSecondTest");
-		map.put("fileExtension", ".txt");
-		map.put("separator", '\n');
-		sink.open(map, mockSinkContext);
-		runFor(10);	
-		sink.close();
-	}
-	
-    @Test(enabled = false)
-	public final void maxPendingRecordsTest() throws Exception {
-		map.put("filenamePrefix", "maxPendingRecordsTest");
-		map.put("fileExtension", ".txt");
-		map.put("separator", '\n');
-		map.put("maxPendingRecords", 500);
+		map.put("filenamePrefix", "tenSecondTest-seq");
+		map.put("fileExtension", ".seq");
+		map.put("syncInterval", 1000);
 		sink.open(map, mockSinkContext);
 		runFor(10);	
 		sink.close();
 	}
 	
-    @Test(enabled = false)
+	@Test(enabled = false)
 	public final void bzip2CompressionTest() throws Exception {
-		map.put("filenamePrefix", "bzip2CompressionTest");
+		map.put("filenamePrefix", "bzip2CompressionTest-seq");
 		map.put("compression", "BZIP2");
 		map.remove("fileExtension");
-		map.put("separator", '\n');
 		sink.open(map, mockSinkContext);
 		send(5000);
-		sink.close();
 		verify(mockRecord, times(5000)).ack();
 	}
 	
-    @Test(enabled = false)
+	@Test(enabled = false)
 	public final void deflateCompressionTest() throws Exception {
-		map.put("filenamePrefix", "deflateCompressionTest");
+		map.put("filenamePrefix", "deflateCompressionTest-seq");
 		map.put("compression", "DEFLATE");
-		map.put("fileExtension", ".deflate");
-		map.put("separator", '\n');
+		map.remove("fileExtension");
 		sink.open(map, mockSinkContext);
-		send(50000);
-		sink.close();
-		verify(mockRecord, times(50000)).ack();
+		send(5000);
+		verify(mockRecord, times(5000)).ack();
 	}
 }
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSinkTests.java
similarity index 95%
copy from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
copy to pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSinkTests.java
index bb720fa..a8c4238 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/seq/HdfsTextSinkTests.java
@@ -16,13 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs3.sink.seq;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.seq.HdfsTextSink;
 import org.testng.annotations.Test;
 
 public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSinkTests.java
similarity index 54%
rename from pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
rename to pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSinkTests.java
index bb720fa..a63974e 100644
--- a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
+++ b/pulsar-io/hdfs3/src/test/java/org/apache/pulsar/io/hdfs3/sink/text/HdfsStringSinkTests.java
@@ -16,89 +16,89 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.hdfs.sink.seq;
+package org.apache.pulsar.io.hdfs3.sink.text;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.testng.Assert.assertNotNull;
 
-import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.AbstractHdfsSinkTest;
+import org.apache.pulsar.io.hdfs3.sink.text.HdfsStringSink;
 import org.testng.annotations.Test;
 
-public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
-    
+public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+	
     @Override
     protected void createSink() {
-        sink = new HdfsTextSink();
-    }
-
-    @Test(enabled = false)
-    public final void write100Test() throws Exception {
-        map.put("filenamePrefix", "write100TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
-        sink.open(map, mockSinkContext);
-        
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
-        send(100);
-        
-        Thread.sleep(2000);
-        verify(mockRecord, times(100)).ack();
-        sink.close();
+        sink = new HdfsStringSink();
     }
     
     @Test(enabled = false)
     public final void write5000Test() throws Exception {
-        map.put("filenamePrefix", "write5000TestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
+        map.put("filenamePrefix", "write5000Test");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(sink);
-        assertNotNull(mockRecord);
         send(5000);
-        
-        Thread.sleep(2000);
+        sink.close();
         verify(mockRecord, times(5000)).ack();
+	}
+	
+    @Test(enabled = false)
+    public final void fiveByTwoThousandTest() throws Exception {
+        map.put("filenamePrefix", "fiveByTwoThousandTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
+        sink.open(map, mockSinkContext);
+
+        for (int idx = 1; idx < 6; idx++) {
+           send(2000);
+        }
         sink.close();
+        verify(mockRecord, times(2000 * 5)).ack();
     }
-    
+	
     @Test(enabled = false)
     public final void tenSecondTest() throws Exception {
-        map.put("filenamePrefix", "tenSecondTestText-seq");
-        map.put("fileExtension", ".seq");
-        map.put("syncInterval", 1000);
+        map.put("filenamePrefix", "tenSecondTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        
-        runFor(10); 
+        runFor(10);	
         sink.close();
     }
-    
+
+    @Test(enabled = false)
+    public final void maxPendingRecordsTest() throws Exception {
+        map.put("filenamePrefix", "maxPendingRecordsTest");
+        map.put("fileExtension", ".txt");
+        map.put("separator", '\n');
+        map.put("maxPendingRecords", 500);
+        sink.open(map, mockSinkContext);
+        runFor(10);	
+        sink.close();
+    }
+
     @Test(enabled = false)
     public final void bzip2CompressionTest() throws Exception {
-        map.put("filenamePrefix", "bzip2CompressionTestText-seq");
+        map.put("filenamePrefix", "bzip2CompressionTest");
         map.put("compression", "BZIP2");
         map.remove("fileExtension");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        
         send(5000);
+        sink.close();
         verify(mockRecord, times(5000)).ack();
     }
-    
+
     @Test(enabled = false)
     public final void deflateCompressionTest() throws Exception {
-        map.put("filenamePrefix", "deflateCompressionTestText-seq");
+        map.put("filenamePrefix", "deflateCompressionTest");
         map.put("compression", "DEFLATE");
-        map.remove("fileExtension");
+        map.put("fileExtension", ".deflate");
+        map.put("separator", '\n');
         sink.open(map, mockSinkContext);
-        
-        assertNotNull(mockRecord);
-        send(5000);
-        verify(mockRecord, times(5000)).ack();
+        send(50000);
+        sink.close();
+        verify(mockRecord, times(50000)).ack();
     }
 }
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml b/pulsar-io/hdfs3/src/test/resources/hadoop/core-site.xml
similarity index 100%
rename from pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
rename to pulsar-io/hdfs3/src/test/resources/hadoop/core-site.xml
diff --git a/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml b/pulsar-io/hdfs3/src/test/resources/hadoop/hdfs-site.xml
similarity index 100%
rename from pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
rename to pulsar-io/hdfs3/src/test/resources/hadoop/hdfs-site.xml
diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs3/src/test/resources/sinkConfig.yaml
similarity index 100%
rename from pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
rename to pulsar-io/hdfs3/src/test/resources/sinkConfig.yaml
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index a9ff107..6503b66 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -40,12 +40,13 @@
     <module>kafka</module>
     <module>rabbitmq</module>
     <module>kinesis</module>
-    <module>hdfs</module>
+    <module>hdfs3</module>
     <module>jdbc</module>
     <module>data-genenator</module>
     <module>elastic-search</module>
     <module>kafka-connect-adaptor</module>
     <module>debezium</module>
+    <module>hdfs2</module>
     <module>canal</module>
     <module>netty</module>
   </modules>