Posted to commits@camel.apache.org by da...@apache.org on 2019/10/17 03:18:12 UTC

[camel] branch master updated: CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 3032641  CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254)
3032641 is described below

commit 3032641514838fcab5e5a37ee01dfa82a9f59cb7
Author: Marius Cornescu <ma...@yahoo.com>
AuthorDate: Thu Oct 17 05:17:57 2019 +0200

    CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254)
    
    * CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes
    
    * CAMEL-14073 : camel-hdfs - Cleanup HA/Cluster related classes
    
    * Update hdfs-component.adoc
    
    * Update hdfs-component.adoc
---
 components/camel-hdfs/pom.xml                      |  5 ++
 .../camel-hdfs/src/main/docs/hdfs-component.adoc   | 18 ++++++
 .../component/hdfs/HaConfigurationBuilder.java     | 52 ++++++++++++---
 .../camel/component/hdfs/HdfsArrayFileType.java    |  4 +-
 .../camel/component/hdfs/HdfsBloommapFileType.java |  4 +-
 .../camel/component/hdfs/HdfsConfiguration.java    | 19 +++++-
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 21 +++---
 .../org/apache/camel/component/hdfs/HdfsInfo.java  | 56 +++-------------
 .../camel/component/hdfs/HdfsInfoFactory.java      | 63 ++++++++++++++++--
 .../camel/component/hdfs/HdfsInputStream.java      | 31 ++++-----
 .../camel/component/hdfs/HdfsMapFileType.java      |  4 +-
 .../camel/component/hdfs/HdfsOsgiHelper.java       |  8 +--
 .../camel/component/hdfs/HdfsOutputStream.java     | 46 ++++++-------
 .../apache/camel/component/hdfs/HdfsProducer.java  | 75 ++++++++++------------
 .../camel/component/hdfs/HdfsSequenceFileType.java |  4 +-
 .../kerberos/KerberosConfigurationBuilder.java     |  7 +-
 .../camel/component/hdfs/FromFileToHdfsTest.java   |  8 +--
 .../component/hdfs/HaConfigurationBuilderTest.java | 41 ++++++++++--
 .../camel/component/hdfs/HdfsConsumerTest.java     | 36 +++++------
 .../apache/camel/component/hdfs/HdfsInfoTest.java} | 36 +++++++----
 .../component/hdfs/HdfsProducerConsumerTest.java   |  6 +-
 .../component/hdfs/HdfsProducerSplitTest.java      |  8 +--
 .../camel/component/hdfs/HdfsProducerTest.java     | 34 +++++-----
 .../camel/component/hdfs/HdfsTestSupport.java      | 44 +++++++++++--
 .../hdfs/kerberos/KerberosAuthenticationTest.java  | 11 ++--
 .../kerberos/KerberosConfigurationBuilderTest.java | 16 ++---
 26 files changed, 394 insertions(+), 263 deletions(-)

diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml
index b7bbc17e..d2c9532 100644
--- a/components/camel-hdfs/pom.xml
+++ b/components/camel-hdfs/pom.xml
@@ -139,6 +139,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-api</artifactId>
             <scope>test</scope>
diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 423f915..ae3703c 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -327,6 +327,24 @@ resource with bundle that contains blueprint definition.
 This way Hadoop 2.x will have correct mapping of URI schemes to
 filesystem implementations.
 
+=== Using this component with a High Availability configuration
+
+In an HA setup, there will be multiple nodes (_configured through the *namedNodes* parameter_).
+The "hostname" and "port" portion of the endpoint URI will no longer have a _"host"_ meaning; instead, it will represent the name given to the cluster.
+
+You can choose whatever name you want for the cluster (_the name should follow the [a-zA-Z0-9] convention_).
+This name will be sanitized by replacing the _dirty_ characters with an underscore, so that a host name or IP could potentially be used, if it makes sense to you.
+
+The cluster name will be mapped to the HA filesystem with a corresponding proxy, with failover, and the _works_ (the generated configuration is sketched below).
+
+[source,java]
+------------------------------------------------------------------------------------------------------
+
+from("hdfs://node1_and_2_cluster/dir1/dir2?namedNodes=node1.exemple.org:8020,node2.exemple.org:8020").routeId(...)
+...
+------------------------------------------------------------------------------------------------------
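+
+As an illustration, for the endpoint above the component generates roughly the following Hadoop configuration entries (a sketch based on `HaConfigurationBuilder`; node names have their port stripped and dots replaced with underscores):
+
+[source,java]
+------------------------------------------------------------------------------------------------------
+Configuration configuration = new Configuration();
+configuration.set("dfs.nameservices", "node1_and_2_cluster");
+configuration.set("dfs.ha.namenodes.node1_and_2_cluster", "node1_exemple_org,node2_exemple_org");
+configuration.set("dfs.namenode.rpc-address.node1_and_2_cluster.node1_exemple_org", "node1.exemple.org:8020");
+configuration.set("dfs.namenode.rpc-address.node1_and_2_cluster.node2_exemple_org", "node2.exemple.org:8020");
+configuration.set("dfs.client.failover.proxy.provider.node1_and_2_cluster",
+        "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+configuration.set("fs.defaultFS", "hdfs://node1_and_2_cluster");
+------------------------------------------------------------------------------------------------------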
+
+
 === Using this component with Kerberos authentication
 
 The kerberos config file is read when the camel component is created, not when the endpoint is created.
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
index 1d432b3..ca5daad 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java
@@ -19,6 +19,7 @@ package org.apache.camel.component.hdfs;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -31,6 +32,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 final class HaConfigurationBuilder {
 
     private static final String HFDS_NAMED_SERVICE = "hfdsNamedService";
+    private static final String HFDS_NAMED_SERVICE_SEPARATOR = "_";
     private static final String HFDS_FS = "fs.defaultFS";
 
     private HaConfigurationBuilder() {
@@ -49,31 +51,63 @@ final class HaConfigurationBuilder {
      * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
      * <p>
      *
-     * @param namedNodes                 - All named nodes from the hadoop cluster
-     * @param replicationFactor          - dfs replication factor
+     * @param configuration  - hdfs configuration that will be setup with the HA settings
+     * @param endpointConfig - configuration with the HA settings configured on the endpoint
      */
-    static void withClusterConfiguration(Configuration configuration, List<String> namedNodes, int replicationFactor) {
+    static void withClusterConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) {
+        String haNamedService = getSanitizedClusterName(endpointConfig.getHostName());
+        withClusterConfiguration(configuration, haNamedService, endpointConfig.getNamedNodeList(), endpointConfig.getReplication());
+    }
+
+    /**
+     * Generates the correct HA configuration (normally read from xml) based on the namedNodes:
+     * All named nodes have to be qualified: configuration.set("dfs.ha.namenodes.hfdsNamedService","namenode1,namenode2");
+     * For each named node the following entry is added
+     * <p>
+     * configuration.set("dfs.namenode.rpc-address.hfdsNamedService.namenode1", "namenode1:1234");
+     * <p>
+     * Finally the proxy provider has to be specified:
+     * <p>
+     * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+     * <p>
+     *
+     * @param configuration     - hdfs configuration that will be setup with the HA settings
+     * @param haNamedService    - the name of the HA named service that represents the cluster (used to resolve the FS)
+     * @param namedNodes        - All named nodes from the hadoop cluster
+     * @param replicationFactor - dfs replication factor
+     */
+    static void withClusterConfiguration(Configuration configuration, String haNamedService, List<String> namedNodes, int replicationFactor) {
         configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(replicationFactor));
-        configuration.set(DFSConfigKeys.DFS_NAMESERVICES, HFDS_NAMED_SERVICE);
+        configuration.set(DFSConfigKeys.DFS_NAMESERVICES, haNamedService);
         configuration.set(
-                DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, HFDS_NAMED_SERVICE),
+                DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, haNamedService),
                 nodeToString(namedNodes.stream().map(HaConfigurationBuilder::nodeToString).collect(Collectors.joining(",")))
         );
 
         namedNodes.forEach(nodeName ->
                 configuration.set(
-                        DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, HFDS_NAMED_SERVICE, nodeToString(nodeName)),
+                        DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, haNamedService, nodeToString(nodeName)),
                         nodeName)
         );
 
-        configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + HFDS_NAMED_SERVICE, ConfiguredFailoverProxyProvider.class.getName());
+        configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + haNamedService, ConfiguredFailoverProxyProvider.class.getName());
+
+        configuration.set(HFDS_FS, "hdfs://" + haNamedService);
+
+    }
+
+    static String getSanitizedClusterName(String rawClusterName) {
+        String clusterName = HFDS_NAMED_SERVICE;
 
-        configuration.set(HFDS_FS, "hdfs://" + HFDS_NAMED_SERVICE);
+        if (StringUtils.isNotEmpty(rawClusterName)) {
+            clusterName = rawClusterName.replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR);
+        }
 
+        return clusterName;
     }
 
     private static String nodeToString(String nodeName) {
-        return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_");
+        return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR);
     }
 
 }
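
For reference, the sanitization introduced above behaves as follows (a sketch mirroring getSanitizedClusterName and nodeToString; similar cases are asserted in HaConfigurationBuilderTest further down):

    HaConfigurationBuilder.getSanitizedClusterName(null);                   // -> "hfdsNamedService" (the default)
    HaConfigurationBuilder.getSanitizedClusterName("this.is.a.cluster");    // -> "this_is_a_cluster"
    HaConfigurationBuilder.getSanitizedClusterName("node1_and_2_cluster");  // -> "node1_and_2_cluster" (already clean)
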
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
index 4c4123c..8b5aa8d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
@@ -65,7 +65,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
             Closeable rout;
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
             Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
                     configuration.getCompressionType(), () -> { });
             return rout;
         } catch (IOException ex) {
@@ -78,7 +78,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType {
         try {
             Closeable rin;
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
+            rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
index be4c6d3..fa0c0ab 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
@@ -73,7 +73,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
             Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
                     MapFile.Writer.progressable(() -> {
@@ -89,7 +89,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType {
         try {
             Closeable rin;
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+            rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index be5bdb3..c5d3dc4 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -98,6 +98,7 @@ public class HdfsConfiguration {
     private String kerberosKeytabLocation;
 
     public HdfsConfiguration() {
+        // default constructor
     }
 
     private Boolean getBoolean(Map<String, Object> hdfsSettings, String param, Boolean dflt) {
@@ -207,7 +208,7 @@ public class HdfsConfiguration {
 
     private List<String> getNamedNodeList(Map<String, Object> hdfsSettings) {
         namedNodes = getString(hdfsSettings, "namedNodes", namedNodes);
-        
+
         if (isNotEmpty(namedNodes)) {
             return Arrays.stream(namedNodes.split(",")).distinct().collect(Collectors.toList());
         }
@@ -560,6 +561,10 @@ public class HdfsConfiguration {
         return namedNodeList;
     }
 
+    public boolean hasClusterConfiguration() {
+        return !getNamedNodeList().isEmpty();
+    }
+
     public String getKerberosConfigFileLocation() {
         return kerberosConfigFileLocation;
     }
@@ -595,7 +600,17 @@ public class HdfsConfiguration {
     }
 
     public boolean isKerberosAuthentication() {
-        return isNotEmpty(namedNodes) && isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation);
+        return isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation);
+    }
+
+    /**
+     * Get the label of the hdfs file system, like: HOST_NAME:PORT/PATH
+     *
+     * @param path the path appended after the host and port
+     * @return the label in the form HOST_NAME:PORT/PATH
+     */
+    String getFileSystemLabel(String path) {
+        return String.format("%s:%s/%s", getHostName(), getPort(), path);
     }
 
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 9971f74..52b1249 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -38,12 +38,10 @@ import org.apache.hadoop.fs.PathFilter;
 
 public final class HdfsConsumer extends ScheduledPollConsumer {
 
-    public static final long DEFAULT_CONSUMER_INITIAL_DELAY = 10 * 1000L;
-
     private final HdfsConfiguration config;
     private final StringBuilder hdfsPath;
     private final Processor processor;
-    private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
+    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
 
     public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
         super(endpoint, processor);
@@ -69,24 +67,21 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     }
 
     private HdfsInfo setupHdfs(boolean onStartup) throws IOException {
+        String hdfsFsDescription = config.getFileSystemLabel(hdfsPath.toString());
         // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
         if (onStartup) {
-            log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath);
+            log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath);
-            }
+            log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
         }
 
         // hadoop will cache the connection by default so its faster to get in the poll method
         HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString(), config);
 
         if (onStartup) {
-            log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath);
+            log.info("Connected to hdfs file-system {}", hdfsFsDescription);
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath);
-            }
+            log.debug("Connected to hdfs file-system {}", hdfsFsDescription);
         }
         return answer;
     }
@@ -199,11 +194,11 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
 
     private HdfsInputStream createInputStream(FileStatus fileStatus) {
         try {
-            this.rwlock.writeLock().lock();
+            this.rwLock.writeLock().lock();
 
             return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config);
         } finally {
-            this.rwlock.writeLock().unlock();
+            this.rwLock.writeLock().unlock();
         }
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
index bd9f245..ff8035e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
@@ -16,29 +16,23 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-
-import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication;
-import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-public final class HdfsInfo {
+final class HdfsInfo {
 
-    private Configuration configuration;
-    private FileSystem fileSystem;
-    private Path path;
+    private final Configuration configuration;
+    private final FileSystem fileSystem;
+    private final Path path;
 
-    HdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
-        this.configuration = newConfiguration(endpointConfig);
-        this.fileSystem = newFileSystem(this.configuration, hdfsPath, endpointConfig);
-        this.path = new Path(hdfsPath);
+    HdfsInfo(Configuration configuration, FileSystem fileSystem, Path hdfsPath) {
+        this.configuration = configuration;
+        this.fileSystem = fileSystem;
+        this.path = hdfsPath;
     }
 
-    public Configuration getConf() {
+    public Configuration getConfiguration() {
         return configuration;
     }
 
@@ -50,36 +44,4 @@ public final class HdfsInfo {
         return path;
     }
 
-    private static Configuration newConfiguration(HdfsConfiguration endpointConfig) {
-        Configuration configuration = new Configuration();
-
-        if (endpointConfig.isKerberosAuthentication()) {
-            String kerberosConfigFileLocation = endpointConfig.getKerberosConfigFileLocation();
-            KerberosConfigurationBuilder.withKerberosConfiguration(configuration, kerberosConfigFileLocation);
-
-        }
-
-        List<String> namedNodes = endpointConfig.getNamedNodeList();
-        if (!namedNodes.isEmpty()) {
-            HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig.getNamedNodeList(), endpointConfig.getReplication());
-
-        }
-
-        return configuration;
-    }
-
-    /**
-     * this will connect to the hadoop hdfs file system, and in case of no connection
-     * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
-     */
-    private static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
-        if (endpointConfig.isKerberosAuthentication()) {
-            String userName = endpointConfig.getKerberosUsername();
-            String keytabLocation = endpointConfig.getKerberosKeytabLocation();
-            new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab();
-        }
-
-        return FileSystem.get(URI.create(hdfsPath), configuration);
-    }
-
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
index 4862a70..6bb47da 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
@@ -17,22 +17,77 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
+import java.net.URI;
 
-import javax.security.auth.login.Configuration;
+import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication;
+import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 public final class HdfsInfoFactory {
 
     private HdfsInfoFactory() {
+        // hidden
     }
 
-    public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
+    static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
+        javax.security.auth.login.Configuration auth = HdfsComponent.getJAASConfiguration();
         try {
-            return new HdfsInfo(hdfsPath, configuration);
+            return newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
         } finally {
             HdfsComponent.setJAASConfiguration(auth);
         }
     }
 
+    static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+        Configuration configuration = newConfiguration(endpointConfig);
+
+        authenticate(configuration, endpointConfig);
+
+        FileSystem fileSystem = newFileSystem(configuration, hdfsPath, endpointConfig);
+        Path path = new Path(hdfsPath);
+
+        return new HdfsInfo(configuration, fileSystem, path);
+    }
+
+    static Configuration newConfiguration(HdfsConfiguration endpointConfig) {
+        Configuration configuration = new Configuration();
+
+        if (endpointConfig.isKerberosAuthentication()) {
+            KerberosConfigurationBuilder.withKerberosConfiguration(configuration, endpointConfig);
+        }
+
+        if (endpointConfig.hasClusterConfiguration()) {
+            HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig);
+        }
+
+        return configuration;
+    }
+
+    static void authenticate(Configuration configuration, HdfsConfiguration endpointConfig) throws IOException {
+        if (endpointConfig.isKerberosAuthentication()) {
+            String userName = endpointConfig.getKerberosUsername();
+            String keytabLocation = endpointConfig.getKerberosKeytabLocation();
+            new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab();
+        }
+    }
+
+    /**
+     * This will connect to the hadoop hdfs file system; if no connection can be made,
+     * the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
+     */
+    static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+        FileSystem fileSystem;
+        if (endpointConfig.hasClusterConfiguration()) {
+            // using the default FS that was set in the cluster configuration (@see org.apache.camel.component.hdfs.HaConfigurationBuilder)
+            fileSystem = FileSystem.get(configuration);
+        } else {
+            fileSystem = FileSystem.get(URI.create(hdfsPath), configuration);
+        }
+
+        return fileSystem;
+    }
+
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 40212e8..49e3ddc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,7 +18,6 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
@@ -53,29 +52,27 @@ public class HdfsInputStream implements Closeable {
      * @throws IOException
      */
     public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-        HdfsInputStream ret = new HdfsInputStream();
-        ret.fileType = configuration.getFileType();
-        ret.actualPath = hdfsPath;
-        ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
-        ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
-        ret.chunkSize = configuration.getChunkSize();
+        HdfsInputStream iStream = new HdfsInputStream();
+        iStream.fileType = configuration.getFileType();
+        iStream.actualPath = hdfsPath;
+        iStream.suffixedPath = iStream.actualPath + '.' + configuration.getOpenedSuffix();
+        iStream.suffixedReadPath = iStream.actualPath + '.' + configuration.getReadSuffix();
+        iStream.chunkSize = configuration.getChunkSize();
         try {
-            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration);
-            if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
-                ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
-                ret.opened = true;
-                ret.config = configuration;
+            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(iStream.actualPath, configuration);
+            if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) {
+                iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, configuration);
+                iStream.opened = true;
+                iStream.config = configuration;
             } else {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
-                }
-                ret = null;
+                LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath);
+                iStream = null;
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
 
-        return ret;
+        return iStream;
     }
 
     @Override
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
index 01868b7..fbd3e04 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
@@ -72,7 +72,7 @@ class HdfsMapFileType extends DefaultHdfsFileType {
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
             Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-            rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
                     MapFile.Writer.progressable(() -> {
                     }));
@@ -87,7 +87,7 @@ class HdfsMapFileType extends DefaultHdfsFileType {
         try {
             Closeable rin;
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+            rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration());
             return rin;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
index 9d9a5cc..51efaf7 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java
@@ -43,10 +43,10 @@ public class HdfsOsgiHelper {
             Configuration conf = new Configuration();
             // set that as the hdfs configuration's classloader
             conf.setClassLoader(cl);
-            for (String key : fileSystems.keySet()) {
-                URI uri = URI.create(key);
-                conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fileSystems.get(key)), FileSystem.class);
-                LOG.debug("Successfully loaded class: {}", fileSystems.get(key));
+            for (Map.Entry<String, String> fsEntry : fileSystems.entrySet()) {
+                URI uri = URI.create(fsEntry.getKey());
+                conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fsEntry.getValue()), FileSystem.class);
+                LOG.debug("Successfully loaded class: {}", fsEntry.getValue());
                 FileSystem.get(uri, conf);
                 LOG.debug("Successfully got uri: {} from FileSystem Object", uri);
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index 6e623e3..49a3a00 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -43,35 +43,35 @@ public class HdfsOutputStream implements Closeable {
     }
 
     public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException {
-        HdfsOutputStream ret = new HdfsOutputStream();
-        ret.fileType = configuration.getFileType();
-        ret.actualPath = hdfsPath;
-        ret.info = new HdfsInfo(ret.actualPath, configuration);
+        HdfsOutputStream oStream = new HdfsOutputStream();
+        oStream.fileType = configuration.getFileType();
+        oStream.actualPath = hdfsPath;
+        oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.actualPath, configuration);
+
+        oStream.suffixedPath = oStream.actualPath + '.' + configuration.getOpenedSuffix();
+
+        Path actualPath = new Path(oStream.actualPath);
+        boolean actualPathExists = oStream.info.getFileSystem().exists(actualPath);
 
-        ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
         if (configuration.isWantAppend() || configuration.isAppend()) {
-            if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
-                configuration.setAppend(false);
-            } else {
+            if (actualPathExists) {
                 configuration.setAppend(true);
-                ret.info = new HdfsInfo(ret.suffixedPath, configuration);
-                ret.info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
+                oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.suffixedPath, configuration);
+                oStream.info.getFileSystem().rename(actualPath, new Path(oStream.suffixedPath));
+            } else {
+                configuration.setAppend(false);
             }
-        } else {
-            if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) {
-                //only check if not directory
-                if (!ret.info.getFileSystem().isDirectory(new Path(ret.actualPath))) {
-                    if (configuration.isOverwrite()) {
-                        ret.info.getFileSystem().delete(new Path(ret.actualPath), true);
-                    } else {
-                        throw new RuntimeCamelException("The file already exists");
-                    }
-                }
+        } else if (actualPathExists && !oStream.info.getFileSystem().isDirectory(actualPath)) { // only check if not directory
+            if (configuration.isOverwrite()) {
+                oStream.info.getFileSystem().delete(actualPath, true);
+            } else {
+                throw new RuntimeCamelException("The file already exists");
             }
         }
-        ret.out = ret.fileType.createOutputStream(ret.suffixedPath, configuration);
-        ret.opened = true;
-        return ret;
+
+        oStream.out = oStream.fileType.createOutputStream(oStream.suffixedPath, configuration);
+        oStream.opened = true;
+        return oStream;
     }
 
     @Override
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 568ee71..048a120 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -30,18 +30,14 @@ import org.apache.camel.Expression;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StringHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class HdfsProducer extends DefaultProducer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(HdfsProducer.class);
-
     private final HdfsConfiguration config;
     private final StringBuilder hdfsPath;
     private final AtomicBoolean idle = new AtomicBoolean(false);
     private volatile ScheduledExecutorService scheduler;
-    private volatile HdfsOutputStream ostream;
+    private volatile HdfsOutputStream oStream;
 
     public static final class SplitStrategy {
         private SplitStrategyType type;
@@ -106,7 +102,7 @@ public class HdfsProducer extends DefaultProducer {
 
             // setup hdfs if configured to do on startup
             if (getEndpoint().getConfig().isConnectOnStartup()) {
-                ostream = setupHdfs(true);
+                oStream = setupHdfs(true);
             }
 
             Optional<SplitStrategy> idleStrategy = tryFindIdleStrategy(config.getSplitStrategies());
@@ -116,8 +112,8 @@ public class HdfsProducer extends DefaultProducer {
                 scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy.get()), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS);
             }
         } catch (Exception e) {
-            LOG.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
-            LOG.debug("", e);
+            log.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
+            log.debug("", e);
             throw new RuntimeException(e);
         } finally {
             HdfsComponent.setJAASConfiguration(auth);
@@ -125,8 +121,8 @@ public class HdfsProducer extends DefaultProducer {
     }
 
     private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws IOException {
-        if (ostream != null) {
-            return ostream;
+        if (oStream != null) {
+            return oStream;
         }
 
         StringBuilder actualPath = new StringBuilder(hdfsPath);
@@ -134,23 +130,21 @@ public class HdfsProducer extends DefaultProducer {
             actualPath = newFileName();
         }
 
+        String hdfsFsDescription = config.getFileSystemLabel(actualPath.toString());
+
         // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
         if (onStartup) {
-            log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath);
+            log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath);
-            }
+            log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription);
         }
 
         HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
 
         if (onStartup) {
-            log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath);
+            log.info("Connected to hdfs file-system {}", hdfsFsDescription);
         } else {
-            if (log.isDebugEnabled()) {
-                log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath);
-            }
+            log.debug("Connected to hdfs file-system {}", hdfsFsDescription);
         }
 
         return answer;
@@ -172,9 +166,9 @@ public class HdfsProducer extends DefaultProducer {
             getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler);
             scheduler = null;
         }
-        if (ostream != null) {
-            IOHelper.close(ostream, "output stream", log);
-            ostream = null;
+        if (oStream != null) {
+            IOHelper.close(oStream, "output stream", log);
+            oStream = null;
         }
     }
 
@@ -195,27 +189,27 @@ public class HdfsProducer extends DefaultProducer {
 
         // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath
         if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) {
-            if (ostream != null) {
-                IOHelper.close(ostream, "output stream", log);
+            if (oStream != null) {
+                IOHelper.close(oStream, "output stream", log);
             }
             StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange);
-            ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
-        } else if (ostream == null) {
-            // must have ostream
-            ostream = setupHdfs(false);
+            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+        } else if (oStream == null) {
+            // must have oStream
+            oStream = setupHdfs(false);
         }
 
         if (isSplitRequired(config.getSplitStrategies())) {
-            if (ostream != null) {
-                IOHelper.close(ostream, "output stream", log);
+            if (oStream != null) {
+                IOHelper.close(oStream, "output stream", log);
             }
             StringBuilder actualPath = newFileName();
-            ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
+            oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config);
         }
 
-        String path = ostream.getActualPath();
+        String path = oStream.getActualPath();
         log.trace("Writing body to hdfs-file {}", path);
-        ostream.append(key, body, exchange.getContext().getTypeConverter());
+        oStream.append(key, body, exchange.getContext().getTypeConverter());
 
         idle.set(false);
 
@@ -231,8 +225,8 @@ public class HdfsProducer extends DefaultProducer {
         if (close) {
             try {
                 HdfsProducer.this.log.trace("Closing stream");
-                ostream.close();
-                ostream = null;
+                oStream.close();
+                oStream = null;
             } catch (IOException e) {
                 // ignore
             }
@@ -243,6 +237,7 @@ public class HdfsProducer extends DefaultProducer {
 
     /**
      * helper method to construct the hdfsPath from the CamelFileName String or Expression
+     *
      * @param exchange
      * @return
      */
@@ -253,7 +248,7 @@ public class HdfsProducer extends DefaultProducer {
         if (value instanceof String) {
             fileName = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
         } else if (value instanceof Expression) {
-            fileName =  ((Expression) value).evaluate(exchange, String.class);
+            fileName = ((Expression) value).evaluate(exchange, String.class);
         }
         return actualPath.append(fileName);
     }
@@ -261,7 +256,7 @@ public class HdfsProducer extends DefaultProducer {
     private boolean isSplitRequired(List<SplitStrategy> strategies) {
         boolean split = false;
         for (SplitStrategy splitStrategy : strategies) {
-            split |= splitStrategy.getType().split(ostream, splitStrategy.value, this);
+            split |= splitStrategy.getType().split(oStream, splitStrategy.value, this);
         }
         return split;
     }
@@ -285,18 +280,18 @@ public class HdfsProducer extends DefaultProducer {
 
         @Override
         public void run() {
-            // only run if ostream has been created
-            if (ostream == null) {
+            // only run if oStream has been created
+            if (oStream == null) {
                 return;
             }
 
             HdfsProducer.this.log.trace("IdleCheck running");
 
-            if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) {
+            if (System.currentTimeMillis() - oStream.getLastAccess() > strategy.value && !idle.get() && !oStream.isBusy().get()) {
                 idle.set(true);
                 try {
                     HdfsProducer.this.log.trace("Closing stream as idle");
-                    ostream.close();
+                    oStream.close();
                 } catch (IOException e) {
                     // ignore
                 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
index 0bfde2e..159183a 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
@@ -71,7 +71,7 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
             Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
             Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(hdfsInfo.getConf(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
                     SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(configuration.getBufferSize()),
                     SequenceFile.Writer.replication(configuration.getReplication()), SequenceFile.Writer.blockSize(configuration.getBlockSize()),
                     SequenceFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
@@ -88,7 +88,7 @@ class HdfsSequenceFileType extends DefaultHdfsFileType {
         try {
             Closeable rin;
             HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-            rin = new SequenceFile.Reader(hdfsInfo.getConf(), SequenceFile.Reader.file(hdfsInfo.getPath()));
+            rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath()));
             return rin;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
index b5025cc..c753c46 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.hdfs.kerberos;
 
 import java.io.File;
 
+import org.apache.camel.component.hdfs.HdfsConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,10 +37,10 @@ public final class KerberosConfigurationBuilder {
     /**
      * Add all the kerberos specific settings needed for this authentication mode
      *
-     * @param kerberosConfigFileLocation - The location of the kerberos config file (on the server)
+     * @param endpointConfig - configuration with the HA settings configured on the endpoint
      */
-    public static void withKerberosConfiguration(Configuration configuration, String kerberosConfigFileLocation) {
-        setKerberosConfigFile(kerberosConfigFileLocation);
+    public static void withKerberosConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) {
+        setKerberosConfigFile(endpointConfig.getKerberosConfigFileLocation());
         configuration.set(AUTHENTICATION_MODE, "kerberos");
 
     }
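
Combined with the HdfsConfiguration change above (isKerberosAuthentication no longer requires namedNodes), Kerberos authentication can now be used with or without an HA cluster. A sketch of a route that combines both, assuming the endpoint query parameters mirror the HdfsConfiguration field names; the hosts, user and keytab paths are placeholders:

    from("hdfs://my_ha_cluster/dir1?namedNodes=node1.example.org:8020,node2.example.org:8020"
            + "&kerberosConfigFileLocation=/etc/krb5.conf"
            + "&kerberosUsername=hdfs-user"
            + "&kerberosKeytabLocation=/etc/security/keytabs/hdfs-user.keytab")
        .to("file:target/outbox");
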
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
index a134bce..fdf8d99 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java
@@ -37,7 +37,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         deleteDirectory("target/inbox");
@@ -48,7 +48,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -61,7 +61,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
 
     @Test
     public void testFileToHdfs() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -80,7 +80,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport {
 
     @Test
     public void testTwoFilesToHdfs() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
index 3cf20fc..91dffe7 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java
@@ -33,21 +33,48 @@ public class HaConfigurationBuilderTest {
     public void withClusterConfiguration() {
         // given
         Configuration configuration = new Configuration();
+        String haClusterName = "haCluster";
         List<String> namedNodes = Arrays.asList("kerb_node_01.example.com:8021", "kerb_node_02.example.com:8022");
         int replicationFactor = 3;
 
         // when
-        HaConfigurationBuilder.withClusterConfiguration(configuration, namedNodes, replicationFactor);
+        HaConfigurationBuilder.withClusterConfiguration(configuration, haClusterName, namedNodes, replicationFactor);
 
         // then
         assertThat(configuration, notNullValue());
         assertThat(configuration.get(DFSConfigKeys.DFS_REPLICATION_KEY), is("3"));
-        assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("hfdsNamedService"));
-        assertThat(configuration.get("dfs.ha.namenodes.hfdsNamedService"), is("kerb_node_01_example_com,kerb_node_02_example_com"));
-        assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021"));
-        assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022"));
-        assertThat(configuration.get("dfs.client.failover.proxy.provider.hfdsNamedService"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"));
-        assertThat(configuration.get("fs.defaultFS"), is("hdfs://hfdsNamedService"));
+        assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("haCluster"));
+        assertThat(configuration.get("dfs.ha.namenodes.haCluster"), is("kerb_node_01_example_com,kerb_node_02_example_com"));
+        assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021"));
+        assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022"));
+        assertThat(configuration.get("dfs.client.failover.proxy.provider.haCluster"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"));
+        assertThat(configuration.get("fs.defaultFS"), is("hdfs://haCluster"));
+    }
+
+    @Test
+    public void getSanitizedClusterNameWithNull() {
+        // given
+        String haClusterName = null;
+
+        // when
+        String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName);
+
+        // then
+        assertThat(actual, notNullValue());
+        assertThat(actual, is("hfdsNamedService"));
+    }
+
+    @Test
+    public void getSanitizedClusterNameWithHostName() {
+        // given
+        String haClusterName = "this.is.a.cluster.host";
+
+        // when
+        String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName);
+
+        // then
+        assertThat(actual, notNullValue());
+        assertThat(actual, is("this_is_a_cluster_host"));
     }
 
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 0845fbe..f0ec9de 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -67,7 +67,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -84,7 +84,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleConsumer() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -113,11 +113,11 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testConcurrentConsumers() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
-        final File rootdir = new File(".");
+        final File rootdir = CWD;
         final File dir = new File("target/test/multiple-consumers");
         dir.mkdirs();
         for (int i = 1; i <= ITERATIONS; i++) {
@@ -157,7 +157,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleConsumerWithEmptyFile() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -186,7 +186,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -217,7 +217,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -242,7 +242,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadWithReadSuffix() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -287,7 +287,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadBoolean() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -315,7 +315,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadByte() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -346,7 +346,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadFloat() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -376,7 +376,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadDouble() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -406,7 +406,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadInt() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -436,7 +436,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadLong() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -466,7 +466,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadBytes() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -496,7 +496,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadString() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -526,7 +526,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testReadStringArrayFile() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -560,7 +560,7 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
similarity index 52%
copy from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
copy to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
index 4862a70..eff5fa9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java
@@ -18,21 +18,29 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 
-import javax.security.auth.login.Configuration;
+import org.junit.Test;
 
-public final class HdfsInfoFactory {
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
 
-    private HdfsInfoFactory() {
-    }
+public class HdfsInfoTest {
 
-    public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
-        // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
-        try {
-            return new HdfsInfo(hdfsPath, configuration);
-        } finally {
-            HdfsComponent.setJAASConfiguration(auth);
-        }
-    }
+    private HdfsInfo underTest;
+
+    @Test
+    public void createHdfsInfo() throws IOException {
+        // given
+        String hdfsPath = "hdfs://localhost/target/test/multiple-consumers";
+        HdfsConfiguration endpointConfig = mock(HdfsConfiguration.class);
 
-}
+        // when
+        underTest = HdfsInfoFactory.newHdfsInfoWithoutAuth(hdfsPath, endpointConfig);
+
+        // then
+        assertThat(underTest, notNullValue());
+        assertThat(underTest.getConfiguration(), notNullValue());
+        assertThat(underTest.getFileSystem(), notNullValue());
+        assertThat(underTest.getPath(), notNullValue());
+    }
+}
\ No newline at end of file
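
The hunk above was generated by git as a copy of the old HdfsInfoFactory into the new test, so the factory method the test calls, newHdfsInfoWithoutAuth, is only visible here by name. Judging from the removed factory body, the split presumably looks like the following sketch (signatures and bodies are inferred, not quoted from the commit):

    // Hypothetical sketch of the factory split. Configuration here is
    // javax.security.auth.login.Configuration, per the removed import.
    public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
        // remember the JAAS configuration, as Hadoop overrides it and would
        // otherwise leave authentication broken afterwards
        Configuration auth = HdfsComponent.getJAASConfiguration();
        try {
            return newHdfsInfoWithoutAuth(hdfsPath, configuration);
        } finally {
            HdfsComponent.setJAASConfiguration(auth);
        }
    }

    public static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration configuration) throws IOException {
        return new HdfsInfo(hdfsPath, configuration);
    }
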
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
index 0f71bdd..7e98793 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java
@@ -34,7 +34,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.setUp();
@@ -47,7 +47,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleSplitWriteRead() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -81,7 +81,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
index b3c2e85..60976f5 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java
@@ -35,7 +35,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.setUp();
@@ -53,7 +53,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
 
     @Test
     public void testSimpleWriteFileWithIdleSplit() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -86,7 +86,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
     }
 
     private void doTest(int routeNr) throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -108,7 +108,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index f4cdfc0..a97f29b 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -53,7 +53,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.setUp();
@@ -61,7 +61,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testProducer() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         template.sendBody("direct:start1", "PAPPO");
@@ -79,7 +79,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testProducerClose() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         for (int i = 0; i < 10; ++i) {
@@ -105,7 +105,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteBoolean() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         Boolean aBoolean = true;
@@ -125,7 +125,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteByte() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         byte aByte = 8;
@@ -145,7 +145,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteInt() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         int anInt = 1234;
@@ -165,7 +165,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteFloat() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         float aFloat = 12.34f;
@@ -185,7 +185,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteDouble() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         Double aDouble = 12.34D;
@@ -205,7 +205,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteLong() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         long aLong = 1234567890;
@@ -225,7 +225,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txt = "CIAO MONDO !";
@@ -245,7 +245,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithKey() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -266,7 +266,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testMapWriteTextWithKey() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -286,7 +286,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testArrayWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtValue = "CIAO MONDO !";
@@ -305,7 +305,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testBloomMapWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -325,7 +325,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithDynamicFilename() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -348,7 +348,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithDynamicFilenameExpression() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -372,7 +372,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.tearDown();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
index 6399fc0..e406014 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
@@ -16,25 +16,55 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.File;
+import java.util.Objects;
+
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
 
 public abstract class HdfsTestSupport extends CamelTestSupport {
 
-    public boolean canTest() {
+    public static final File CWD = new File(".");
+
+    private static Boolean skipTests;
+
+    public boolean skipTest() {
+        if (Objects.isNull(skipTests)) {
+            skipTests = notConfiguredToRunTests();
+        }
+
+        return skipTests;
+    }
+
+    private boolean notConfiguredToRunTests() {
+        return isJavaFromIbm() || missingLocalHadoopConfiguration() || missingAuthenticationConfiguration();
+    }
+
+    private static boolean isJavaFromIbm() {
         // Hadoop doesn't run on IBM JDK
-        if (System.getProperty("java.vendor").contains("IBM")) {
-            return false;
+        return System.getProperty("java.vendor").contains("IBM");
+    }
+
+    private static boolean missingLocalHadoopConfiguration() {
+        boolean hasLocalHadoop;
+        try {
+            String hadoopHome = Shell.getHadoopHome();
+            hasLocalHadoop = StringUtils.isNotEmpty(hadoopHome);
+        } catch (Throwable e) {
+            hasLocalHadoop = false;
         }
+        return !hasLocalHadoop;
+    }
 
-        // must be able to get security configuration
+    private boolean missingAuthenticationConfiguration() {
         try {
             javax.security.auth.login.Configuration.getConfiguration();
+            return false;
         } catch (Exception e) {
             log.debug("Cannot run test due security exception", e);
-            return false;
+            return true;
         }
-
-        return true;
     }
 
 }
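
A design note on the reworked guard above: skipTest() caches the probe result in a static field, so the Shell.getHadoopHome() lookup and the JAAS check run once per JVM rather than once per test method. If the silent early returns ever become a concern (guarded tests report as passed, not skipped), the same guard could be phrased as a JUnit 4 assumption; a sketch of that alternative, not part of this commit:

    // Sketch only: report unmet prerequisites as skipped tests instead of
    // returning early and passing silently.
    import org.junit.Assume;

    @Before
    public void setUp() throws Exception {
        Assume.assumeFalse("local Hadoop/JAAS not configured", skipTest());
        super.setUp();
    }
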
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
index ca94682..383a45d 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.component.hdfs.kerberos;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
+
 public class KerberosAuthenticationTest {
 
     private KerberosAuthentication underTest;
@@ -33,7 +34,7 @@ public class KerberosAuthenticationTest {
         Configuration configuration = new Configuration();
 
         String username = "test_user";
-        String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/test-keytab.bin";
+        String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-keytab.bin";
 
         underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
 
@@ -50,7 +51,7 @@ public class KerberosAuthenticationTest {
         Configuration configuration = new Configuration();
 
         String username = "test_user";
-        String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/missing.bin";
+        String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/missing.bin";
 
         underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
 
@@ -61,8 +62,4 @@ public class KerberosAuthenticationTest {
         /* exception was thrown */
     }
 
-    private String pwd() {
-        return new File(".").getAbsolutePath();
-    }
-
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
index aa00a82..af75002 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
@@ -16,13 +16,9 @@
  */
 package org.apache.camel.component.hdfs.kerberos;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
 import static org.junit.Assert.*;
 
 public class KerberosConfigurationBuilderTest {
@@ -30,7 +26,7 @@ public class KerberosConfigurationBuilderTest {
     @Test
     public void withKerberosConfiguration() {
         // given
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/test-kerb5.conf";
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-kerb5.conf";
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -43,7 +39,7 @@ public class KerberosConfigurationBuilderTest {
     public void setKerberosConfigFileWithRealFile() {
         // given
         String kerb5FileName = "test-kerb5.conf";
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -58,7 +54,7 @@ public class KerberosConfigurationBuilderTest {
     public void setKerberosConfigFileWithMissingFile() {
         // given
         String kerb5FileName = "missing-kerb5.conf";
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -68,8 +64,4 @@ public class KerberosConfigurationBuilderTest {
         assertNull(actual);
     }
 
-    private String pwd() {
-        return new File(".").getAbsolutePath();
-    }
-
 }