You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/09/06 18:06:35 UTC
[incubator-pulsar] branch master updated: Added HDFS Sink (#2409)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 261eab1 Added HDFS Sink (#2409)
261eab1 is described below
commit 261eab12789c8c06b64cb7a5688ab0c867d609b1
Author: David Kjerrumgaard <35...@users.noreply.github.com>
AuthorDate: Thu Sep 6 11:06:33 2018 -0700
Added HDFS Sink (#2409)
* Added HDFS Sink
* Fixed issues identified during PR review
* Fixed comment
* Added HDFS Container to externalServices
* Ignoring HdfsSink test for now
* Removed HDFS Container to externalServices
* Fixed ASL licensing
* Fixed compile errors
* Added HDFS to SinkType Enum
---
pom.xml | 50 +++-
pulsar-io/hdfs/pom.xml | 69 ++++++
.../apache/pulsar/io/hdfs/AbstractHdfsConfig.java | 86 +++++++
.../pulsar/io/hdfs/AbstractHdfsConnector.java | 258 +++++++++++++++++++++
.../org/apache/pulsar/io/hdfs/Compression.java | 26 +++
.../org/apache/pulsar/io/hdfs/HdfsResources.java | 51 ++++
.../org/apache/pulsar/io/hdfs/SecurityUtil.java | 91 ++++++++
.../org/apache/pulsar/io/hdfs/package-info.java | 19 ++
.../pulsar/io/hdfs/sink/HdfsAbstractSink.java | 121 ++++++++++
.../apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java | 108 +++++++++
.../apache/pulsar/io/hdfs/sink/HdfsSyncThread.java | 80 +++++++
.../apache/pulsar/io/hdfs/sink/package-info.java | 19 ++
.../sink/seq/HdfsAbstractSequenceFileSink.java | 96 ++++++++
.../io/hdfs/sink/seq/HdfsSequentialTextSink.java | 71 ++++++
.../pulsar/io/hdfs/sink/seq/HdfsTextSink.java | 54 +++++
.../pulsar/io/hdfs/sink/seq/package-info.java | 19 ++
.../hdfs/sink/text/HdfsAbstractTextFileSink.java | 79 +++++++
.../pulsar/io/hdfs/sink/text/HdfsStringSink.java | 34 +++
.../pulsar/io/hdfs/sink/text/package-info.java | 19 ++
.../resources/META-INF/services/pulsar-io.yaml | 22 ++
.../pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java | 119 ++++++++++
.../pulsar/io/hdfs/sink/HdfsSinkConfigTests.java | 154 ++++++++++++
.../io/hdfs/sink/seq/HdfsSequentialSinkTests.java | 94 ++++++++
.../pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java | 104 +++++++++
.../io/hdfs/sink/text/HdfsStringSinkTests.java | 103 ++++++++
.../src/test/resources/hadoop/core-site.xml} | 38 +--
.../src/test/resources/hadoop/hdfs-site.xml} | 40 ++--
pulsar-io/hdfs/src/test/resources/sinkConfig.yaml | 25 ++
pulsar-io/pom.xml | 1 +
.../integration/containers/HdfsContainer.java | 51 ++++
.../integration/functions/PulsarFunctionsTest.java | 7 +-
.../tests/integration/io/HdfsSinkTester.java | 65 ++++++
.../pulsar/tests/integration/io/SinkTester.java | 3 +-
.../tests/integration/suites/PulsarTestSuite.java | 1 +
34 files changed, 2117 insertions(+), 60 deletions(-)
diff --git a/pom.xml b/pom.xml
index 2352d5e..b349925 100644
--- a/pom.xml
+++ b/pom.xml
@@ -496,12 +496,6 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.5</version>
- </dependency>
-
- <dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
@@ -1350,6 +1344,50 @@ flexible messaging model and an intuitive client API.</description>
<profile>
<id>docker</id>
</profile>
+
+ <profile>
+ <!-- Checks style and licensing requirements. This is a good
+ idea to run for contributions and for the release process. While it would
+ be nice to always run these checks, the plugins can considerably slow the build and have
+ proven to create unstable builds in our multi-module project and when building
+ using multiple threads. The stability issues seen with Checkstyle in multi-module
+ builds include false positives and false negatives. -->
+ <id>contrib-check</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.rat</groupId>
+ <artifactId>apache-rat-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ <phase>verify</phase>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>check-style</id>
+ <phase>verify</phase>
+ <configuration>
+ <configLocation>./buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
+ <suppressionsLocation>./buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
<repositories>
diff --git a/pulsar-io/hdfs/pom.xml b/pulsar-io/hdfs/pom.xml
new file mode 100644
index 0000000..0d55207
--- /dev/null
+++ b/pulsar-io/hdfs/pom.xml
@@ -0,0 +1,69 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io</artifactId>
+ <version>2.2.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>pulsar-io-hdfs</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>3.1.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-nar-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
new file mode 100644
index 0000000..529c350
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConfig.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.Serializable;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Configuration object for all HDFS components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public abstract class AbstractHdfsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * A file or comma separated list of files which contains the Hadoop file system configuration,
+ * e.g. 'core-site.xml', 'hdfs-site.xml'.
+ */
+ private String hdfsConfigResources;
+
+ /**
+ * The HDFS directory from which files should be read from or written to.
+ */
+ private String directory;
+
+ /**
+ * The character encoding for the files, e.g. UTF-8, ASCII, etc.
+ */
+ private String encoding;
+
+ /**
+ * The compression codec used to compress/de-compress the files on HDFS.
+ */
+ private Compression compression;
+
+ /**
+ * The Kerberos user principal account to use for authentication.
+ */
+ private String kerberosUserPrincipal;
+
+ /**
+ * The full pathname to the Kerberos keytab file to use for authentication.
+ */
+ private String keytab;
+
+ public void validate() {
+ if (StringUtils.isEmpty(hdfsConfigResources) || StringUtils.isEmpty(directory)) {
+ throw new IllegalArgumentException("Required property not set.");
+ }
+
+ if ((StringUtils.isNotEmpty(kerberosUserPrincipal) && StringUtils.isEmpty(keytab))
+ || (StringUtils.isEmpty(kerberosUserPrincipal) && StringUtils.isNotEmpty(keytab))) {
+ throw new IllegalArgumentException("Values for both kerberosUserPrincipal & keytab are required.");
+ }
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
new file mode 100644
index 0000000..0eccd93
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/AbstractHdfsConnector.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.URI;
+import java.nio.charset.Charset;
+import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
+import java.util.Map;
+import java.util.WeakHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.io.compress.DefaultCodec;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pulsar.io.hdfs.sink.HdfsSinkConfig;
+
+/**
+ * A simple abstract class for HDFS connectors.
+ * Provides methods for connecting to HDFS.
+ */
+public abstract class AbstractHdfsConnector {
+
+ private static final Object RESOURCES_LOCK = new Object();
+
+ // Hadoop Configuration, Filesystem, and UserGroupInformation (optional)
+ protected final AtomicReference<HdfsResources> hdfsResources = new AtomicReference<>();
+ protected AbstractHdfsConfig connectorConfig;
+ protected CompressionCodecFactory compressionCodecFactory;
+
+ public AbstractHdfsConnector() {
+ hdfsResources.set(new HdfsResources(null, null, null));
+ }
+
+ /*
+ * Reset Hadoop Configuration and FileSystem based on the supplied configuration resources.
+ */
+ protected HdfsResources resetHDFSResources(HdfsSinkConfig hdfsSinkConfig) throws IOException {
+ Configuration config = new ExtendedConfiguration();
+ config.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+ getConfig(config, connectorConfig.getHdfsConfigResources());
+
+ // first check for timeout on HDFS connection, because FileSystem has a hard coded 15 minute timeout
+ checkHdfsUriForTimeout(config);
+
+ /* Disable caching of Configuration and FileSystem objects, else we cannot reconfigure
+ * the processor without a complete restart
+ */
+ String disableCacheName = String.format("fs.%s.impl.disable.cache",
+ FileSystem.getDefaultUri(config).getScheme());
+ config.set(disableCacheName, "true");
+
+ // If kerberos is enabled, create the file system as the kerberos principal
+ // -- use RESOURCES_LOCK to guarantee UserGroupInformation is accessed by only a single thread at a time
+ FileSystem fs;
+ UserGroupInformation ugi;
+ synchronized (RESOURCES_LOCK) {
+ if (SecurityUtil.isSecurityEnabled(config)) {
+ ugi = SecurityUtil.loginKerberos(config,
+ connectorConfig.getKerberosUserPrincipal(), connectorConfig.getKeytab());
+ fs = getFileSystemAsUser(config, ugi);
+ } else {
+ config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
+ config.set("hadoop.security.authentication", "simple");
+ ugi = SecurityUtil.loginSimple(config);
+ fs = getFileSystemAsUser(config, ugi);
+ }
+ }
+ return new HdfsResources(config, fs, ugi);
+ }
+
+ private static Configuration getConfig(final Configuration config, String res) throws IOException {
+ boolean foundResources = false;
+ if (null != res) {
+ String[] resources = res.split(",");
+ for (String resource : resources) {
+ config.addResource(new Path(resource.trim()));
+ foundResources = true;
+ }
+ }
+
+ if (!foundResources) {
+ // check that at least 1 non-default resource is available on the classpath
+ String configStr = config.toString();
+ for (String resource : configStr.substring(configStr.indexOf(":") + 1).split(",")) {
+ if (!resource.contains("default") && config.getResource(resource.trim()) != null) {
+ foundResources = true;
+ break;
+ }
+ }
+ }
+
+ if (!foundResources) {
+ throw new IOException("Could not find any of the " + res + " on the classpath");
+ }
+ return config;
+ }
+
+ /*
+ * Reduce the timeout of a socket connection from the default in FileSystem.get()
+ */
+ protected void checkHdfsUriForTimeout(Configuration config) throws IOException {
+ URI hdfsUri = FileSystem.getDefaultUri(config);
+ String address = hdfsUri.getAuthority();
+ int port = hdfsUri.getPort();
+ if (address == null || address.isEmpty() || port < 0) {
+ return;
+ }
+ InetSocketAddress namenode = NetUtils.createSocketAddr(address, port);
+ SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(config);
+ Socket socket = null;
+ try {
+ socket = socketFactory.createSocket();
+ NetUtils.connect(socket, namenode, 1000); // 1 second timeout
+ } finally {
+ IOUtils.closeQuietly(socket);
+ }
+ }
+
+ /**
+ * This exists in order to allow unit tests to override it so that they don't take several
+ * minutes waiting for UDP packets to be received.
+ *
+ * @param config
+ * the configuration to use
+ * @return the FileSystem that is created for the given Configuration
+ * @throws IOException
+ * if unable to create the FileSystem
+ */
+ protected FileSystem getFileSystem(final Configuration config) throws IOException {
+ return FileSystem.get(config);
+ }
+
+ protected FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
+ try {
+ return ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return FileSystem.get(config);
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException("Unable to create file system: " + e.getMessage());
+ }
+ }
+
+ protected Configuration getConfiguration() {
+ return hdfsResources.get().getConfiguration();
+ }
+
+ protected FileSystem getFileSystem() {
+ return hdfsResources.get().getFileSystem();
+ }
+
+ protected UserGroupInformation getUserGroupInformation() {
+ return hdfsResources.get().getUserGroupInformation();
+ }
+
+ protected String getEncoding() {
+ return StringUtils.isNotBlank(connectorConfig.getEncoding())
+ ? connectorConfig.getEncoding() : Charset.defaultCharset().name();
+ }
+
+ protected CompressionCodec getCompressionCodec() {
+ if (connectorConfig.getCompression() == null) {
+ return null;
+ }
+
+ CompressionCodec codec = getCompressionCodecFactory()
+ .getCodecByName(connectorConfig.getCompression().name());
+
+ return (codec != null) ? codec : new DefaultCodec();
+ }
+
+ protected CompressionCodecFactory getCompressionCodecFactory() {
+ if (compressionCodecFactory == null) {
+ compressionCodecFactory = new CompressionCodecFactory(getConfiguration());
+ }
+
+ return compressionCodecFactory;
+ }
+
+ /**
+ * Extending Hadoop Configuration to prevent it from caching classes that can't be found. Since users may be
+ * adding additional JARs to the classpath we don't want them to have to restart the JVM to be able to load
+ * something that was previously not found, but might now be available.
+ * Reference the original getClassByNameOrNull from Configuration.
+ */
+ static class ExtendedConfiguration extends Configuration {
+
+ private final Map<ClassLoader, Map<String, WeakReference<Class<?>>>> cacheClasses = new WeakHashMap<>();
+
+ @Override
+ public Class<?> getClassByNameOrNull(String name) {
+ final ClassLoader classLoader = getClassLoader();
+
+ Map<String, WeakReference<Class<?>>> map;
+ synchronized (cacheClasses) {
+ map = cacheClasses.get(classLoader);
+ if (map == null) {
+ map = Collections.synchronizedMap(new WeakHashMap<>());
+ cacheClasses.put(classLoader, map);
+ }
+ }
+
+ Class<?> clazz = null;
+ WeakReference<Class<?>> ref = map.get(name);
+ if (ref != null) {
+ clazz = ref.get();
+ }
+
+ if (clazz == null) {
+ try {
+ clazz = Class.forName(name, true, classLoader);
+ } catch (ClassNotFoundException e) {
+ return null;
+ }
+ // two putters can race here, but they'll put the same class
+ map.put(name, new WeakReference<>(clazz));
+ return clazz;
+ } else {
+ // cache hit
+ return clazz;
+ }
+ }
+
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
new file mode 100644
index 0000000..97dba53
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/Compression.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+/**
+ * An enumeration of compression codecs available for HDFS.
+ */
+public enum Compression {
+ BZIP2, DEFLATE, GZIP, LZ4, SNAPPY
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
new file mode 100644
index 0000000..1d04c6c
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/HdfsResources.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * A wrapper class for HDFS resources.
+ */
+public class HdfsResources {
+
+ private final Configuration configuration;
+ private final FileSystem fileSystem;
+ private final UserGroupInformation userGroupInformation;
+
+ public HdfsResources(Configuration config, FileSystem fs, UserGroupInformation ugi) {
+ this.configuration = config;
+ this.fileSystem = fs;
+ this.userGroupInformation = ugi;
+ }
+
+ public Configuration getConfiguration() {
+ return configuration;
+ }
+
+ public FileSystem getFileSystem() {
+ return fileSystem;
+ }
+
+ public UserGroupInformation getUserGroupInformation() {
+ return userGroupInformation;
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
new file mode 100644
index 0000000..c5462d3
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/SecurityUtil.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
+
+import java.io.IOException;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Provides synchronized access to UserGroupInformation to avoid multiple processors/services from
+ * interfering with each other.
+ */
+public class SecurityUtil {
+ public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+ public static final String KERBEROS = "kerberos";
+
+ /**
+ * Initializes UserGroupInformation with the given Configuration and performs the login for the
+ * given principal and keytab. All logins should happen through this class to ensure other threads
+ * are not concurrently modifying UserGroupInformation.
+ * <p/>
+ * @param config the configuration instance
+ * @param principal the principal to authenticate as
+ * @param keyTab the keytab to authenticate with
+ *
+ * @return the UGI for the given principal
+ *
+ * @throws IOException if login failed
+ */
+ public static synchronized UserGroupInformation loginKerberos(final Configuration config,
+ final String principal, final String keyTab) throws IOException {
+ Validate.notNull(config);
+ Validate.notNull(principal);
+ Validate.notNull(keyTab);
+
+ UserGroupInformation.setConfiguration(config);
+ UserGroupInformation.loginUserFromKeytab(principal.trim(), keyTab.trim());
+ return UserGroupInformation.getCurrentUser();
+ }
+
+ /**
+ * Initializes UserGroupInformation with the given Configuration and
+ * returns UserGroupInformation.getLoginUser(). All logins should happen
+ * through this class to ensure other threads are not concurrently
+ * modifying UserGroupInformation.
+ *
+ * @param config the configuration instance
+ *
+ * @return the UGI for the given principal
+ *
+ * @throws IOException if login failed
+ */
+ public static synchronized UserGroupInformation loginSimple(final Configuration config) throws IOException {
+ Validate.notNull(config);
+ UserGroupInformation.setConfiguration(config);
+ return UserGroupInformation.getLoginUser();
+ }
+
+ /**
+ * Initializes UserGroupInformation with the given Configuration and returns
+ * UserGroupInformation.isSecurityEnabled().
+ * All checks for isSecurityEnabled() should happen through this method.
+ *
+ * @param config the given configuration
+ *
+ * @return true if kerberos is enabled on the given configuration, false otherwise
+ *
+ */
+ public static boolean isSecurityEnabled(final Configuration config) {
+ Validate.notNull(config);
+ return KERBEROS.equalsIgnoreCase(config.get(HADOOP_SECURITY_AUTHENTICATION));
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
new file mode 100644
index 0000000..4294852
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
new file mode 100644
index 0000000..18184e2
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsAbstractSink.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConnector;
+import org.apache.pulsar.io.hdfs.HdfsResources;
+
+/**
+ * A simple abstract class for HDFS sinks.
+ * Users need to implement the extractKeyValue function to use this sink.
+ */
+public abstract class HdfsAbstractSink<K, V> extends AbstractHdfsConnector implements Sink<V> {
+
+ protected HdfsSinkConfig hdfsSinkConfig;
+ protected BlockingQueue<Record<V>> unackedRecords;
+ protected HdfsSyncThread<V> syncThread;
+ private Path path;
+ private FSDataOutputStream hdfsStream;
+
+ public abstract KeyValue<K, V> extractKeyValue(Record<V> record);
+ protected abstract void createWriter() throws IOException;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ hdfsSinkConfig = HdfsSinkConfig.load(config);
+ hdfsSinkConfig.validate();
+ connectorConfig = hdfsSinkConfig;
+ unackedRecords = new LinkedBlockingQueue<Record<V>> (hdfsSinkConfig.getMaxPendingRecords());
+ connectToHdfs();
+ createWriter();
+ launchSyncThread();
+ }
+
+ @Override
+ public void close() throws Exception {
+ syncThread.halt();
+ syncThread.join(0);
+ }
+
+ protected final void connectToHdfs() throws IOException {
+ try {
+ HdfsResources resources = hdfsResources.get();
+
+ if (resources.getConfiguration() == null) {
+ resources = this.resetHDFSResources(hdfsSinkConfig);
+ hdfsResources.set(resources);
+ }
+ } catch (IOException ex) {
+ hdfsResources.set(new HdfsResources(null, null, null));
+ throw ex;
+ }
+ }
+
+ @SuppressWarnings("rawtypes")
+ protected final FSDataOutputStreamBuilder getOutputStreamBuilder() throws IOException {
+ Path path = getPath();
+ FileSystem fs = getFileSystemAsUser(getConfiguration(), getUserGroupInformation());
+ FSDataOutputStreamBuilder builder = fs.exists(path) ? fs.appendFile(path) : fs.createFile(path);
+ return builder.recursive().permission(new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+ }
+
+ protected FSDataOutputStream getHdfsStream() throws IllegalArgumentException, IOException {
+ if (hdfsStream == null) {
+ hdfsStream = getOutputStreamBuilder().build();
+ }
+ return hdfsStream;
+ }
+
+ protected final Path getPath() {
+ if (path == null) {
+ String ext = "";
+ if (StringUtils.isNotBlank(hdfsSinkConfig.getFileExtension())) {
+ ext = hdfsSinkConfig.getFileExtension();
+ } else if (getCompressionCodec() != null) {
+ ext = getCompressionCodec().getDefaultExtension();
+ }
+
+ path = new Path(FilenameUtils.concat(hdfsSinkConfig.getDirectory(),
+ hdfsSinkConfig.getFilenamePrefix() + "-" + System.currentTimeMillis() + ext));
+ }
+ return path;
+ }
+
+ protected final void launchSyncThread() throws IOException {
+ syncThread = new HdfsSyncThread<V>(getHdfsStream(), unackedRecords, hdfsSinkConfig.getSyncInterval());
+ syncThread.start();
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
new file mode 100644
index 0000000..e9f4cae
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfig.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.io.hdfs.AbstractHdfsConfig;
+
+/**
+ * Configuration object for all HDFS Sink components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class HdfsSinkConfig extends AbstractHdfsConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The prefix of the files to create inside the HDFS directory, i.e. a value of "topicA"
+ * will result in files named topicA-&lt;timestamp&gt;, etc. being produced.
+ */
+ private String filenamePrefix;
+
+ /**
+ * The extension to add to the files written to HDFS, e.g. '.txt', '.seq', etc.
+ */
+ private String fileExtension;
+
+ /**
+ * The character to use to separate records in a text file. If no value is provided
+ * then the content from all of the records will be concatenated together in one continuous
+ * byte array.
+ */
+ private char separator;
+
+ /**
+ * The interval (in milliseconds) between calls to flush data to HDFS disk.
+ */
+ private long syncInterval;
+
+ /**
+ * The maximum number of records that we hold in memory before acking. Default is Integer.MAX_VALUE.
+ * Setting this value to one, results in every record being sent to disk before the record is acked,
+ * while setting it to a higher values allows us to buffer records before flushing them all to disk.
+ */
+ private int maxPendingRecords = Integer.MAX_VALUE;
+
+ /**
+ * Loads the configuration from a YAML file on the local filesystem.
+ */
+ public static HdfsSinkConfig load(String yamlFile) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), HdfsSinkConfig.class);
+ }
+
+ /**
+ * Loads the configuration from a generic key/value map by round-tripping it
+ * through JSON, reusing a single ObjectMapper for both steps.
+ */
+ public static HdfsSinkConfig load(Map<String, Object> map) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(mapper.writeValueAsString(map), HdfsSinkConfig.class);
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+ // Name the offending property so misconfiguration is diagnosable.
+ if (StringUtils.isEmpty(filenamePrefix)) {
+ throw new IllegalArgumentException("Required property 'filenamePrefix' not set.");
+ }
+
+ // Without an extension or a compression codec we cannot derive a file suffix.
+ if (StringUtils.isEmpty(fileExtension) && getCompression() == null) {
+ throw new IllegalArgumentException("Either 'fileExtension' or 'compression' must be set.");
+ }
+
+ if (syncInterval < 0) {
+ throw new IllegalArgumentException("Sync Interval cannot be negative");
+ }
+
+ if (maxPendingRecords < 1) {
+ throw new IllegalArgumentException("Max Pending Records must be a positive integer");
+ }
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
new file mode 100644
index 0000000..3c19ed8
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/HdfsSyncThread.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.pulsar.functions.api.Record;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A thread that runs in the background and acknowledges Records
+ * after they have been written to disk.
+ *
+ * @param <V> the record value type
+ */
+public class HdfsSyncThread<V> extends Thread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsSyncThread.class);
+
+ private final Syncable stream;
+ private final BlockingQueue<Record<V>> unackedRecords;
+ private final long syncInterval;
+
+ // Written by halt() from another thread; volatile so the run() loop
+ // observes the shutdown request promptly.
+ private volatile boolean keepRunning = true;
+
+ public HdfsSyncThread(Syncable stream, BlockingQueue<Record<V>> unackedRecords, long syncInterval) {
+ this.stream = stream;
+ this.unackedRecords = unackedRecords;
+ this.syncInterval = syncInterval;
+ }
+
+ @Override
+ public void run() {
+ while (keepRunning) {
+ try {
+ Thread.sleep(syncInterval);
+ ackRecords();
+ } catch (InterruptedException e) {
+ // Restore the interrupt status for any observers before exiting.
+ Thread.currentThread().interrupt();
+ return;
+ } catch (IOException e) {
+ // Keep running; the next cycle will retry the sync.
+ LOG.error("Unable to sync stream to HDFS", e);
+ }
+ }
+ }
+
+ /**
+ * Stops the background loop and performs one final sync/ack pass so no
+ * successfully written records are left unacknowledged.
+ */
+ public final void halt() throws IOException, InterruptedException {
+ keepRunning = false;
+ ackRecords();
+ }
+
+ private void ackRecords() throws IOException, InterruptedException {
+
+ if (CollectionUtils.isEmpty(unackedRecords)) {
+ return;
+ }
+
+ // Flush the data to HDFS disk before acking the records that produced it.
+ synchronized (stream) {
+ stream.hsync();
+ }
+
+ while (!unackedRecords.isEmpty()) {
+ unackedRecords.take().ack();
+ }
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
new file mode 100644
index 0000000..c6506d9
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
new file mode 100644
index 0000000..7c61c20
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsAbstractSequenceFileSink.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * HDFS Sink that writes its contents to HDFS as Sequence Files.
+ *
+ * @param <K> - The incoming Key type
+ * @param <V> - The incoming Value type
+ * @param <HdfsK> - The HDFS Key type
+ * @param <HdfsV> - The HDFS Value type
+ */
+public abstract class HdfsAbstractSequenceFileSink<K, V, HdfsK, HdfsV>
+ extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractSequenceFileSink.class);
+
+ protected AtomicLong counter;
+ protected FSDataOutputStream hdfsStream;
+ protected Writer writer = null;
+
+ /**
+ * Converts the extracted Pulsar key/value pair into the Writable key/value
+ * pair appended to the HDFS Sequence File.
+ */
+ public abstract KeyValue<HdfsK, HdfsV> convert(KeyValue<K, V> kv);
+
+ @Override
+ public void close() throws Exception {
+ // The writer is null if the sink failed before createWriter() ran;
+ // guard so close() does not mask the original failure with an NPE.
+ if (writer != null) {
+ writer.close();
+ }
+ super.close();
+ }
+
+ @Override
+ protected void createWriter() throws IOException {
+ writer = getWriter();
+ }
+
+ @Override
+ public void write(Record<V> record) {
+ try {
+ KeyValue<K, V> kv = extractKeyValue(record);
+ KeyValue<HdfsK, HdfsV> keyValue = convert(kv);
+ writer.append(keyValue.getKey(), keyValue.getValue());
+ // Queue for acknowledgment once the sync thread flushes to disk.
+ unackedRecords.put(record);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Unable to write to file " + getPath(), e);
+ record.fail();
+ }
+ }
+
+ protected Writer getWriter() throws IOException {
+ counter = new AtomicLong(0);
+ List<Option> options = getOptions();
+ return SequenceFile.createWriter(getConfiguration(),
+ options.toArray(new Option[options.size()]));
+ }
+
+ /**
+ * Base writer options: the underlying HDFS stream, plus record-level
+ * compression when a codec is configured. Subclasses add key/value classes.
+ */
+ protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+ List<Option> list = new ArrayList<Option>();
+ list.add(Writer.stream(getHdfsStream()));
+
+ if (getCompressionCodec() != null) {
+ list.add(Writer.compression(SequenceFile.CompressionType.RECORD, getCompressionCodec()));
+ }
+ return list;
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
new file mode 100644
index 0000000..84ce09f
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialTextSink.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * This Sink should be used when the records are originating from a sequential source,
+ * and we want to retain the record sequence. This class uses the record's sequence id as
+ * the sequence id in the HDFS Sequence File if it is available, if not a sequence id is
+ * auto-generated for each new record.
+ */
+public class HdfsSequentialTextSink extends HdfsAbstractSequenceFileSink<Long, String, LongWritable, Text> {
+
+ @Override
+ public Writer getWriter() throws IOException {
+ // Reuse the counter inherited from the base class rather than shadowing it.
+ counter = new AtomicLong(0);
+
+ // Fetch the options once instead of rebuilding the list twice.
+ List<Option> opts = getOptions();
+ return SequenceFile
+ .createWriter(
+ getConfiguration(),
+ opts.toArray(new Option[opts.size()]));
+ }
+
+ @Override
+ protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+ List<Option> opts = super.getOptions();
+ opts.add(Writer.keyClass(LongWritable.class));
+ opts.add(Writer.valueClass(Text.class));
+ return opts;
+ }
+
+ @Override
+ public KeyValue<Long, String> extractKeyValue(Record<String> record) {
+ // Prefer the record's own sequence id; otherwise auto-generate one.
+ // Autoboxing replaces the deprecated new Long(...) constructor.
+ Long sequence = record.getRecordSequence().orElseGet(() -> counter.incrementAndGet());
+ return new KeyValue<>(sequence, record.getValue());
+ }
+
+ @Override
+ public KeyValue<LongWritable, Text> convert(KeyValue<Long, String> kv) {
+ return new KeyValue<>(new LongWritable(kv.getKey()), new Text(kv.getValue()));
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
new file mode 100644
index 0000000..84ebc07
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSink.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.SequenceFile.Writer.Option;
+import org.apache.hadoop.io.Text;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+
+/**
+ * A Simple Sink class for Hdfs Sequence File.
+ */
+public class HdfsTextSink extends
+ HdfsAbstractSequenceFileSink<String, String, Text, Text> {
+
+ @Override
+ protected List<Option> getOptions() throws IllegalArgumentException, IOException {
+ List<Option> opts = super.getOptions();
+ opts.add(Writer.keyClass(Text.class));
+ opts.add(Writer.valueClass(Text.class));
+ return opts;
+ }
+
+ @Override
+ public KeyValue<String, String> extractKeyValue(Record<String> record) {
+ // Fall back to the value itself when the record carries no key; the
+ // value is already a String, so no defensive copy is needed.
+ String key = record.getKey().orElseGet(record::getValue);
+ return new KeyValue<>(key, record.getValue());
+ }
+
+ @Override
+ public KeyValue<Text, Text> convert(KeyValue<String, String> kv) {
+ return new KeyValue<>(new Text(kv.getKey()), new Text(kv.getValue()));
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
new file mode 100644
index 0000000..025311a
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/seq/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
new file mode 100644
index 0000000..6fb50a5
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsAbstractTextFileSink.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.hdfs.sink.HdfsAbstractSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Base class for HDFS Sinks that write their contents to HDFS as Text Files.
+ *
+ * @param <K> the incoming key type
+ * @param <V> the incoming value type
+ */
+public abstract class HdfsAbstractTextFileSink<K, V> extends HdfsAbstractSink<K, V> implements Sink<V> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HdfsAbstractTextFileSink.class);
+
+ protected OutputStreamWriter writer;
+
+ @Override
+ protected void createWriter() throws IOException {
+ writer = new OutputStreamWriter(new BufferedOutputStream(openHdfsStream()), getEncoding());
+ }
+
+ @Override
+ public void close() throws Exception {
+ // The writer is null if the sink failed before createWriter() ran;
+ // guard so close() does not mask the original failure with an NPE.
+ if (writer != null) {
+ writer.close();
+ }
+ super.close();
+ }
+
+ @Override
+ public void write(Record<V> record) {
+ try {
+ KeyValue<K, V> kv = extractKeyValue(record);
+ writer.write(kv.getValue().toString());
+
+ // '\u0000' is the default char value, i.e. no separator was configured.
+ if (hdfsSinkConfig.getSeparator() != '\u0000') {
+ writer.write(hdfsSinkConfig.getSeparator());
+ }
+ unackedRecords.put(record);
+ } catch (IOException | InterruptedException e) {
+ LOG.error("Unable to write to file " + getPath(), e);
+ record.fail();
+ }
+ }
+
+ /**
+ * Opens the raw HDFS stream, wrapping it in a compression stream when a
+ * compression codec has been configured.
+ */
+ private OutputStream openHdfsStream() throws IOException {
+ if (hdfsSinkConfig.getCompression() != null) {
+ return getCompressionCodec().createOutputStream(getHdfsStream());
+ } else {
+ return getHdfsStream();
+ }
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
new file mode 100644
index 0000000..355c6df
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Sink;
+
+/**
+ * A Simple Sink class for Hdfs Text File.
+ */
+public class HdfsStringSink extends HdfsAbstractTextFileSink<String, String> implements Sink<String> {
+
+ /**
+ * Uses the record key when present, otherwise falls back to the record
+ * value; the value is already a String, so no defensive copy is needed.
+ */
+ @Override
+ public KeyValue<String, String> extractKeyValue(Record<String> record) {
+ String key = record.getKey().orElseGet(record::getValue);
+ return new KeyValue<>(key, record.getValue());
+ }
+}
diff --git a/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
new file mode 100644
index 0000000..9ade570
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/java/org/apache/pulsar/io/hdfs/sink/text/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
\ No newline at end of file
diff --git a/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
new file mode 100644
index 0000000..f2a2b55
--- /dev/null
+++ b/pulsar-io/hdfs/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+name: hdfs
+description: Writes data into HDFS
+sinkClass: org.apache.pulsar.io.hdfs.sink.text.HdfsStringSink
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
new file mode 100644
index 0000000..32085dd
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/AbstractHdfsSinkTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SinkContext;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeMethod;
+
+/**
+ * Simple base class for all the HDFS sink test cases.
+ * Provides utility methods for sending records to the sink.
+ */
+public abstract class AbstractHdfsSinkTest<K, V> {
+
+ @Mock
+ protected SinkContext mockSinkContext;
+
+ @Mock
+ protected Record<V> mockRecord;
+
+ protected Map<String, Object> map;
+ protected HdfsAbstractSink<K, V> sink;
+
+ @SuppressWarnings("unchecked")
+ @BeforeMethod
+ public final void setUp() throws Exception {
+ map = new HashMap<String, Object> ();
+ // NOTE(review): relative paths assume the tests run from the repository's
+ // parent directory - confirm against the CI working directory.
+ map.put("hdfsConfigResources", "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml,"
+ + "../incubator-pulsar/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml");
+ map.put("directory", "/tmp/testing");
+ map.put("filenamePrefix", "prefix");
+
+ mockSinkContext = mock(SinkContext.class);
+
+ // Each stubbed accessor keeps its own counter so consecutive calls yield
+ // monotonically increasing sequence ids, keys, and values.
+ mockRecord = mock(Record.class);
+ when(mockRecord.getRecordSequence()).thenAnswer(new Answer<Optional<Long>>() {
+ long sequenceCounter = 0;
+ public Optional<Long> answer(InvocationOnMock invocation) throws Throwable {
+ return Optional.of(sequenceCounter++);
+ }});
+
+ when(mockRecord.getKey()).thenAnswer(new Answer<Optional<String>>() {
+ long sequenceCounter = 0;
+ public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
+ return Optional.of( "key-" + sequenceCounter++);
+ }});
+
+ when(mockRecord.getValue()).thenAnswer(new Answer<String>() {
+ long sequenceCounter = 0;
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return "value-" + sequenceCounter++ + "-" + UUID.randomUUID().toString();
+ }});
+
+ createSink();
+ }
+
+ /** Instantiates the concrete sink under test and assigns it to {@link #sink}. */
+ protected abstract void createSink();
+
+ /** Writes the mock record to the sink the given number of times. */
+ protected final void send(int numRecords) throws Exception {
+ for (int idx = 0; idx < numRecords; idx++) {
+ sink.write(mockRecord);
+ }
+ }
+
+ /** Continuously writes records from a background thread for the given number of seconds. */
+ protected final void runFor(int numSeconds) throws InterruptedException {
+ Producer producer = new Producer();
+ producer.start();
+ Thread.sleep(numSeconds * 1000); // Run for N seconds
+ producer.halt();
+ producer.join(2000);
+ }
+
+ protected final class Producer extends Thread {
+ // volatile: halt() is called from the test thread while run() polls the flag.
+ public volatile boolean keepRunning = true;
+
+ @Override
+ public void run() {
+ while (keepRunning) {
+ try {
+ sink.write(mockRecord);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public void halt() {
+ keepRunning = false;
+ }
+
+ }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
new file mode 100644
index 0000000..2f0b3f3
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/HdfsSinkConfigTests.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pulsar.io.hdfs.Compression;
+import org.testng.annotations.Test;
+
+import com.fasterxml.jackson.databind.exc.InvalidFormatException;
+
+
+/**
+ * Unit tests for {@code HdfsSinkConfig}: loading the configuration from a
+ * YAML file and from a Map, plus validation of required properties,
+ * compression codec parsing, and numeric bounds.
+ */
+public class HdfsSinkConfigTests {
+
+ @Test
+ public final void loadFromYamlFileTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ HdfsSinkConfig config = HdfsSinkConfig.load(yamlFile.getAbsolutePath());
+ assertNotNull(config);
+ assertEquals("core-site.xml", config.getHdfsConfigResources());
+ assertEquals("/foo/bar", config.getDirectory());
+ assertEquals("prefix", config.getFilenamePrefix());
+ assertEquals(Compression.SNAPPY, config.getCompression());
+ }
+
+ @Test
+ public final void loadFromMapTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("compression", "SNAPPY");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ assertNotNull(config);
+ assertEquals("core-site.xml", config.getHdfsConfigResources());
+ assertEquals("/foo/bar", config.getDirectory());
+ assertEquals("prefix", config.getFilenamePrefix());
+ assertEquals(Compression.SNAPPY, config.getCompression());
+ }
+
+ // A config with both required properties (hdfsConfigResources, directory)
+ // must validate without throwing.
+ @Test
+ public final void validValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("fileExtension", ".txt");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Required property not set.")
+ public final void missingDirectoryValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Required property not set.")
+ public final void missingHdfsConfigsValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("directory", "/foo/bar");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ // An unknown compression codec name fails during load (Jackson enum
+ // binding), before validate() is even reached.
+ @Test(expectedExceptions = InvalidFormatException.class)
+ public final void invalidCodecValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("fileExtension", ".txt");
+ map.put("compression", "bad value");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Sync Interval cannot be negative")
+ public final void invalidSyncIntervalTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("fileExtension", ".txt");
+ map.put("syncInterval", -1);
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Max Pending Records must be a positive integer")
+ public final void invalidMaxPendingRecordsTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("fileExtension", ".txt");
+ map.put("maxPendingRecords", 0);
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ // Supplying a keytab without the matching principal must be rejected.
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "Values for both kerberosUserPrincipal & keytab are required.")
+ public final void kerberosValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<String, Object> ();
+ map.put("hdfsConfigResources", "core-site.xml");
+ map.put("directory", "/foo/bar");
+ map.put("filenamePrefix", "prefix");
+ map.put("keytab", "/etc/keytab/hdfs.client.ktab");
+
+ HdfsSinkConfig config = HdfsSinkConfig.load(map);
+ config.validate();
+ }
+
+ /** Resolves a test resource from the classpath as a {@link File}. */
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
new file mode 100644
index 0000000..d54e5ec
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsSequentialSinkTests.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Integration-style tests for {@code HdfsSequentialTextSink}.
+ * All tests are disabled (enabled = false) because they require a live
+ * HDFS cluster to write to; see the corresponding disabled entry in
+ * PulsarFunctionsTest.
+ */
+public class HdfsSequentialSinkTests extends AbstractHdfsSinkTest<Long, String> {
+
+ @Override
+ protected void createSink() {
+ sink = new HdfsSequentialTextSink();
+ }
+
+ @Test(enabled = false)
+ public final void write100Test() throws Exception {
+ map.put("filenamePrefix", "write100Test-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(sink);
+ send(100);
+
+ // Allow one sync interval to elapse so all records get acked.
+ Thread.sleep(2000);
+ verify(mockRecord, times(100)).ack();
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void write5000Test() throws Exception {
+ map.put("filenamePrefix", "write5000Test-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(sink);
+ send(5000);
+
+ Thread.sleep(2000);
+ verify(mockRecord, times(5000)).ack();
+ sink.close();
+ }
+
+ // Sustained-load test: write continuously for ten seconds.
+ @Test(enabled = false)
+ public final void tenSecondTest() throws Exception {
+ map.put("filenamePrefix", "tenSecondTest-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+ runFor(10);
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void bzip2CompressionTest() throws Exception {
+ map.put("filenamePrefix", "bzip2CompressionTest-seq");
+ map.put("compression", "BZIP2");
+ map.remove("fileExtension");
+ sink.open(map, mockSinkContext);
+ send(5000);
+ verify(mockRecord, times(5000)).ack();
+ }
+
+ @Test(enabled = false)
+ public final void deflateCompressionTest() throws Exception {
+ map.put("filenamePrefix", "deflateCompressionTest-seq");
+ map.put("compression", "DEFLATE");
+ map.remove("fileExtension");
+ sink.open(map, mockSinkContext);
+ send(5000);
+ verify(mockRecord, times(5000)).ack();
+ }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
new file mode 100644
index 0000000..bb720fa
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/seq/HdfsTextSinkTests.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.seq;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertNotNull;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Integration-style tests for {@code HdfsTextSink}.
+ * All tests are disabled (enabled = false) because they require a live
+ * HDFS cluster to write to.
+ */
+public class HdfsTextSinkTests extends AbstractHdfsSinkTest<String, String> {
+
+ @Override
+ protected void createSink() {
+ sink = new HdfsTextSink();
+ }
+
+ @Test(enabled = false)
+ public final void write100Test() throws Exception {
+ map.put("filenamePrefix", "write100TestText-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(sink);
+ assertNotNull(mockRecord);
+ send(100);
+
+ // Allow one sync interval to elapse so all records get acked.
+ Thread.sleep(2000);
+ verify(mockRecord, times(100)).ack();
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void write5000Test() throws Exception {
+ map.put("filenamePrefix", "write5000TestText-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(sink);
+ assertNotNull(mockRecord);
+ send(5000);
+
+ Thread.sleep(2000);
+ verify(mockRecord, times(5000)).ack();
+ sink.close();
+ }
+
+ // Sustained-load test: write continuously for ten seconds.
+ @Test(enabled = false)
+ public final void tenSecondTest() throws Exception {
+ map.put("filenamePrefix", "tenSecondTestText-seq");
+ map.put("fileExtension", ".seq");
+ map.put("syncInterval", 1000);
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(mockRecord);
+
+ runFor(10);
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void bzip2CompressionTest() throws Exception {
+ map.put("filenamePrefix", "bzip2CompressionTestText-seq");
+ map.put("compression", "BZIP2");
+ map.remove("fileExtension");
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(mockRecord);
+
+ send(5000);
+ verify(mockRecord, times(5000)).ack();
+ }
+
+ @Test(enabled = false)
+ public final void deflateCompressionTest() throws Exception {
+ map.put("filenamePrefix", "deflateCompressionTestText-seq");
+ map.put("compression", "DEFLATE");
+ map.remove("fileExtension");
+ sink.open(map, mockSinkContext);
+
+ assertNotNull(mockRecord);
+ send(5000);
+ verify(mockRecord, times(5000)).ack();
+ }
+}
diff --git a/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
new file mode 100644
index 0000000..98b8a61
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/java/org/apache/pulsar/io/hdfs/sink/text/HdfsStringSinkTests.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.hdfs.sink.text;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import org.apache.pulsar.io.hdfs.sink.AbstractHdfsSinkTest;
+import org.testng.annotations.Test;
+
+/**
+ * Integration-style tests for {@code HdfsStringSink}.
+ * All tests are disabled (enabled = false) because they require a live
+ * HDFS cluster to write to.
+ */
+public class HdfsStringSinkTests extends AbstractHdfsSinkTest<String, String> {
+
+ @Override
+ protected void createSink() {
+ sink = new HdfsStringSink();
+ }
+
+ @Test(enabled = false)
+ public final void write5000Test() throws Exception {
+ map.put("filenamePrefix", "write5000Test");
+ map.put("fileExtension", ".txt");
+ map.put("separator", '\n');
+ sink.open(map, mockSinkContext);
+ send(5000);
+ sink.close();
+ verify(mockRecord, times(5000)).ack();
+ }
+
+ // Five consecutive batches of 2000 against a single open sink instance.
+ @Test(enabled = false)
+ public final void fiveByTwoThousandTest() throws Exception {
+ map.put("filenamePrefix", "fiveByTwoThousandTest");
+ map.put("fileExtension", ".txt");
+ map.put("separator", '\n');
+ sink.open(map, mockSinkContext);
+
+ for (int idx = 1; idx < 6; idx++) {
+ send(2000);
+ }
+ sink.close();
+ verify(mockRecord, times(2000 * 5)).ack();
+ }
+
+ // Sustained-load test: write continuously for ten seconds.
+ @Test(enabled = false)
+ public final void tenSecondTest() throws Exception {
+ map.put("filenamePrefix", "tenSecondTest");
+ map.put("fileExtension", ".txt");
+ map.put("separator", '\n');
+ sink.open(map, mockSinkContext);
+ runFor(10);
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void maxPendingRecordsTest() throws Exception {
+ map.put("filenamePrefix", "maxPendingRecordsTest");
+ map.put("fileExtension", ".txt");
+ map.put("separator", '\n');
+ map.put("maxPendingRecords", 500);
+ sink.open(map, mockSinkContext);
+ runFor(10);
+ sink.close();
+ }
+
+ @Test(enabled = false)
+ public final void bzip2CompressionTest() throws Exception {
+ map.put("filenamePrefix", "bzip2CompressionTest");
+ map.put("compression", "BZIP2");
+ map.remove("fileExtension");
+ map.put("separator", '\n');
+ sink.open(map, mockSinkContext);
+ send(5000);
+ sink.close();
+ verify(mockRecord, times(5000)).ack();
+ }
+
+ @Test(enabled = false)
+ public final void deflateCompressionTest() throws Exception {
+ map.put("filenamePrefix", "deflateCompressionTest");
+ map.put("compression", "DEFLATE");
+ map.put("fileExtension", ".deflate");
+ map.put("separator", '\n');
+ sink.open(map, mockSinkContext);
+ send(50000);
+ sink.close();
+ verify(mockRecord, times(50000)).ack();
+ }
+}
diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
index 92f2186..31d1e98 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/core-site.xml
@@ -18,29 +18,15 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
- <version>2.2.0-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-io</artifactId>
- <name>Pulsar IO :: Parent</name>
-
- <modules>
- <module>core</module>
- <module>twitter</module>
- <module>cassandra</module>
- <module>aerospike</module>
- <module>kafka</module>
- <module>rabbitmq</module>
- <module>kinesis</module>
- <module>jdbc</module>
- <module>data-genenator</module>
- </modules>
-
-</project>
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>hdfs://0.0.0.0:8020</value>
+ </property>
+ <property>
+ <name>io.compression.codecs</name>
+ <value>org.apache.hadoop.io.compress.GzipCodec,
+ org.apache.hadoop.io.compress.DefaultCodec,
+ org.apache.hadoop.io.compress.SnappyCodec</value>
+ </property>
+</configuration>
diff --git a/pulsar-io/pom.xml b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
similarity index 50%
copy from pulsar-io/pom.xml
copy to pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
index 92f2186..bb722f1 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/hdfs/src/test/resources/hadoop/hdfs-site.xml
@@ -18,29 +18,17 @@
under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <packaging>pom</packaging>
- <parent>
- <groupId>org.apache.pulsar</groupId>
- <artifactId>pulsar</artifactId>
- <version>2.2.0-incubating-SNAPSHOT</version>
- </parent>
-
- <artifactId>pulsar-io</artifactId>
- <name>Pulsar IO :: Parent</name>
-
- <modules>
- <module>core</module>
- <module>twitter</module>
- <module>cassandra</module>
- <module>aerospike</module>
- <module>kafka</module>
- <module>rabbitmq</module>
- <module>kinesis</module>
- <module>jdbc</module>
- <module>data-genenator</module>
- </modules>
-
-</project>
+<configuration>
+ <property>
+ <name>dfs.replication</name>
+ <value>1</value>
+ </property>
+ <property>
+ <name>dfs.client.use.datanode.hostname</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>dfs.support.append</name>
+ <value>true</value>
+ </property>
+</configuration>
diff --git a/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..5a19ee0
--- /dev/null
+++ b/pulsar-io/hdfs/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"hdfsConfigResources": "core-site.xml",
+"directory": "/foo/bar",
+"filenamePrefix": "prefix",
+"compression": "SNAPPY"
+}
\ No newline at end of file
diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml
index 92f2186..5c03370 100644
--- a/pulsar-io/pom.xml
+++ b/pulsar-io/pom.xml
@@ -39,6 +39,7 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>kinesis</module>
+ <module>hdfs</module>
<module>jdbc</module>
<module>data-genenator</module>
</modules>
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
new file mode 100644
index 0000000..58f781f
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/HdfsContainer.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.containers;
+
+import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
+
+/**
+ * Testcontainers wrapper that runs a single-node Hadoop image and exposes
+ * the standard HDFS/YARN service ports for integration tests.
+ */
+public class HdfsContainer extends ChaosContainer<HdfsContainer> {
+
+ public static final String NAME = "HDFS";
+
+ // NameNode IPC (8020/9000), ResourceManager (8032/8088), JobHistory
+ // (10020/19888), DataNode data/IPC (50010/50020), NameNode HTTP (50070),
+ // DataNode HTTP (50075), SecondaryNameNode (50090).
+ // Bug fix: 50070 was listed twice; 50075 is the DataNode HTTP port that
+ // was presumably intended.
+ static final Integer[] PORTS = { 8020, 8032, 8088, 9000, 10020, 19888, 50010, 50020, 50070, 50075, 50090 };
+
+ private static final String IMAGE_NAME = "harisekhon/hadoop:latest";
+
+ public HdfsContainer(String clusterName) {
+ super(clusterName, IMAGE_NAME);
+ }
+
+ @Override
+ public String getContainerName() {
+ return clusterName;
+ }
+
+ @Override
+ protected void configure() {
+ super.configure();
+ // Fixed host name so core-site.xml can reference the service by alias.
+ this.withNetworkAliases(NAME)
+ .withExposedPorts(PORTS)
+ .withCreateContainerCmdModifier(createContainerCmd -> {
+ createContainerCmd.withHostName(NAME);
+ createContainerCmd.withName(clusterName + "-" + NAME);
+ })
+ .waitingFor(new HostPortWaitStrategy());
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 7b3cd4f..17634a1 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -72,7 +72,12 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
public void testCassandraArchiveSink() throws Exception {
testSink(new CassandraSinkArchiveTester(), false);
}
-
+
+ @Test(enabled = false)
+ public void testHdfsSink() throws Exception {
+ testSink(new HdfsSinkTester(), false);
+ }
+
@Test
public void testJdbcSink() throws Exception {
testSink(new JdbcSinkTester(), true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
new file mode 100644
index 0000000..46c5f24
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/HdfsSinkTester.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.tests.integration.io;
+
+import java.util.Map;
+
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
+import org.testcontainers.containers.GenericContainer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Sink tester that wires the HDFS sink connector to an {@link HdfsContainer}
+ * running in the integration-test cluster.
+ */
+public class HdfsSinkTester extends SinkTester {
+
+ private static final String NAME = "HDFS";
+
+ private HdfsContainer hdfsCluster;
+
+ public HdfsSinkTester() {
+ super(SinkType.HDFS);
+
+ // TODO How do I get the core-site.xml, and hdfs-site.xml files from the container?
+ sinkConfig.put("hdfsConfigResources", "");
+ // NOTE(review): prepareSink() creates /tmp/testing, but the sink is
+ // configured to write to /testing/test — confirm which directory is
+ // intended and make the two consistent.
+ sinkConfig.put("directory", "/testing/test");
+ }
+
+ @Override
+ public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) {
+ GenericContainer<?> container = containers.get(NAME);
+ checkState(container instanceof HdfsContainer, "No HDFS service found in the cluster");
+ this.hdfsCluster = (HdfsContainer) container;
+ }
+
+ @Override
+ public void prepareSink() throws Exception {
+ // Create the test directory and hand it over to the test user.
+ hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-mkdir", "/tmp/testing");
+ // Bug fix: the "dfs" sub-command was missing, so the chown was never executed.
+ hdfsCluster.execInContainer("/hadoop/bin/hdfs", "dfs", "-chown", "tester:testing", "/tmp/testing");
+
+ // NOTE(review): each execInContainer call runs in its own process, so
+ // this export does NOT carry over to subsequent commands — set
+ // HADOOP_USER_NAME via the container environment (withEnv) instead.
+ hdfsCluster.execInContainer("export HADOOP_USER_NAME=tester");
+ }
+
+ @Override
+ public void validateSinkResult(Map<String, String> kvs) {
+ // TODO: read back the files written to HDFS and compare them against kvs.
+ }
+
+}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 098b8bf..7f4b2d9 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -33,7 +33,8 @@ public abstract class SinkTester {
UNDEFINED,
CASSANDRA,
KAFKA,
- JDBC
+ JDBC,
+ HDFS
}
protected final SinkType sinkType;
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
index 147f273..438c96e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.tests.integration.suites;
import java.util.Map;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
+import org.apache.pulsar.tests.integration.containers.HdfsContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarClusterSpecBuilder;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.testcontainers.containers.GenericContainer;