You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/19 16:14:12 UTC

[ambari-logsearch] 28/28: AMBARI-24833. HDFS client kerberos support + small fixes (#27)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git

commit d7499927f4dc2349a45261a23b0b955b173250d8
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 19 14:19:57 2018 +0100

    AMBARI-24833. HDFS client kerberos support + small fixes (#27)
    
    * AMBARI-24833. HDFS client kerberos support + small fixes
    
    * AMBARI-24833. Fix principal description
---
 .../logfeeder/common/LogFeederConstants.java       |  2 +
 .../ambari/logfeeder/conf/LogFeederProps.java      | 27 ++------
 ...HdfsOutputConfig.java => HdfsOutputConfig.java} | 65 +++++++++++++++++--
 .../impl/AbstractInputConfigHandler.java           |  2 +-
 .../impl/CloudStorageInputConfigHandler.java       |  2 +-
 .../operations/impl/DefaultInputConfigHandler.java |  2 +-
 .../logfeeder/output/OutputLineEnricher.java       |  2 +-
 .../cloud/upload/ExternalHDFSUploadClient.java     | 73 ----------------------
 .../output/cloud/upload/HDFSUploadClient.java      | 55 ++++++++++++----
 .../output/cloud/upload/UploadClientFactory.java   |  4 +-
 10 files changed, 114 insertions(+), 120 deletions(-)

diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f9ef32d..a15ac74 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -132,6 +132,8 @@ public class LogFeederConstants {
   public static final String HDFS_PORT = "logfeeder.hdfs.port";
   public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
   public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+  public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab";
+  public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal";
 
   public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
   public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index f2eb6c7..b6ab4c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
 import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
 import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -53,7 +53,7 @@ public class LogFeederProps implements LogFeederProperties {
   private S3OutputConfig s3OutputConfig;
 
   @Inject
-  private ExternalHdfsOutputConfig hdfsOutputConfig;
+  private HdfsOutputConfig hdfsOutputConfig;
 
   private Properties properties;
 
@@ -258,7 +258,7 @@ public class LogFeederProps implements LogFeederProperties {
     defaultValue = "false",
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
-  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}")
   private boolean useCloudHdfsClient;
 
   @LogSearchPropertyDescription(
@@ -281,15 +281,6 @@ public class LogFeederProps implements LogFeederProperties {
   private String cloudBasePath;
 
   @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_USER,
-    description = "Overrides HADOOP_USER_NAME variable at runtime",
-    examples = {"hdfs"},
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
-  private String logfeederHdfsUser;
-
-  @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
     description = "Use filters for inputs (with filters the output format will be JSON)",
     examples = {"true"},
@@ -460,7 +451,7 @@ public class LogFeederProps implements LogFeederProperties {
     this.cloudStorageMode = cloudStorageMode;
   }
 
-  public ExternalHdfsOutputConfig getHdfsOutputConfig() {
+  public HdfsOutputConfig getHdfsOutputConfig() {
     return hdfsOutputConfig;
   }
 
@@ -480,7 +471,7 @@ public class LogFeederProps implements LogFeederProperties {
     this.rolloverConfig = rolloverConfig;
   }
 
-  public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
+  public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
     this.hdfsOutputConfig = hdfsOutputConfig;
   }
 
@@ -512,14 +503,6 @@ public class LogFeederProps implements LogFeederProperties {
     return useCloudHdfsClient;
   }
 
-  public String getLogfeederHdfsUser() {
-    return logfeederHdfsUser;
-  }
-
-  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
-    this.logfeederHdfsUser = logfeederHdfsUser;
-  }
-
   public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
     this.useCloudHdfsClient = useCloudHdfsClient;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
similarity index 59%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
index fbbf869..312f2f0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -24,7 +24,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
-public class ExternalHdfsOutputConfig {
+public class HdfsOutputConfig {
 
   @LogSearchPropertyDescription(
     name = LogFeederConstants.HDFS_HOST,
@@ -55,6 +55,15 @@ public class ExternalHdfsOutputConfig {
   private String hdfsFilePermissions;
 
   @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_USER,
+    description = "Overrides HADOOP_USER_NAME variable at runtime",
+    examples = {"hdfs"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+  private String logfeederHdfsUser;
+
+  @LogSearchPropertyDescription(
     name = LogFeederConstants.HDFS_KERBEROS,
     description = "Enable kerberos support for HDFS",
     examples = {"true"},
@@ -62,7 +71,27 @@ public class ExternalHdfsOutputConfig {
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
   @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
-  private boolean secure;
+  private boolean hdfsKerberos;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_KEYTAB,
+    description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ",
+    examples = {"/etc/security/keytabs/mykeytab.keytab"},
+    defaultValue = "/etc/security/keytabs/logfeeder.service.keytab",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}")
+  private String keytab;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL,
+    description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ",
+    examples = {"mylogfeeder/myhost1@EXAMPLE.COM"},
+    defaultValue = "logfeeder/_HOST",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}")
+  private String principal;
 
   public String getHdfsHost() {
     return hdfsHost;
@@ -88,11 +117,35 @@ public class ExternalHdfsOutputConfig {
     this.hdfsFilePermissions = hdfsFilePermissions;
   }
 
-  public boolean isSecure() {
-    return secure;
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+  public String getLogfeederHdfsUser() {
+    return logfeederHdfsUser;
+  }
+
+  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+    this.logfeederHdfsUser = logfeederHdfsUser;
+  }
+
+  public boolean isHdfsKerberos() {
+    return hdfsKerberos;
   }
 
-  public void setSecure(boolean secure) {
-    this.secure = secure;
+  public void setHdfsKerberos(boolean hdfsKerberos) {
+    this.hdfsKerberos = hdfsKerberos;
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 31bfd0d..d383ed1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -47,7 +47,7 @@ public abstract class AbstractInputConfigHandler implements InputConfigHandler {
     for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
       for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
         if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
+          logger.warn("Filter descriptor is empty. Skipping...");
           continue;
         }
         if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index ac10b2d..c2e73b7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -52,7 +52,7 @@ public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler {
     final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
       LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index dd0fe3e..4677461 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -74,7 +74,7 @@ public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
   private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
index bd9e3df..ff0805d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -46,7 +46,7 @@ public class OutputLineEnricher {
     Input input = inputMarker.getInput();
     // Update the block with the context fields
     for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+      if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) {
         jsonObj.put(entry.getKey(), entry.getValue());
       }
     }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
deleted file mode 100644
index a23a715..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud.upload;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * HDFS (on-prem) specific uploader client that can work with an external HDFS.
- */
-public class ExternalHDFSUploadClient implements UploadClient {
-
-  private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
-
-  private final ExternalHdfsOutputConfig hdfsOutputConfig;
-  private final FsPermission fsPermission;
-  private FileSystem fs;
-
-  public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
-    this.hdfsOutputConfig = hdfsOutputConfig;
-    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
-  }
-
-  @Override
-  public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize external HDFS client ...");
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
-    }
-    this.fs = LogFeederHDFSUtil.buildFileSystem(
-      hdfsOutputConfig.getHdfsHost(),
-      String.valueOf(hdfsOutputConfig.getHdfsPort()));
-    if (logFeederProps.getHdfsOutputConfig().isSecure()) {
-      logger.info("Kerberos is enabled for external HDFS.");
-      Configuration conf = fs.getConf();
-      conf.set("hadoop.security.authentication", "kerberos");
-    }
-  }
-
-  @Override
-  public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
-  }
-
-  @Override
-  public void close() {
-    LogFeederHDFSUtil.closeFileSystem(fs);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index c2a8497..421c4c5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,15 +19,17 @@
 package org.apache.ambari.logfeeder.output.cloud.upload;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-
 /**
  * HDFS client that uses core-site.xml file from the classpath to load the configuration.
  * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -35,37 +37,64 @@ import java.io.IOException;
 public class HDFSUploadClient implements UploadClient {
 
   private static final String FS_DEFAULT_FS = "fs.defaultFS";
+  private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
 
   private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
 
+  private final boolean externalHdfs;
+  private final HdfsOutputConfig hdfsOutputConfig;
+  private final FsPermission fsPermission;
   private FileSystem fs;
 
+  public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+    this.externalHdfs = externalHdfs;
+    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+  }
+
   @Override
   public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
-    Configuration configuration = new Configuration();
+    final Configuration configuration;
+    if (externalHdfs) {
+      configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs");
+      logger.info("Using external HDFS client as core-site.xml is not located on the classpath.");
+    } else {
+      configuration = new Configuration();
+      logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+    }
     if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
       configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
     }
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
+    if (hdfsOutputConfig.isHdfsKerberos()) {
+      logger.info("Kerberos is enabled for HDFS.");
+      configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+      final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName);
+      UserGroupInformation.setConfiguration(configuration);
+      try {
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab());
+        UserGroupInformation.setLoginUser(ugi);
+      } catch (Exception e) {
+        logger.error("Error during kerberos login", e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) {
+        logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser());
+        System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser());
+      }
     }
+    logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
     this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
   }
 
   @Override
   public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
-  private boolean isHadoopFileSystem(Configuration conf) {
-    return conf.get(FS_DEFAULT_FS).contains("hdfs://");
-  }
-
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index bea2943..27d69c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -43,11 +43,11 @@ public class UploadClientFactory {
     if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
       logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
       logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
-      return new HDFSUploadClient();
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false);
     }
     else if (CloudStorageDestination.HDFS.equals(destType)) {
       logger.info("External HDFS output will be used.");
-      return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true);
     } else if (CloudStorageDestination.S3.equals(destType)) {
       if (useHdfsClient) {
         logger.info("S3 cloud output will be used with HDFS client.");