Posted to commits@camel.apache.org by ac...@apache.org on 2019/09/27 12:07:05 UTC

[camel] 01/16: CAMEL-13998 - Kerberos authentication for HDFS connections

This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0a8a767564af6cb95986ec0525dc68faa2bfee16
Author: marius cornescu <ma...@yahoo.com>
AuthorDate: Thu Sep 19 18:21:40 2019 +0200

    CAMEL-13998 - Kerberos authentication for HDFS connections
---
 .../camel-hdfs/src/main/docs/hdfs-component.adoc   |  13 +-
 .../apache/camel/component/hdfs/HdfsComponent.java |  27 ----
 .../camel/component/hdfs/HdfsConfiguration.java    | 146 +++++++++++++++++---
 .../apache/camel/component/hdfs/HdfsConsumer.java  |  25 ++--
 .../camel/component/hdfs/HdfsFileSystemType.java   |   4 +-
 .../apache/camel/component/hdfs/HdfsFileType.java  |  95 ++++++-------
 .../org/apache/camel/component/hdfs/HdfsInfo.java  |  48 +++++--
 .../camel/component/hdfs/HdfsInfoFactory.java      |   8 +-
 .../camel/component/hdfs/HdfsInputStream.java      |   7 +-
 .../camel/component/hdfs/HdfsOutputStream.java     |   4 +-
 .../apache/camel/component/hdfs/HdfsProducer.java  |  18 ++-
 .../kerberos/HdfsKerberosConfigurationFactory.java |  30 ++++
 .../hdfs/kerberos/KerberosConfiguration.java       |  95 +++++++++++++
 .../HdfsKerberosConfigurationFactoryTest.java      |  25 ++++
 .../src/test/resources/kerberos/test-kerb5.conf    |  12 ++
 .../endpoint/dsl/HdfsEndpointBuilderFactory.java   | 151 +++++++++++++++++++++
 .../springboot/HdfsComponentConfiguration.java     |  13 --
 17 files changed, 564 insertions(+), 157 deletions(-)

diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
index 2364335..8ae81b9 100644
--- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc
+++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc
@@ -54,14 +54,13 @@ fileMode=Append to append each of the chunks together.
 
 
 // component options: START
-The HDFS component supports 2 options, which are listed below.
+The HDFS component supports 1 option, which is listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *jAASConfiguration* (common) | To use the given configuration for security with JAAS. |  | Configuration
 | *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
 |===
 // component options: END
@@ -92,7 +91,7 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (40 parameters):
+=== Query Parameters (44 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -101,6 +100,10 @@ with the following path and query parameters:
 | *connectOnStartup* (common) | Whether to connect to the HDFS file system on starting the producer/consumer. If false then the connection is created on-demand. Notice that HDFS may take up to 15 minutes to establish a connection, as it has a hardcoded 45 x 20 sec redelivery. Setting this option to false allows your application to start up without blocking for up to 15 minutes. | true | boolean
 | *fileSystemType* (common) | Set to LOCAL to not use HDFS but local java.io.File instead. | HDFS | HdfsFileSystemType
 | *fileType* (common) | The file type to use. For more details see Hadoop HDFS documentation about the various file types. | NORMAL_FILE | HdfsFileType
+| *kerberosConfigFileLocation* (common) | The location of the kerb5.conf file (\https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html) |  | String
+| *kerberosKeytabLocation* (common) | The location of the keytab file used to authenticate with the kerberos nodes |  | String
+| *kerberosNamedNodes* (common) | A comma separated list of kerberos nodes (e.g. host01.example.com:8021,host02.example.com:8021,host03.example.com:8025) |  | String
+| *kerberosUsername* (common) | The username used to authenticate with the kerberos nodes |  | String
 | *keyType* (common) | The type for the key in case of sequence or map files. | NULL | WritableType
 | *owner* (common) | The file owner must match this owner for the consumer to pickup the file. Otherwise the file is skipped. |  | String
 | *valueType* (common) | The type for the value in case of sequence or map files. | BYTES | WritableType
@@ -109,7 +112,7 @@ with the following path and query parameters:
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll any files, you can enable this option to send an empty message (no body) instead. | false | boolean
 | *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
-| *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPollStrategy
+| *pollStrategy* (consumer) | A pluggable org.apache.camel.PollingConsumerPollingStrategy allowing you to provide your custom implementation to control error handling usually occurred during the poll operation before an Exchange have been created and being routed in Camel. |  | PollingConsumerPoll Strategy
 | *append* (producer) | Append to existing file. Notice that not all HDFS file systems support the append option. | false | boolean
 | *lazyStartProducer* (producer) | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and [...]
 | *overwrite* (producer) | Whether to overwrite existing files with the same name | true | boolean
@@ -132,7 +135,7 @@ with the following path and query parameters:
 | *greedy* (scheduler) | If greedy is enabled, then the ScheduledPollConsumer will run immediately again, if the previous run polled 1 or more messages. | false | boolean
 | *initialDelay* (scheduler) | Milliseconds before the first poll starts. You can also specify time values using units, such as 60s (60 seconds), 5m30s (5 minutes and 30 seconds), and 1h (1 hour). | 1000 | long
 | *runLoggingLevel* (scheduler) | The consumer logs a start/complete log line when it polls. This option allows you to configure the logging level for that. | TRACE | LoggingLevel
-| *scheduledExecutorService* (scheduler) | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. |  | ScheduledExecutorService
+| *scheduledExecutorService* (scheduler) | Allows for configuring a custom/shared thread pool to use for the consumer. By default each consumer has its own single threaded thread pool. |  | ScheduledExecutor Service
 | *scheduler* (scheduler) | To use a cron scheduler from either camel-spring or camel-quartz component | none | String
 | *schedulerProperties* (scheduler) | To configure additional properties when using a custom scheduler or any of the Quartz, Spring based scheduler. |  | Map
 | *startScheduler* (scheduler) | Whether the scheduler should be auto started. | true | boolean
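
As a usage sketch for the four new kerberos options above: all four must be set before the component switches to Kerberos authentication (see isKerberosAuthentication() further down in this commit), so a kerberized consumer endpoint would look roughly like the following Java DSL fragment. Host names, ports, the principal and the file locations are illustrative placeholders, not values from this patch:

    from("hdfs://host01.example.com:8020/user/camel/input"
            + "?kerberosNamedNodes=host01.example.com:8021,host02.example.com:8021"
            + "&kerberosConfigFileLocation=/etc/security/krb5.conf"
            + "&kerberosUsername=camel@EXAMPLE.COM"
            + "&kerberosKeytabLocation=/etc/security/keytabs/camel.keytab")
        .to("log:hdfs-input");
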
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
index 1586ce7..6100c38 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
@@ -53,31 +53,4 @@ public class HdfsComponent extends DefaultComponent {
         }
     }
 
-    static Configuration getJAASConfiguration() {
-        Configuration auth = null;
-        try {
-            auth = Configuration.getConfiguration();
-            LOG.trace("Existing JAAS Configuration {}", auth);
-        } catch (SecurityException e) {
-            LOG.trace("Cannot load existing JAAS configuration", e);
-        }
-        return auth;
-    }
-
-    /**
-     * To use the given configuration for security with JAAS.
-     */
-    static void setJAASConfiguration(Configuration auth) {
-        if (auth != null) {
-            LOG.trace("Restoring existing JAAS Configuration {}", auth);
-            try {
-                Configuration.setConfiguration(auth);
-            } catch (SecurityException e) {
-                LOG.trace("Cannot restore JAAS Configuration. This exception is ignored.", e);
-            }
-        } else {
-            LOG.trace("No JAAS Configuration to restore");
-        }
-    }
-
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index fc9329f..c43284c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -19,8 +19,11 @@ package org.apache.camel.component.hdfs;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
@@ -28,19 +31,27 @@ import org.apache.camel.spi.UriParams;
 import org.apache.camel.spi.UriPath;
 import org.apache.camel.util.URISupport;
 import org.apache.hadoop.io.SequenceFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.Configuration;
 
 @UriParams
 public class HdfsConfiguration {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsConfiguration.class);
+
     private URI uri;
     private boolean wantAppend;
     private List<HdfsProducer.SplitStrategy> splitStrategies;
 
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String hostName;
     @UriPath(defaultValue = "" + HdfsConstants.DEFAULT_PORT)
     private int port = HdfsConstants.DEFAULT_PORT;
-    @UriPath @Metadata(required = true)
+    @UriPath
+    @Metadata(required = true)
     private String path;
     @UriParam(label = "producer", defaultValue = "true")
     private boolean overwrite = true;
@@ -81,6 +92,17 @@ public class HdfsConfiguration {
     @UriParam
     private String owner;
 
+    @UriParam
+    private String kerberosNamedNodes;
+    private List<String> kerberosNamedNodeList;
+
+    @UriParam
+    private String kerberosConfigFileLocation;
+    @UriParam
+    private String kerberosUsername;
+    @UriParam
+    private String kerberosKeytabLocation;
+
     public HdfsConfiguration() {
     }
 
@@ -171,33 +193,63 @@ public class HdfsConfiguration {
 
     private List<HdfsProducer.SplitStrategy> getSplitStrategies(Map<String, Object> hdfsSettings) {
         List<HdfsProducer.SplitStrategy> strategies = new ArrayList<>();
-        for (Object obj : hdfsSettings.keySet()) {
-            String key = (String) obj;
-            if ("splitStrategy".equals(key)) {
-                String eit = (String) hdfsSettings.get(key);
-                if (eit != null) {
-                    String[] strstrategies = eit.split(",");
-                    for (String strstrategy : strstrategies) {
-                        String tokens[] = strstrategy.split(":");
-                        if (tokens.length != 2) {
-                            throw new IllegalArgumentException("Wrong Split Strategy " + key + "=" + eit);
-                        }
-                        HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
-                        long ssv = Long.valueOf(tokens[1]);
-                        strategies.add(new HdfsProducer.SplitStrategy(sst, ssv));
-                    }
+
+        splitStrategy = getString(hdfsSettings, "splitStrategy", splitStrategy);
+
+        if (Objects.nonNull(splitStrategy)) {
+            String[] strstrategies = splitStrategy.split(",");
+            for (String strstrategy : strstrategies) {
+                String[] tokens = strstrategy.split(":");
+                if (tokens.length != 2) {
+                    throw new IllegalArgumentException("Wrong Split Strategy [splitStrategy=" + splitStrategy + "]");
                 }
+                HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
+                long ssv = Long.parseLong(tokens[1]);
+                strategies.add(new HdfsProducer.SplitStrategy(sst, ssv));
             }
         }
         return strategies;
     }
 
+    private List<String> getKerberosNamedNodeList(Map<String, Object> hdfsSettings) {
+        kerberosNamedNodes = getString(hdfsSettings, "kerberosNamedNodes", kerberosNamedNodes);
+        if (Objects.isNull(kerberosNamedNodes)) {
+            // the kerberos options are optional, so avoid an NPE for plain (non-kerberos) endpoints
+            return new ArrayList<>();
+        }
+        return Arrays.stream(kerberosNamedNodes.split(",")).distinct().collect(Collectors.toList());
+    }
+
+
+    Configuration getJAASConfiguration() {
+        Configuration auth = null;
+        try {
+            auth = Configuration.getConfiguration();
+            LOG.trace("Existing JAAS Configuration {}", auth);
+        } catch (SecurityException e) {
+            LOG.trace("Cannot load existing JAAS configuration", e);
+        }
+        return auth;
+    }
+
+    /**
+     * To use the given configuration for security with JAAS.
+     */
+    void setJAASConfiguration(Configuration auth) {
+        if (auth != null) {
+            LOG.trace("Restoring existing JAAS Configuration {}", auth);
+            try {
+                Configuration.setConfiguration(auth);
+            } catch (SecurityException e) {
+                LOG.trace("Cannot restore JAAS Configuration. This exception is ignored.", e);
+            }
+        } else {
+            LOG.trace("No JAAS Configuration to restore");
+        }
+    }
+
     public void checkConsumerOptions() {
     }
 
     public void checkProducerOptions() {
         if (isAppend()) {
-            if (getSplitStrategies().size() != 0) {
+            if (!getSplitStrategies().isEmpty()) {
                 throw new IllegalArgumentException("Split Strategies incompatible with append=true");
             }
             if (getFileType() != HdfsFileType.NORMAL_FILE) {
@@ -236,6 +288,11 @@ public class HdfsConfiguration {
         pattern = getString(hdfsSettings, "pattern", pattern);
         chunkSize = getInteger(hdfsSettings, "chunkSize", chunkSize);
         splitStrategies = getSplitStrategies(hdfsSettings);
+
+        kerberosNamedNodeList = getKerberosNamedNodeList(hdfsSettings);
+        kerberosConfigFileLocation = getString(hdfsSettings, "kerberosConfigFileLocation", kerberosConfigFileLocation);
+        kerberosUsername = getString(hdfsSettings, "kerberosUsername", kerberosUsername);
+        kerberosKeytabLocation = getString(hdfsSettings, "kerberosKeytabLocation", kerberosKeytabLocation);
     }
 
     public URI getUri() {
@@ -511,4 +568,57 @@ public class HdfsConfiguration {
     public void setOwner(String owner) {
         this.owner = owner;
     }
+
+    public String getKerberosNamedNodes() {
+        return kerberosNamedNodes;
+    }
+
+    /**
+     * A comma separated list of kerberos nodes (e.g. host01.example.com:8021,host02.example.com:8021,host03.example.com:8025)
+     */
+    public void setKerberosNamedNodes(String kerberosNamedNodes) {
+        this.kerberosNamedNodes = kerberosNamedNodes;
+    }
+
+    public List<String> getKerberosNamedNodeList() {
+        return kerberosNamedNodeList;
+    }
+
+    public String getKerberosConfigFileLocation() {
+        return kerberosConfigFileLocation;
+    }
+
+    /**
+     * The location of the kerb5.conf file (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html)
+     */
+    public void setKerberosConfigFileLocation(String kerberosConfigFileLocation) {
+        this.kerberosConfigFileLocation = kerberosConfigFileLocation;
+    }
+
+    public String getKerberosUsername() {
+        return kerberosUsername;
+    }
+
+    /**
+     * The username used to authenticate with the kerberos nodes
+     */
+    public void setKerberosUsername(String kerberosUsername) {
+        this.kerberosUsername = kerberosUsername;
+    }
+
+    public String getKerberosKeytabLocation() {
+        return kerberosKeytabLocation;
+    }
+
+    /**
+     * The location of the keytab file used to authenticate with the kerberos nodes
+     */
+    public void setKerberosKeytabLocation(String kerberosKeytabLocation) {
+        this.kerberosKeytabLocation = kerberosKeytabLocation;
+    }
+
+    public boolean isKerberosAuthentication() {
+        return Objects.nonNull(kerberosNamedNodes) && Objects.nonNull(kerberosConfigFileLocation) && Objects.nonNull(kerberosUsername) && Objects.nonNull(kerberosKeytabLocation)
+                && !kerberosNamedNodes.isEmpty() && !kerberosConfigFileLocation.isEmpty() && !kerberosUsername.isEmpty() && !kerberosKeytabLocation.isEmpty();
+    }
 }
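
A side note on the rewritten splitStrategy parser above: the option is a comma separated list of TYPE:value tokens (the strategy types supported by camel-hdfs are BYTES, MESSAGES and IDLE), and anything else now fails fast with the IllegalArgumentException shown. A minimal producer sketch, with an illustrative host and path:

    // roll over to a new file every 128 MB written, or after 1 second of idle time
    from("direct:input")
        .to("hdfs://localhost:8020/tmp/camel-out?splitStrategy=BYTES:134217728,IDLE:1000");
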
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 46752f9..6dbd1de 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -25,7 +26,6 @@ import javax.security.auth.login.Configuration;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
 import org.apache.camel.Processor;
-import org.apache.camel.support.DefaultMessage;
 import org.apache.camel.support.ScheduledPollConsumer;
 import org.apache.camel.util.IOHelper;
 import org.apache.commons.lang.StringUtils;
@@ -41,7 +41,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     private final StringBuilder hdfsPath;
     private final Processor processor;
     private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
-    private volatile HdfsInputStream istream;
+    private volatile HdfsInputStream inputStream;
 
     public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
         super(endpoint, processor);
@@ -77,7 +77,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
 
         // hadoop will cache the connection by default so its faster to get in the poll method
-        HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString());
+        HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString(), config);
 
         if (onStartup) {
             log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath);
@@ -92,11 +92,11 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     @Override
     protected int poll() throws Exception {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
+        Configuration auth = config.getJAASConfiguration();
         try {
             return doPoll();
         } finally {
-            HdfsComponent.setJAASConfiguration(auth);
+            config.setJAASConfiguration(auth);
         }
     }
 
@@ -111,7 +111,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         int numMessages = 0;
 
         HdfsInfo info = setupHdfs(false);
-        FileStatus fileStatuses[];
+        FileStatus[] fileStatuses;
         if (info.getFileSystem().isFile(info.getPath())) {
             fileStatuses = info.getFileSystem().globStatus(info.getPath());
         } else {
@@ -119,6 +119,8 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             fileStatuses = info.getFileSystem().globStatus(pattern, new ExcludePathFilter());
         }
 
+        fileStatuses = Optional.ofNullable(fileStatuses).orElse(new FileStatus[0]);
+
         for (FileStatus status : fileStatuses) {
 
             if (normalFileIsDirectoryNoSuccessFile(status, info)) {
@@ -137,8 +139,8 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
 
             try {
                 this.rwlock.writeLock().lock();
-                this.istream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
-                if (!this.istream.isOpened()) {
+                this.inputStream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
+                if (!this.inputStream.isOpened()) {
                     if (log.isDebugEnabled()) {
                         log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath());
                     }
@@ -151,16 +153,15 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             try {
                 Holder<Object> key = new Holder<>();
                 Holder<Object> value = new Holder<>();
-                while (this.istream.next(key, value) >= 0) {
+                while (this.inputStream.next(key, value) >= 0) {
                     Exchange exchange = this.getEndpoint().createExchange();
-                    Message message = new DefaultMessage(this.getEndpoint().getCamelContext());
+                    Message message = exchange.getIn();
                     String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
                     message.setHeader(Exchange.FILE_NAME, fileName);
                     if (key.value != null) {
                         message.setHeader(HdfsHeader.KEY.name(), key.value);
                     }
                     message.setBody(value.value);
-                    exchange.setIn(message);
 
                     log.debug("Processing file {}", fileName);
                     try {
@@ -177,7 +178,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
                     numMessages++;
                 }
             } finally {
-                IOHelper.close(istream, "input stream", log);
+                IOHelper.close(inputStream, "input stream", log);
             }
         }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
index 4661797a..9d8c79b 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
@@ -24,7 +24,7 @@ public enum HdfsFileSystemType {
             StringBuilder hpath = new StringBuilder();
             hpath.append("file://");
             hpath.append(config.getPath());
-            if (config.getSplitStrategies().size() > 0) {
+            if (!config.getSplitStrategies().isEmpty()) {
                 hpath.append('/');
             }
             return hpath;
@@ -40,7 +40,7 @@ public enum HdfsFileSystemType {
             hpath.append(':');
             hpath.append(config.getPort());
             hpath.append(config.getPath());
-            if (config.getSplitStrategies().size() > 0) {
+            if (!config.getSplitStrategies().isEmpty()) {
                 hpath.append('/');
             }
             return hpath;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 63f5884..71ec5e6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -16,19 +16,6 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.ByteArrayOutputStream;
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.util.IOHelper;
@@ -54,9 +41,21 @@ import org.apache.hadoop.io.SequenceFile.Writer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.Map;
+
 public enum HdfsFileType {
 
     NORMAL_FILE {
@@ -74,11 +73,11 @@ public enum HdfsFileType {
         }
 
         @Override
-        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+        public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
             try {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsistr.getChunkSize());
-                byte buf[] = new byte[hdfsistr.getChunkSize()];
-                int bytesRead = ((InputStream) hdfsistr.getIn()).read(buf);
+                ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
+                byte[] buf = new byte[hdfsInputStream.getChunkSize()];
+                int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
                 if (bytesRead >= 0) {
                     bos.write(buf, 0, bytesRead);
                     key.value = null;
@@ -99,19 +98,13 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 if (!configuration.isAppend()) {
                     rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(),
-                            configuration.getReplication(), configuration.getBlockSize(), new Progressable() {
-                                @Override
-                                public void progress() {
-                                }
+                            configuration.getReplication(), configuration.getBlockSize(), () -> {
                             });
                 } else {
-                    rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), new Progressable() {
-                        @Override
-                        public void progress() {
-                        }
+                    rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), () -> {
                     });
                 }
                 return rout;
@@ -125,7 +118,7 @@ public enum HdfsFileType {
             try {
                 Closeable rin;
                 if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
-                    HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                    HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                     rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
                 } else {
                     rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration));
@@ -158,7 +151,7 @@ public enum HdfsFileType {
                     outputDest.delete();
                 }
 
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 FileSystem fileSystem = hdfsInfo.getFileSystem();
                 FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf());
                 try {
@@ -220,17 +213,14 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
                 Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
                 rout = SequenceFile.createWriter(hdfsInfo.getConf(), Writer.file(hdfsInfo.getPath()), Writer.keyClass(keyWritableClass),
                         Writer.valueClass(valueWritableClass), Writer.bufferSize(configuration.getBufferSize()),
                         Writer.replication(configuration.getReplication()), Writer.blockSize(configuration.getBlockSize()),
                         Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                        Writer.progressable(new Progressable() {
-                            @Override
-                            public void progress() {
-                            }
+                        Writer.progressable(() -> {
                         }), Writer.metadata(new SequenceFile.Metadata()));
                 return rout;
             } catch (IOException ex) {
@@ -242,7 +232,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 rin = new SequenceFile.Reader(hdfsInfo.getConf(), Reader.file(hdfsInfo.getPath()));
                 return rin;
             } catch (IOException ex) {
@@ -267,9 +257,9 @@ public enum HdfsFileType {
         }
 
         @Override
-        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+        public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
             try {
-                MapFile.Reader reader = (MapFile.Reader) hdfsistr.getIn();
+                MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn();
                 Holder<Integer> keySize = new Holder<>();
                 WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
                 Holder<Integer> valueSize = new Holder<>();
@@ -291,15 +281,12 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
                 Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
                 rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(new Progressable() {
-                        @Override
-                        public void progress() {
-                        }
+                    MapFile.Writer.progressable(() -> {
                     }));
                 return rout;
             } catch (IOException ex) {
@@ -311,7 +298,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -360,16 +347,13 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
                 Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
                 rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), org.apache.hadoop.io.MapFile.Writer.keyClass(keyWritableClass),
                         org.apache.hadoop.io.MapFile.Writer.valueClass(valueWritableClass),
                         org.apache.hadoop.io.MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                        org.apache.hadoop.io.MapFile.Writer.progressable(new Progressable() {
-                            @Override
-                            public void progress() {
-                            }
+                        org.apache.hadoop.io.MapFile.Writer.progressable(() -> {
                         }));
                 return rout;
             } catch (IOException ex) {
@@ -381,7 +365,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -425,13 +409,10 @@ public enum HdfsFileType {
         public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
                 rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                        configuration.getCompressionType(), new Progressable() {
-                            @Override
-                            public void progress() {
-                            }
+                        configuration.getCompressionType(), () -> {
                         });
                 return rout;
             } catch (IOException ex) {
@@ -443,7 +424,7 @@ public enum HdfsFileType {
         public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
             try {
                 Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath);
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
                 rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
                 return rin;
             } catch (IOException ex) {
@@ -506,7 +487,7 @@ public enum HdfsFileType {
 
     public abstract long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter);
 
-    public abstract long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value);
+    public abstract long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
 
     public abstract Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration);
 
@@ -515,7 +496,7 @@ public enum HdfsFileType {
     public static long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
         long numBytes = 0;
         PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
-        byte buf[] = new byte[buffSize];
+        byte[] buf = new byte[buffSize];
         try {
             int bytesRead = in.read(buf);
             while (bytesRead >= 0) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
index 388c936..a3baa16 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java
@@ -16,29 +16,30 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.IOException;
-import java.net.URI;
-
+import org.apache.camel.component.hdfs.kerberos.HdfsKerberosConfigurationFactory;
+import org.apache.camel.component.hdfs.kerberos.KerberosConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
 public final class HdfsInfo {
 
-    private Configuration conf;
+    private Configuration configuration;
     private FileSystem fileSystem;
     private Path path;
 
-    HdfsInfo(String hdfsPath) throws IOException {
-        this.conf = new Configuration();
-        // this will connect to the hadoop hdfs file system, and in case of no connection
-        // then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
-        this.fileSystem = FileSystem.get(URI.create(hdfsPath), conf);
+    HdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+        this.configuration = newConfiguration(endpointConfig);
+        this.fileSystem = newFileSystem(this.configuration, hdfsPath, endpointConfig);
         this.path = new Path(hdfsPath);
     }
 
     public Configuration getConf() {
-        return conf;
+        return configuration;
     }
 
     public FileSystem getFileSystem() {
@@ -48,4 +49,31 @@ public final class HdfsInfo {
     public Path getPath() {
         return path;
     }
+
+    private Configuration newConfiguration(HdfsConfiguration endpointConfig) throws IOException {
+        if (endpointConfig.isKerberosAuthentication()) {
+            List<String> namedNodes = endpointConfig.getKerberosNamedNodeList();
+            String kerberosConfigFileLocation = endpointConfig.getKerberosConfigFileLocation();
+            return new KerberosConfiguration(namedNodes, kerberosConfigFileLocation, endpointConfig.getReplication());
+
+        } else {
+            return new Configuration();
+
+        }
+    }
+
+    /**
+     * this will connect to the hadoop hdfs file system, and in case of no connection
+     * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes
+     */
+    private FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException {
+        if (endpointConfig.isKerberosAuthentication()) {
+            String userName = endpointConfig.getKerberosUsername();
+            String keytabLocation = endpointConfig.getKerberosKeytabLocation();
+            ((KerberosConfiguration)configuration).loginWithKeytab(userName, keytabLocation);
+        }
+
+        return FileSystem.get(URI.create(hdfsPath), configuration);
+    }
+
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
index ea07402..aec5348 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java
@@ -25,13 +25,13 @@ public final class HdfsInfoFactory {
     private HdfsInfoFactory() {
     }
 
-    public static HdfsInfo newHdfsInfo(String hdfsPath) throws IOException {
+    public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
+        Configuration auth = configuration.getJAASConfiguration();
         try {
-            return new HdfsInfo(hdfsPath);
+            return new HdfsInfo(hdfsPath, configuration);
         } finally {
-            HdfsComponent.setJAASConfiguration(auth);
+            configuration.setJAASConfiguration(auth);
         }
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 5585c55..58816d6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -35,6 +35,8 @@ public class HdfsInputStream implements Closeable {
     private final AtomicLong numOfReadBytes = new AtomicLong(0L);
     private final AtomicLong numOfReadMessages = new AtomicLong(0L);
 
+    private HdfsConfiguration config;
+
     protected HdfsInputStream() {
     }
 
@@ -45,13 +47,14 @@ public class HdfsInputStream implements Closeable {
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
         ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
-        HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath);
+        HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration);
         if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) {
             ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration);
             ret.opened = true;
         } else {
             ret.opened = false;
         }
+        ret.config = configuration;
         return ret;
     }
 
@@ -59,7 +62,7 @@ public class HdfsInputStream implements Closeable {
     public final void close() throws IOException {
         if (opened) {
             IOUtils.closeStream(in);
-            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath);
+            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath, config);
             info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath));
             opened = false;
         }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
index 5a64dd1..6e623e3 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java
@@ -46,7 +46,7 @@ public class HdfsOutputStream implements Closeable {
         HdfsOutputStream ret = new HdfsOutputStream();
         ret.fileType = configuration.getFileType();
         ret.actualPath = hdfsPath;
-        ret.info = new HdfsInfo(ret.actualPath);
+        ret.info = new HdfsInfo(ret.actualPath, configuration);
 
         ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix();
         if (configuration.isWantAppend() || configuration.isAppend()) {
@@ -54,7 +54,7 @@ public class HdfsOutputStream implements Closeable {
                 configuration.setAppend(false);
             } else {
                 configuration.setAppend(true);
-                ret.info = new HdfsInfo(ret.suffixedPath);
+                ret.info = new HdfsInfo(ret.suffixedPath, configuration);
                 ret.info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath));
             }
         } else {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index d0c5350..999482e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -29,9 +29,13 @@ import org.apache.camel.Expression;
 import org.apache.camel.support.DefaultProducer;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.StringHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsProducer extends DefaultProducer {
 
+    private static final Logger LOG = LoggerFactory.getLogger(HdfsProducer.class);
+
     private final HdfsConfiguration config;
     private final StringBuilder hdfsPath;
     private final AtomicBoolean idle = new AtomicBoolean(false);
@@ -93,9 +97,9 @@ public class HdfsProducer extends DefaultProducer {
     }
 
     @Override
-    protected void doStart() throws Exception {
+    protected void doStart() {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
+        Configuration auth = config.getJAASConfiguration();
         try {
             super.doStart();
 
@@ -116,8 +120,12 @@ public class HdfsProducer extends DefaultProducer {
                 log.debug("Creating IdleCheck task scheduled to run every {} millis", config.getCheckIdleInterval());
                 scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS);
             }
+        } catch (Exception e) {
+            LOG.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage());
+            LOG.trace("", e);
+            throw new RuntimeException(e);
         } finally {
-            HdfsComponent.setJAASConfiguration(auth);
+            config.setJAASConfiguration(auth);
         }
     }
 
@@ -169,11 +177,11 @@ public class HdfsProducer extends DefaultProducer {
     @Override
     public void process(Exchange exchange) throws Exception {
         // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards
-        Configuration auth = HdfsComponent.getJAASConfiguration();
+        Configuration auth = config.getJAASConfiguration();
         try {
             doProcess(exchange);
         } finally {
-            HdfsComponent.setJAASConfiguration(auth);
+            config.setJAASConfiguration(auth);
         }
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactory.java
new file mode 100644
index 0000000..45d63e9
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactory.java
@@ -0,0 +1,30 @@
+package org.apache.camel.component.hdfs.kerberos;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+
+import static java.lang.String.format;
+
+public class HdfsKerberosConfigurationFactory {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(HdfsKerberosConfigurationFactory.class);
+
+    private static final String KERBEROS_5_SYS_ENV = "java.security.krb5.conf";
+
+    public static void setKerberosConfigFile(String kerberosConfigFileLocation) throws FileNotFoundException {
+        if (!new File(kerberosConfigFileLocation).exists()) {
+            throw new FileNotFoundException(format("Kerberos config file [%s] could not be found.", kerberosConfigFileLocation));
+        }
+
+        String krb5Conf = System.getProperty(KERBEROS_5_SYS_ENV);
+        if (krb5Conf == null || krb5Conf.isEmpty()) {
+            System.setProperty(KERBEROS_5_SYS_ENV, kerberosConfigFileLocation);
+        } else if (!krb5Conf.equalsIgnoreCase(kerberosConfigFileLocation)) {
+            LOGGER.warn("{} was already configured with: {}", KERBEROS_5_SYS_ENV, krb5Conf);
+        }
+    }
+
+}
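
A minimal usage sketch for the factory above; the path is an assumption, any readable krb5.conf works:

    // points the JVM at the given krb5.conf via the standard
    // "java.security.krb5.conf" system property; a missing file fails fast
    // with FileNotFoundException, while an already configured, different
    // location is left alone with a warning
    HdfsKerberosConfigurationFactory.setKerberosConfigFile("/etc/security/krb5.conf");
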
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
new file mode 100644
index 0000000..0a1e914
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
@@ -0,0 +1,95 @@
+package org.apache.camel.component.hdfs.kerberos;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static java.lang.String.format;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+
+public class KerberosConfiguration extends Configuration {
+
+    private static final String HFDS_NAMED_SERVICE = "hfdsNamedService";
+
+    private static final String AUTHENTICATION_MODE = "hadoop.security.authentication";
+    private static final String HFDS_FS = "fs.defaultFS";
+
+    /**
+     * Adds all the kerberos-specific settings needed for this authentication mode.
+     * Generates the correct HA configuration (normally read from xml) based on the namedNodes:
+     * All named nodes have to be qualified: configuration.set("dfs.ha.namenodes.hfdsNamedService","namenode1,namenode2");
+     * For each named node the following entry is added:
+     * <p>
+     * configuration.set("dfs.namenode.rpc-address.hfdsNamedService.namenode1", "namenode1:1234");
+     * <p>
+     * Finally the proxy provider has to be specified:
+     * <p>
+     * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
+     * <p>
+     *
+     * @param namedNodes                 - All named nodes from the hadoop cluster
+     * @param kerberosConfigFileLocation - The location of the kerberos config file (on the server)
+     * @param replicationFactor          - dfs replication factor
+     */
+    public KerberosConfiguration(List<String> namedNodes,
+                                 String kerberosConfigFileLocation,
+                                 int replicationFactor) throws IOException {
+
+        HdfsKerberosConfigurationFactory.setKerberosConfigFile(kerberosConfigFileLocation);
+        setupHdfsConfiguration(namedNodes, replicationFactor);
+    }
+
+    private void setupHdfsConfiguration(List<String> namedNodes, int replicationFactor) {
+        this.set(AUTHENTICATION_MODE, "kerberos");
+
+        this.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(replicationFactor));
+        this.set(DFSConfigKeys.DFS_NAMESERVICES, HFDS_NAMED_SERVICE);
+        this.set(
+                DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, HFDS_NAMED_SERVICE),
+                namedNodes.stream().map(this::nodeToString).collect(Collectors.joining(","))
+        );
+
+        namedNodes.forEach(nodeName ->
+                this.set(
+                        DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, HFDS_NAMED_SERVICE, nodeToString(nodeName)),
+                        nodeName)
+        );
+
+        this.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + HFDS_NAMED_SERVICE, ConfiguredFailoverProxyProvider.class.getName());
+
+        this.set(HFDS_FS, "hdfs://" + HFDS_NAMED_SERVICE);
+    }
+
+    /**
+     * In order to connect to a hadoop cluster using Kerberos you need to add your own filesystem to the cache of the FileSystem component.
+     * This is done by setting the uri that you use in your camel route as the URI that is used to setup the connection.
+     * The URI is used as key when adding it to the cache (default functionality of the static FileSystem.get(URI, Configuration) method).
+     *
+     * @param username           - Principal used to connect to the cluster
+     * @param keyTabFileLocation - KeyTab file location (must be on the server)
+     * @throws IOException - In case of error
+     */
+    public void loginWithKeytab(String username, String keyTabFileLocation) throws IOException {
+        if (!new File(keyTabFileLocation).exists()) {
+            throw new FileNotFoundException(format("KeyTab file [%s] could not be found.", keyTabFileLocation));
+        }
+        // we need to log in otherwise you cannot connect to the filesystem later on
+        UserGroupInformation.setConfiguration(this);
+        UserGroupInformation.loginUserFromKeytab(username, keyTabFileLocation);
+    }
+
+    private String nodeToString(String nodeName) {
+        return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_");
+    }
+
+}
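
To make the generated HA settings concrete: building the configuration for two named nodes would yield entries roughly equivalent to the listing below (node names, ports, principal and file locations are illustrative):

    List<String> namedNodes = Arrays.asList("host01.example.com:8021", "host02.example.com:8021");
    KerberosConfiguration conf = new KerberosConfiguration(namedNodes, "/etc/security/krb5.conf", 3);
    // conf now contains, among others:
    //   dfs.nameservices = hfdsNamedService
    //   dfs.ha.namenodes.hfdsNamedService = host01_example_com,host02_example_com
    //   dfs.namenode.rpc-address.hfdsNamedService.host01_example_com = host01.example.com:8021
    //   dfs.namenode.rpc-address.hfdsNamedService.host02_example_com = host02.example.com:8021
    //   dfs.client.failover.proxy.provider.hfdsNamedService = ...ConfiguredFailoverProxyProvider
    //   fs.defaultFS = hdfs://hfdsNamedService
    conf.loginWithKeytab("camel@EXAMPLE.COM", "/etc/security/keytabs/camel.keytab");
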
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactoryTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactoryTest.java
new file mode 100644
index 0000000..bf903ea
--- /dev/null
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/HdfsKerberosConfigurationFactoryTest.java
@@ -0,0 +1,25 @@
+package org.apache.camel.component.hdfs.kerberos;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+public class HdfsKerberosConfigurationFactoryTest {
+
+    @Test
+    public void setupExistingKerberosConfigFile() throws IOException {
+        // given: the sample krb5 config shipped with this commit's test resources
+        String kerberosConfigFileLocation = "src/test/resources/kerberos/test-kerb5.conf";
+
+        // when
+        HdfsKerberosConfigurationFactory.setKerberosConfigFile(kerberosConfigFileLocation);
+
+        // then: the JVM wide kerberos config property points at that file
+        assertEquals(kerberosConfigFileLocation, System.getProperty("java.security.krb5.conf"));
+    }
+
+}
\ No newline at end of file
diff --git a/components/camel-hdfs/src/test/resources/kerberos/test-kerb5.conf b/components/camel-hdfs/src/test/resources/kerberos/test-kerb5.conf
new file mode 100644
index 0000000..be2edb3
--- /dev/null
+++ b/components/camel-hdfs/src/test/resources/kerberos/test-kerb5.conf
@@ -0,0 +1,12 @@
+[libdefaults]
+ default_realm = EXAMPLE.COM
+
+[realms]
+ EXAMPLE.COM = {
+  kdc = srv11.example.com
+  kdc = srv12.example.com
+  admin_server = srv21.example.com
+  master_kdc = srv31.example.com
+  default_domain = EXAMPLE.COM
+  auth_to_local = DEFAULT
+ }
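
The resource above follows the standard MIT krb5.conf layout. The JDK's Kerberos implementation picks such a file up through the java.security.krb5.conf system property (standard JDK behaviour, independent of this commit), so a test might point the JVM at it like this:

    System.setProperty("java.security.krb5.conf",
            HdfsKerberosConfigurationFactoryTest.class.getResource("/kerberos/test-kerb5.conf").getPath());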
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
index 676ad7a..9b018ee 100644
--- a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/HdfsEndpointBuilderFactory.java
@@ -132,6 +132,57 @@ public interface HdfsEndpointBuilderFactory {
             return this;
         }
         /**
+         * The location of the krb5.conf file
+         * (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointConsumerBuilder kerberosConfigFileLocation(
+                String kerberosConfigFileLocation) {
+            doSetProperty("kerberosConfigFileLocation", kerberosConfigFileLocation);
+            return this;
+        }
+        /**
+         * The location of the keytab file used to authenticate with the
+         * Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointConsumerBuilder kerberosKeytabLocation(
+                String kerberosKeytabLocation) {
+            doSetProperty("kerberosKeytabLocation", kerberosKeytabLocation);
+            return this;
+        }
+        /**
+         * A comma-separated list of Kerberos nodes (e.g.
+         * host01.example.com:8021,host02.example.com:8021,host03.example.com:8025).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointConsumerBuilder kerberosNamedNodes(
+                String kerberosNamedNodes) {
+            doSetProperty("kerberosNamedNodes", kerberosNamedNodes);
+            return this;
+        }
+        /**
+         * The username used to authenticate with the Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointConsumerBuilder kerberosUsername(
+                String kerberosUsername) {
+            doSetProperty("kerberosUsername", kerberosUsername);
+            return this;
+        }
+        /**
          * The type for the key in case of sequence or map files.
          * 
          * The option is a:
@@ -1065,6 +1116,57 @@ public interface HdfsEndpointBuilderFactory {
             return this;
         }
         /**
+         * The location of the krb5.conf file
+         * (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointProducerBuilder kerberosConfigFileLocation(
+                String kerberosConfigFileLocation) {
+            doSetProperty("kerberosConfigFileLocation", kerberosConfigFileLocation);
+            return this;
+        }
+        /**
+         * The location of the keytab file used to authenticate with the
+         * Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointProducerBuilder kerberosKeytabLocation(
+                String kerberosKeytabLocation) {
+            doSetProperty("kerberosKeytabLocation", kerberosKeytabLocation);
+            return this;
+        }
+        /**
+         * A comma-separated list of Kerberos nodes (e.g.
+         * host01.example.com:8021,host02.example.com:8021,host03.example.com:8025).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointProducerBuilder kerberosNamedNodes(
+                String kerberosNamedNodes) {
+            doSetProperty("kerberosNamedNodes", kerberosNamedNodes);
+            return this;
+        }
+        /**
+         * The username used to authenticate with the Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointProducerBuilder kerberosUsername(
+                String kerberosUsername) {
+            doSetProperty("kerberosUsername", kerberosUsername);
+            return this;
+        }
+        /**
          * The type for the key in case of sequence or map files.
          * 
          * The option is a:
@@ -1590,6 +1692,55 @@ public interface HdfsEndpointBuilderFactory {
             return this;
         }
         /**
+         * The location of the krb5.conf file
+         * (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointBuilder kerberosConfigFileLocation(
+                String kerberosConfigFileLocation) {
+            doSetProperty("kerberosConfigFileLocation", kerberosConfigFileLocation);
+            return this;
+        }
+        /**
+         * The location of the keytab file used to authenticate with the
+         * Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointBuilder kerberosKeytabLocation(
+                String kerberosKeytabLocation) {
+            doSetProperty("kerberosKeytabLocation", kerberosKeytabLocation);
+            return this;
+        }
+        /**
+         * A comma-separated list of Kerberos nodes (e.g.
+         * host01.example.com:8021,host02.example.com:8021,host03.example.com:8025).
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointBuilder kerberosNamedNodes(String kerberosNamedNodes) {
+            doSetProperty("kerberosNamedNodes", kerberosNamedNodes);
+            return this;
+        }
+        /**
+         * The username used to authenticate with the Kerberos nodes.
+         * 
+         * The option is a: <code>java.lang.String</code> type.
+         * 
+         * Group: common
+         */
+        default HdfsEndpointBuilder kerberosUsername(String kerberosUsername) {
+            doSetProperty("kerberosUsername", kerberosUsername);
+            return this;
+        }
+        /**
          * The type for the key in case of sequence or map files.
          * 
          * The option is a:
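
Taken together, the new endpoint options surface in a route URI along these lines. A hypothetical sketch: the option names match the builder methods above, while host names, ports, paths, principal, and keytab location are invented for illustration.

    import org.apache.camel.builder.RouteBuilder;

    public class KerberosHdfsRoute extends RouteBuilder {
        @Override
        public void configure() {
            // Consume files from a Kerberos-secured HDFS cluster and log them.
            from("hdfs://host01.example.com:8021/user/camel/input"
                    + "?kerberosConfigFileLocation=/etc/krb5.conf"
                    + "&kerberosUsername=hdfs-user@EXAMPLE.COM"
                    + "&kerberosKeytabLocation=/etc/security/keytabs/hdfs-user.keytab"
                    + "&kerberosNamedNodes=host01.example.com:8021,host02.example.com:8021")
                .to("log:hdfs-input");
        }
    }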
diff --git a/platforms/spring-boot/components-starter/camel-hdfs-starter/src/main/java/org/apache/camel/component/hdfs/springboot/HdfsComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-hdfs-starter/src/main/java/org/apache/camel/component/hdfs/springboot/HdfsComponentConfiguration.java
index ea88324..deb56f5 100644
--- a/platforms/spring-boot/components-starter/camel-hdfs-starter/src/main/java/org/apache/camel/component/hdfs/springboot/HdfsComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-hdfs-starter/src/main/java/org/apache/camel/component/hdfs/springboot/HdfsComponentConfiguration.java
@@ -37,24 +37,11 @@ public class HdfsComponentConfiguration
      */
     private Boolean enabled;
     /**
-     * To use the given configuration for security with JAAS. The option is a
-     * javax.security.auth.login.Configuration type.
-     */
-    private String jAASConfiguration;
-    /**
      * Whether the component should use basic property binding (Camel 2.x) or
      * the newer property binding with additional capabilities
      */
     private Boolean basicPropertyBinding = false;
 
-    public String getJAASConfiguration() {
-        return jAASConfiguration;
-    }
-
-    public void setJAASConfiguration(String jAASConfiguration) {
-        this.jAASConfiguration = jAASConfiguration;
-    }
-
     public Boolean getBasicPropertyBinding() {
         return basicPropertyBinding;
     }