You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/19 16:14:12 UTC
[ambari-logsearch] 28/28: AMBARI-24833. HDFS client kerberos
support + small fixes (#27)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
commit d7499927f4dc2349a45261a23b0b955b173250d8
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 19 14:19:57 2018 +0100
AMBARI-24833. HDFS client kerberos support + small fixes (#27)
* AMBARI-24833. HDFS client kerberos support + small fixes
* AMBARI-24833. Fix principal description
---
.../logfeeder/common/LogFeederConstants.java | 2 +
.../ambari/logfeeder/conf/LogFeederProps.java | 27 ++------
...HdfsOutputConfig.java => HdfsOutputConfig.java} | 65 +++++++++++++++++--
.../impl/AbstractInputConfigHandler.java | 2 +-
.../impl/CloudStorageInputConfigHandler.java | 2 +-
.../operations/impl/DefaultInputConfigHandler.java | 2 +-
.../logfeeder/output/OutputLineEnricher.java | 2 +-
.../cloud/upload/ExternalHDFSUploadClient.java | 73 ----------------------
.../output/cloud/upload/HDFSUploadClient.java | 55 ++++++++++++----
.../output/cloud/upload/UploadClientFactory.java | 4 +-
10 files changed, 114 insertions(+), 120 deletions(-)
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f9ef32d..a15ac74 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -132,6 +132,8 @@ public class LogFeederConstants {
public static final String HDFS_PORT = "logfeeder.hdfs.port";
public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+ public static final String HDFS_KERBEROS_KEYTAB = "logfeeder.hdfs.keytab";
+ public static final String HDFS_KERBEROS_PRINCIPAL = "logfeeder.hdfs.principal";
public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index f2eb6c7..b6ab4c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -53,7 +53,7 @@ public class LogFeederProps implements LogFeederProperties {
private S3OutputConfig s3OutputConfig;
@Inject
- private ExternalHdfsOutputConfig hdfsOutputConfig;
+ private HdfsOutputConfig hdfsOutputConfig;
private Properties properties;
@@ -258,7 +258,7 @@ public class LogFeederProps implements LogFeederProperties {
defaultValue = "false",
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
- @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":true}")
private boolean useCloudHdfsClient;
@LogSearchPropertyDescription(
@@ -281,15 +281,6 @@ public class LogFeederProps implements LogFeederProperties {
private String cloudBasePath;
@LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_USER,
- description = "Overrides HADOOP_USER_NAME variable at runtime",
- examples = {"hdfs"},
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
- private String logfeederHdfsUser;
-
- @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
description = "Use filters for inputs (with filters the output format will be JSON)",
examples = {"true"},
@@ -460,7 +451,7 @@ public class LogFeederProps implements LogFeederProperties {
this.cloudStorageMode = cloudStorageMode;
}
- public ExternalHdfsOutputConfig getHdfsOutputConfig() {
+ public HdfsOutputConfig getHdfsOutputConfig() {
return hdfsOutputConfig;
}
@@ -480,7 +471,7 @@ public class LogFeederProps implements LogFeederProperties {
this.rolloverConfig = rolloverConfig;
}
- public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
+ public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
this.hdfsOutputConfig = hdfsOutputConfig;
}
@@ -512,14 +503,6 @@ public class LogFeederProps implements LogFeederProperties {
return useCloudHdfsClient;
}
- public String getLogfeederHdfsUser() {
- return logfeederHdfsUser;
- }
-
- public void setLogfeederHdfsUser(String logfeederHdfsUser) {
- this.logfeederHdfsUser = logfeederHdfsUser;
- }
-
public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
this.useCloudHdfsClient = useCloudHdfsClient;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
similarity index 59%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
index fbbf869..312f2f0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -24,7 +24,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
-public class ExternalHdfsOutputConfig {
+public class HdfsOutputConfig {
@LogSearchPropertyDescription(
name = LogFeederConstants.HDFS_HOST,
@@ -55,6 +55,15 @@ public class ExternalHdfsOutputConfig {
private String hdfsFilePermissions;
@LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_USER,
+ description = "Overrides HADOOP_USER_NAME variable at runtime",
+ examples = {"hdfs"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
+ private String logfeederHdfsUser;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.HDFS_KERBEROS,
description = "Enable kerberos support for HDFS",
examples = {"true"},
@@ -62,7 +71,27 @@ public class ExternalHdfsOutputConfig {
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
- private boolean secure;
+ private boolean hdfsKerberos;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS_KEYTAB,
+ description = "Kerberos keytab location for Log Feeder for communicating with secure HDFS. ",
+ examples = {"/etc/security/keytabs/mykeytab.keytab"},
+ defaultValue = "/etc/security/keytabs/logfeeder.service.keytab",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS_KEYTAB + ":/etc/security/keytabs/logfeeder.service.keytab}")
+ private String keytab;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS_PRINCIPAL,
+ description = "Kerberos principal for Log Feeder for communicating with secure HDFS. ",
+ examples = {"mylogfeeder/myhost1@EXAMPLE.COM"},
+ defaultValue = "logfeeder/_HOST",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS_PRINCIPAL + ":logfeeder/_HOST}")
+ private String principal;
public String getHdfsHost() {
return hdfsHost;
@@ -88,11 +117,35 @@ public class ExternalHdfsOutputConfig {
this.hdfsFilePermissions = hdfsFilePermissions;
}
- public boolean isSecure() {
- return secure;
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public void setKeytab(String keytab) {
+ this.keytab = keytab;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public void setPrincipal(String principal) {
+ this.principal = principal;
+ }
+
+ public String getLogfeederHdfsUser() {
+ return logfeederHdfsUser;
+ }
+
+ public void setLogfeederHdfsUser(String logfeederHdfsUser) {
+ this.logfeederHdfsUser = logfeederHdfsUser;
+ }
+
+ public boolean isHdfsKerberos() {
+ return hdfsKerberos;
}
- public void setSecure(boolean secure) {
- this.secure = secure;
+ public void setHdfsKerberos(boolean hdfsKerberos) {
+ this.hdfsKerberos = hdfsKerberos;
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 31bfd0d..d383ed1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -47,7 +47,7 @@ public abstract class AbstractInputConfigHandler implements InputConfigHandler {
for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
if (filterDescriptor == null) {
- logger.warn("Filter descriptor is smpty. Skipping...");
+ logger.warn("Filter descriptor is empty. Skipping...");
continue;
}
if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index ac10b2d..c2e73b7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -52,7 +52,7 @@ public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler {
final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
if (inputDescriptor == null) {
- logger.warn("Input descriptor is smpty. Skipping...");
+ logger.warn("Input descriptor is empty. Skipping...");
continue;
}
LogFeederMode mode = inputConfigHolder.getLogFeederProps().getCloudStorageMode();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index dd0fe3e..4677461 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -74,7 +74,7 @@ public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
if (inputDescriptor == null) {
- logger.warn("Input descriptor is smpty. Skipping...");
+ logger.warn("Input descriptor is empty. Skipping...");
continue;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
index bd9e3df..ff0805d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -46,7 +46,7 @@ public class OutputLineEnricher {
Input input = inputMarker.getInput();
// Update the block with the context fields
for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
- if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+ if (jsonObj.get(entry.getKey()) == null || "cluster".equals(entry.getKey()) && "null".equals(jsonObj.get(entry.getKey()))) {
jsonObj.put(entry.getKey(), entry.getValue());
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
deleted file mode 100644
index a23a715..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud.upload;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * HDFS (on-prem) specific uploader client that can work with an external HDFS.
- */
-public class ExternalHDFSUploadClient implements UploadClient {
-
- private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
-
- private final ExternalHdfsOutputConfig hdfsOutputConfig;
- private final FsPermission fsPermission;
- private FileSystem fs;
-
- public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
- this.hdfsOutputConfig = hdfsOutputConfig;
- this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
- }
-
- @Override
- public void init(LogFeederProps logFeederProps) {
- logger.info("Initialize external HDFS client ...");
- if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser())) {
- logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
- System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
- }
- this.fs = LogFeederHDFSUtil.buildFileSystem(
- hdfsOutputConfig.getHdfsHost(),
- String.valueOf(hdfsOutputConfig.getHdfsPort()));
- if (logFeederProps.getHdfsOutputConfig().isSecure()) {
- logger.info("Kerberos is enabled for external HDFS.");
- Configuration conf = fs.getConf();
- conf.set("hadoop.security.authentication", "kerberos");
- }
- }
-
- @Override
- public void upload(String source, String target) throws Exception {
- LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
- }
-
- @Override
- public void close() {
- LogFeederHDFSUtil.closeFileSystem(fs);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index c2a8497..421c4c5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,15 +19,17 @@
package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.IOException;
-
/**
* HDFS client that uses core-site.xml file from the classpath to load the configuration.
* Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -35,37 +37,64 @@ import java.io.IOException;
public class HDFSUploadClient implements UploadClient {
private static final String FS_DEFAULT_FS = "fs.defaultFS";
+ private static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
+ private final boolean externalHdfs;
+ private final HdfsOutputConfig hdfsOutputConfig;
+ private final FsPermission fsPermission;
private FileSystem fs;
+ public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
+ this.hdfsOutputConfig = hdfsOutputConfig;
+ this.externalHdfs = externalHdfs;
+ this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+ }
+
@Override
public void init(LogFeederProps logFeederProps) {
- logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
- Configuration configuration = new Configuration();
+ final Configuration configuration;
+ if (externalHdfs) {
+ configuration = LogFeederHDFSUtil.buildHdfsConfiguration(hdfsOutputConfig.getHdfsHost(), String.valueOf(hdfsOutputConfig.getHdfsPort()), "hdfs");
+ logger.info("Using external HDFS client as core-site.xml is not located on the classpath.");
+ } else {
+ configuration = new Configuration();
+ logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+ }
if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
configuration.set(FS_DEFAULT_FS, logFeederProps.getCustomFs());
}
- if (StringUtils.isNotBlank(logFeederProps.getLogfeederHdfsUser()) && isHadoopFileSystem(configuration)) {
- logger.info("Using HADOOP_USER_NAME: {}", logFeederProps.getLogfeederHdfsUser());
- System.setProperty("HADOOP_USER_NAME", logFeederProps.getLogfeederHdfsUser());
+ if (hdfsOutputConfig.isHdfsKerberos()) {
+ logger.info("Kerberos is enabled for HDFS.");
+ configuration.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ final String principal = hdfsOutputConfig.getPrincipal().replace("_HOST", LogFeederUtil.hostName);
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, hdfsOutputConfig.getKeytab());
+ UserGroupInformation.setLoginUser(ugi);
+ } catch (Exception e) {
+ logger.error("Error during kerberos login", e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ if (StringUtils.isNotBlank(hdfsOutputConfig.getLogfeederHdfsUser())) {
+ logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getLogfeederHdfsUser());
+ System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getLogfeederHdfsUser());
+ }
}
+ logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
}
@Override
public void upload(String source, String target) throws Exception {
- LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+ LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
}
@Override
- public void close() throws IOException {
+ public void close() {
LogFeederHDFSUtil.closeFileSystem(fs);
}
- private boolean isHadoopFileSystem(Configuration conf) {
- return conf.get(FS_DEFAULT_FS).contains("hdfs://");
- }
-
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index bea2943..27d69c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -43,11 +43,11 @@ public class UploadClientFactory {
if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
logFeederProps.setCloudStorageDestination(CloudStorageDestination.DEFAULT_FS);
- return new HDFSUploadClient();
+ return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), false);
}
else if (CloudStorageDestination.HDFS.equals(destType)) {
logger.info("External HDFS output will be used.");
- return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+ return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig(), true);
} else if (CloudStorageDestination.S3.equals(destType)) {
if (useHdfsClient) {
logger.info("S3 cloud output will be used with HDFS client.");