You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by GitBox <gi...@apache.org> on 2018/11/19 13:19:59 UTC

[GitHub] oleewere closed pull request #27: AMBARI-24833. HDFS client kerberos support + small fixes

oleewere closed pull request #27: AMBARI-24833. HDFS client kerberos support + small fixes
URL: https://github.com/apache/ambari-logsearch/pull/27
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f9ef32d688..a15ac7468a 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -132,6 +132,8 @@
   public static final String HDFS_PORT = "logfeeder.hdfs.port";
   public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
   public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+  public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab";
+  public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal";
 
   public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
   public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index f2eb6c741e..b6ab4c7342 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
 package org.apache.ambari.logfeeder.conf;
 
 import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
 import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
 import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -53,7 +53,7 @@
   private S3OutputConfig s3OutputConfig;
 
   @Inject
-  private ExternalHdfsOutputConfig hdfsOutputConfig;
+  private HdfsOutputConfig hdfsOutputConfig;
 
   private Properties properties;
 
@@ -258,7 +258,7 @@
     defaultValue = "false",
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
-  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}")
   private boolean useCloudHdfsClient;
 
   @LogSearchPropertyDescription(
@@ -280,15 +280,6 @@
   @Value("${" + LogFeederConstants.CLOUD_STORAGE_BASE_PATH + ":}")
   private String cloudBasePath;
 
-  @LogSearchPropertyDescription(
-    name = LogFeederConstants.HDFS_USER,
-    description = "Overrides HADOOP_USER_NAME variable at runtime",
-    examples = {"hdfs"},
-    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
-  )
-  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
-  private String logfeederHdfsUser;
-
   @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
     description = "Use filters for inputs (with filters the output format will be JSON)",
@@ -460,7 +451,7 @@ public void setCloudStorageMode(LogFeederMode cloudStorageMode) {
     this.cloudStorageMode = cloudStorageMode;
   }
 
-  public ExternalHdfsOutputConfig getHdfsOutputConfig() {
+  public HdfsOutputConfig getHdfsOutputConfig() {
     return hdfsOutputConfig;
   }
 
@@ -480,7 +471,7 @@ public void setRolloverConfig(RolloverConfig rolloverConfig) {
     this.rolloverConfig = rolloverConfig;
   }
 
-  public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
+  public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
     this.hdfsOutputConfig = hdfsOutputConfig;
   }
 
@@ -512,14 +503,6 @@ public boolean isUseCloudHdfsClient() {
     return useCloudHdfsClient;
   }
 
-  public String getLogfeederHdfsUser() {
-    return logfeederHdfsUser;
-  }
-
-  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
-    this.logfeederHdfsUser = logfeederHdfsUser;
-  }
-
   public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
     this.useCloudHdfsClient = useCloudHdfsClient;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
similarity index 59%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
index fbbf869143..312f2f042f 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -24,7 +24,7 @@
 import org.springframework.context.annotation.Configuration;
 
 @Configuration
-public class ExternalHdfsOutputConfig {
+public class HdfsOutputConfig {
 
   @LogSearchPropertyDescription(
     name = LogFeederConstants.HDFS_HOST,
@@ -54,6 +54,15 @@
   @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
   private String hdfsFilePermissions;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_USER,
+    description = "Overrides HADOOP_USER_NAME variable at runtime",
+    examples = {"hdfs"},
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+  private String logfeederHdfsUser;
+
   @LogSearchPropertyDescription(
     name = LogFeederConstants.HDFS_KERBEROS,
     description = "Enable kerberos support for HDFS",
@@ -62,7 +71,27 @@
     sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
   )
   @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
-  private boolean secure;
+  private boolean hdfsKerberos;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_KEYTAB,
+    description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ",
+    examples = {"/etc/security/keytabs/mykeytab.keytab"},
+    defaultValue = "/etc/security/keytabs/logfeeder.service.keytab",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}")
+  private String keytab;
+
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL,
+    description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ",
+    examples = {"mylogfeeder/myhost1@EXAMPLE.COM"},
+    defaultValue = "logfeeder/_HOST",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}")
+  private String principal;
 
   public String getHdfsHost() {
     return hdfsHost;
@@ -88,11 +117,35 @@ public void setHdfsFilePermissions(String hdfsFilePermissions) {
     this.hdfsFilePermissions = hdfsFilePermissions;
   }
 
-  public boolean isSecure() {
-    return secure;
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+  public String getPrincipal() {
+    return principal;
+  }
+
+  public void setPrincipal(String principal) {
+    this.principal = principal;
+  }
+
+  public String getLogfeederHdfsUser() {
+    return logfeederHdfsUser;
+  }
+
+  public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+    this.logfeederHdfsUser = logfeederHdfsUser;
+  }
+
+  public boolean isHdfsKerberos() {
+    return hdfsKerberos;
   }
 
-  public void setSecure(boolean secure) {
-    this.secure = secure;
+  public void setHdfsKerberos(boolean hdfsKerberos) {
+    this.hdfsKerberos = hdfsKerberos;
   }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 31bfd0d24b..d383ed1439 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -47,7 +47,7 @@ protected void loadFilters(String serviceName, InputConfigHolder inputConfigHold
     for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
       for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
         if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
+          logger.warn("Filter descriptor is empty. Skipping...");
           continue;
         }
         if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index ac10b2d667..c2e73b7fe6 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -52,7 +52,7 @@ public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder,
     final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
       LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index dd0fe3e23e..4677461595 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -74,7 +74,7 @@ public void assignInputsToOutputs(String serviceName, InputConfigHolder inputCon
   private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
+        logger.warn("Input descriptor is empty. Skipping...");
         continue;
       }
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
index bd9e3df213..ff0805dad9 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -46,7 +46,7 @@ public void enrichFields(final Map<String, Object> jsonObj, final InputMarker in
     Input input = inputMarker.getInput();
     // Update the block with the context fields
     for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+      if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) {
         jsonObj.put(entry.getKey(), entry.getValue());
       }
     }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
deleted file mode 100644
index a23a7157f9..0000000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud.upload;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * HDFS (on-prem) specific uploader client that can work with an external HDFS.
- */
-public class ExternalHDFSUploadClient implements UploadClient {
-
-  private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
-
-  private final ExternalHdfsOutputConfig hdfsOutputConfig;
-  private final FsPermission fsPermission;
-  private FileSystem fs;
-
-  public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
-    this.hdfsOutputConfig = hdfsOutputConfig;
-    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
-  }
-
-  @Override
-  public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize external HDFS client ...");
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
-    }
-    this.fs = LogFeederHDFSUtil.buildFileSystem(
-      hdfsOutputConfig.getHdfsHost(),
-      String.valueOf(hdfsOutputConfig.getHdfsPort()));
-    if (logFeederProps.getHdfsOutputConfig().isSecure()) {
-      logger.info("Kerberos is enabled for external HDFS.");
-      Configuration conf = fs.getConf();
-      conf.set("hadoop.security.authentication", "kerberos");
-    }
-  }
-
-  @Override
-  public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
-  }
-
-  @Override
-  public void close() {
-    LogFeederHDFSUtil.closeFileSystem(fs);
-  }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index c2a8497b0a..421c4c5cb7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,15 +19,17 @@
 package org.apache.ambari.logfeeder.output.cloud.upload;
 
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
 import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.io.IOException;
-
 /**
  * HDFS client that uses core-site.xml file from the classpath to load the configuration.
  * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -35,37 +37,64 @@
 public class HDFSUploadClient implements UploadClient {
 
   private static final String FS_DEFAULT_FS = "fs.defaultFS";
+  private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
 
   private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
 
+  private final boolean externalHdfs;
+  private final HdfsOutputConfig hdfsOutputConfig;
+  private final FsPermission fsPermission;
   private FileSystem fs;
 
+  public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+    this.externalHdfs = externalHdfs;
+    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+  }
+
   @Override
   public void init(LogFeederProps logFeederProps) {
-    logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
-    Configuration configuration = new Configuration();
+    final Configuration configuration;
+    if (externalHdfs) {
+      configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs");
+      logger.info("Using external HDFS client as core-site.xml is not located on the classpath.");
+    } else {
+      configuration = new Configuration();
+      logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+    }
     if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
       configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
     }
-    if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
-      logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
-      System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
+    if (hdfsOutputConfig.isHdfsKerberos()) {
+      logger.info("Kerberos is enabled for HDFS.");
+      configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+      final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName);
+      UserGroupInformation.setConfiguration(configuration);
+      try {
+        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab());
+        UserGroupInformation.setLoginUser(ugi);
+      } catch (Exception e) {
+        logger.error("Error during kerberos login", e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) {
+        logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser());
+        System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser());
+      }
     }
+    logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
     this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
   }
 
   @Override
   public void upload(String source, String target) throws Exception {
-    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
-  private boolean isHadoopFileSystem(Configuration conf) {
-    return conf.get(FS_DEFAULT_FS).contains("hdfs://");
-  }
-
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index bea29438ab..27d69c7855 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -43,11 +43,11 @@ public static UploadClient createUploadClient(LogFeederProps logFeederProps) {
     if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
       logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
       logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
-      return new HDFSUploadClient();
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false);
     }
     else if (CloudStorageDestination.HDFS.equals(destType)) {
       logger.info("External HDFS output will be used.");
-      return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true);
     } else if (CloudStorageDestination.S3.equals(destType)) {
       if (useHdfsClient) {
         logger.info("S3 cloud output will be used with HDFS client.");


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services