You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/13 10:34:56 UTC
[ambari-logsearch] branch master updated: AMBARI-24833. Simplify HDFS client usage + use core-site.xml (#21)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
The following commit(s) were added to refs/heads/master by this push:
new db5fb3d AMBARI-24833. Simplify HDFS client usage + use core-site.xml (#21)
db5fb3d is described below
commit db5fb3db496beec489a55704eaf9b58d9d683bac
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Tue Nov 13 11:34:51 2018 +0100
AMBARI-24833. Simplify HDFS client usage + use core-site.xml (#21)
---
.../logfeeder/common/LogFeederConstants.java | 6 ++-
.../ambari/logfeeder/conf/LogFeederProps.java | 51 ++++++++++++++++++----
.../ambari/logfeeder/conf/output/BucketConfig.java | 18 ++++++++
.../conf/output/CloudStorageOutputConfig.java | 47 --------------------
...utConfig.java => ExternalHdfsOutputConfig.java} | 25 +----------
.../ambari/logfeeder/conf/output/OutputConfig.java | 32 --------------
.../logfeeder/conf/output/RolloverConfig.java | 18 ++++++++
.../logfeeder/conf/output/S3OutputConfig.java | 30 +------------
.../output/cloud/CloudStorageLoggerFactory.java | 9 ++--
.../output/cloud/CloudStorageUploader.java | 6 +--
...CloudClient.java => AbstractS3CloudClient.java} | 10 ++---
...adClient.java => ExternalHDFSUploadClient.java} | 28 +++++-------
.../output/cloud/upload/HDFSS3UploadClient.java | 16 +++----
.../output/cloud/upload/HDFSUploadClient.java | 44 ++++++-------------
.../output/cloud/upload/S3UploadClient.java | 12 ++---
.../output/cloud/upload/UploadClient.java | 13 +-----
.../output/cloud/upload/UploadClientFactory.java | 19 ++++++--
.../ambari/logfeeder/util/LogFeederHDFSUtil.java | 22 +++++++---
.../src/main/resources/core-site.xml | 42 ++++++++++++++++++
.../src/main/resources/logfeeder.properties | 11 +++--
20 files changed, 218 insertions(+), 241 deletions(-)
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index f528c45..11d351f 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -113,9 +113,13 @@ public class LogFeederConstants {
public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
+ public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+ public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs";
+ public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path";
+ public static final String CLOUD_ROLLOVER_ARCHIVE_LOCATION = "logfeeder.cloud.rollover.archive.base.dir";
public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_MIN = "logfeeder.cloud.rollover.threshold.min";
public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE = "logfeeder.cloud.rollover.threshold.size";
public static final String CLOUD_ROLLOVER_USE_GZIP = "logfeeder.cloud.rollover.use.gzip";
@@ -126,14 +130,12 @@ public class LogFeederConstants {
public static final String HDFS_HOST = "logfeeder.hdfs.host";
public static final String HDFS_PORT = "logfeeder.hdfs.port";
public static final String HDFS_USER = "logfeeder.hdfs.user";
- public static final String HDFS_OUTPUT_BASE_DIR = "logfeeder.hdfs.output.base.dir";
public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
public static final String S3_REGION = "logfeeder.s3.region";
- public static final String S3_BUCKET = "logfeeder.s3.bucket";
public static final String S3_OBJECT_ACL = "logfeeder.s3.object.acl";
public static final String S3_PATH_STYLE_ACCESS = "logfeeder.s3.path.style.access";
public static final String S3_MULTIOBJECT_DELETE_ENABLE = "logfeeder.s3.multiobject.delete.enable";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index e89f7f4..d32e1df 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,7 +19,7 @@
package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
@@ -47,13 +47,13 @@ public class LogFeederProps implements LogFeederProperties {
private Environment env;
@Inject
- private HdfsOutputConfig hdfsOutputConfig;
+ private RolloverConfig rolloverConfig;
@Inject
private S3OutputConfig s3OutputConfig;
@Inject
- private RolloverConfig rolloverConfig;
+ private ExternalHdfsOutputConfig hdfsOutputConfig;
private Properties properties;
@@ -238,7 +238,7 @@ public class LogFeederProps implements LogFeederProperties {
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN + ":false}")
- public boolean cloudStorageUploadOnShutdown;
+ private boolean cloudStorageUploadOnShutdown;
@LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS,
@@ -248,7 +248,7 @@ public class LogFeederProps implements LogFeederProperties {
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS + ":60}")
- public Integer cloudStorageUploaderIntervalSeconds;
+ private Integer cloudStorageUploaderIntervalSeconds;
@LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
@@ -258,7 +258,26 @@ public class LogFeederProps implements LogFeederProperties {
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
- public boolean useCloudHdfsClient;
+ private boolean useCloudHdfsClient;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_CUSTOM_FS,
+ description = "If it is not empty, override fs.defaultFS for HDFS client. Can be useful to write data to a different bucket (from other services) if the bucket address is read from core-site.xml",
+ examples = {"s3a://anotherbucket"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_CUSTOM_FS + ":}")
+ private String customFs;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_BASE_PATH,
+ description = "Base path prefix for storing logs (cloud storage / hdfs)",
+ examples = {"/user/logsearch/mypath"},
+ defaultValue = "/apps/logsearch",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_BASE_PATH + ":}")
+ private String cloudBasePath;
@Inject
private LogEntryCacheConfig logEntryCacheConfig;
@@ -421,7 +440,7 @@ public class LogFeederProps implements LogFeederProperties {
this.cloudStorageMode = cloudStorageMode;
}
- public HdfsOutputConfig getHdfsOutputConfig() {
+ public ExternalHdfsOutputConfig getHdfsOutputConfig() {
return hdfsOutputConfig;
}
@@ -441,7 +460,7 @@ public class LogFeederProps implements LogFeederProperties {
this.rolloverConfig = rolloverConfig;
}
- public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
+ public void setHdfsOutputConfig(ExternalHdfsOutputConfig hdfsOutputConfig) {
this.hdfsOutputConfig = hdfsOutputConfig;
}
@@ -477,6 +496,22 @@ public class LogFeederProps implements LogFeederProperties {
this.useCloudHdfsClient = useCloudHdfsClient;
}
+ public String getCustomFs() {
+ return customFs;
+ }
+
+ public void setCustomFs(String customFs) {
+ this.customFs = customFs;
+ }
+
+ public String getCloudBasePath() {
+ return cloudBasePath;
+ }
+
+ public void setCloudBasePath(String cloudBasePath) {
+ this.cloudBasePath = cloudBasePath;
+ }
+
public String[] getSolrUrls() {
if (StringUtils.isNotBlank(this.solrUrlsStr)) {
return this.solrUrlsStr.split(",");
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
index 7432cdd..ee49aef 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
@@ -27,6 +27,16 @@ import org.springframework.context.annotation.Configuration;
public class BucketConfig {
@LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_BUCKET,
+ description = "Amazon S3 bucket.",
+ examples = {"logs"},
+ defaultValue = "logfeeder",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_STORAGE_BUCKET + ":logfeeder}")
+ private String bucket;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP,
description = "Create bucket on startup.",
examples = {"false"},
@@ -36,6 +46,14 @@ public class BucketConfig {
@Value("${"+ LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP + ":false}")
private boolean createBucketOnStartup;
+ public String getBucket() {
+ return this.bucket;
+ }
+
+ public void setBucket(String bucketName) {
+ this.bucket = bucketName;
+ }
+
public boolean isCreateBucketOnStartup() {
return createBucketOnStartup;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
deleted file mode 100644
index 6603a5d..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.conf.output;
-
-public interface CloudStorageOutputConfig extends OutputConfig {
- String getSecretKey();
-
- String getAccessKey();
-
- String getSecretKeyFileLocation();
-
- String getAccessKeyFileLocation();
-
- boolean isUseFileSecrets();
-
- boolean isUseHadoopCredentialStorage();
-
- String getSecretKeyHadoopCredentialReference();
-
- String getAccessKeyHadoopCredentialReference();
-
- String getAccessKeyProperty();
-
- String getSecretKeyProperty();
-
- String getAccessKeyEnvVariable();
-
- String getSecretKeyEnvVariable();
-
- String getDescription();
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
similarity index 83%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
index e81ce7a..70772f7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/ExternalHdfsOutputConfig.java
@@ -24,7 +24,7 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@Configuration
-public class HdfsOutputConfig implements OutputConfig {
+public class ExternalHdfsOutputConfig {
@LogSearchPropertyDescription(
name = LogFeederConstants.HDFS_HOST,
@@ -64,16 +64,6 @@ public class HdfsOutputConfig implements OutputConfig {
private String hdfsUser;
@LogSearchPropertyDescription(
- name = LogFeederConstants.HDFS_OUTPUT_BASE_DIR,
- description = "HDFS base directory for uploading files",
- examples = {"/my/path/on/hdfs"},
- defaultValue = "/user/hdfs",
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.HDFS_OUTPUT_BASE_DIR + ":/user/hdfs}")
- private String hdfsOutputDir;
-
- @LogSearchPropertyDescription(
name = LogFeederConstants.HDFS_KERBEROS,
description = "Enable kerberos support for HDFS",
examples = {"true"},
@@ -115,14 +105,6 @@ public class HdfsOutputConfig implements OutputConfig {
this.hdfsUser = hdfsUser;
}
- public String getHdfsOutputDir() {
- return hdfsOutputDir;
- }
-
- public void setHdfsOutputDir(String hdfsOutputDir) {
- this.hdfsOutputDir = hdfsOutputDir;
- }
-
public boolean isSecure() {
return secure;
}
@@ -130,9 +112,4 @@ public class HdfsOutputConfig implements OutputConfig {
public void setSecure(boolean secure) {
this.secure = secure;
}
-
- @Override
- public String getOutputBasePath() {
- return this.hdfsOutputDir;
- }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
deleted file mode 100644
index 0b99e03..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.conf.output;
-
-/**
- * Output config marker interface
- */
-public interface OutputConfig {
-
- /**
- * Holds output destiation for a storage, it can act as a base output directory or a bucket
- * @return output directory or output bucket
- */
- String getOutputBasePath();
-
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
index d447838..7465a50 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
@@ -27,6 +27,16 @@ import org.springframework.context.annotation.Configuration;
public class RolloverConfig {
@LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_ARCHIVE_LOCATION,
+ description = "Location where the active and archives logs will be stored. Beware, it could require a large amount of space, use mounted disks if it is possible.",
+ examples = {"/var/lib/ambari-logsearch-logfeeder/data"},
+ defaultValue = "/tmp",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_ARCHIVE_LOCATION + ":/tmp}")
+ private String rolloverArchiveBaseDir;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_MIN,
description = "Rollover cloud log files after an interval (minutes)",
examples = {"1"},
@@ -133,4 +143,12 @@ public class RolloverConfig {
public void setRolloverOnStartup(boolean rolloverOnStartup) {
this.rolloverOnStartup = rolloverOnStartup;
}
+
+ public String getRolloverArchiveBaseDir() {
+ return rolloverArchiveBaseDir;
+ }
+
+ public void setRolloverArchiveBaseDir(String rolloverArchiveBaseDir) {
+ this.rolloverArchiveBaseDir = rolloverArchiveBaseDir;
+ }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
index 2da2698..465c2bd 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
@@ -27,7 +27,7 @@ import org.springframework.context.annotation.Configuration;
import javax.inject.Inject;
@Configuration
-public class S3OutputConfig implements CloudStorageOutputConfig {
+public class S3OutputConfig {
private final BucketConfig bucketConfig;
@@ -125,16 +125,6 @@ public class S3OutputConfig implements CloudStorageOutputConfig {
private String region;
@LogSearchPropertyDescription(
- name = LogFeederConstants.S3_BUCKET,
- description = "Amazon S3 bucket.",
- examples = {"logs"},
- defaultValue = "logfeeder",
- sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
- )
- @Value("${"+ LogFeederConstants.S3_BUCKET + ":logfeeder}")
- private String bucket;
-
- @LogSearchPropertyDescription(
name = LogFeederConstants.S3_OBJECT_ACL,
description = "Amazon S3 ACLs for new objects.",
examples = {"logs"},
@@ -170,19 +160,6 @@ public class S3OutputConfig implements CloudStorageOutputConfig {
this.bucketConfig = bucketConfig;
}
- @Override
- public String getOutputBasePath() {
- return this.bucket;
- }
-
- public String getBucket() {
- return this.bucket;
- }
-
- public void setBucket(String bucketName) {
- this.bucket = bucketName;
- }
-
public String getEndpoint() {
return endpoint;
}
@@ -259,27 +236,22 @@ public class S3OutputConfig implements CloudStorageOutputConfig {
return accessKeyHadoopCredentialReference;
}
- @Override
public String getAccessKeyProperty() {
return LogFeederConstants.S3_ACCESS_KEY;
}
- @Override
public String getSecretKeyProperty() {
return LogFeederConstants.S3_SECRET_KEY;
}
- @Override
public String getAccessKeyEnvVariable() {
return "AWS_ACCESS_KEY_ID";
}
- @Override
public String getSecretKeyEnvVariable() {
return "AWS_SECRET_ACCESS_KEY";
}
- @Override
public String getDescription() {
return "AWS S3";
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index 6cf0c7c..f42e556 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -45,16 +45,17 @@ public class CloudStorageLoggerFactory {
private static final String ACTIVE_FOLDER = "active";
private static final String ARCHIVED_FOLDER = "archived";
- private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-hh-mm-ss-SSS}.log.gz";
- private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-hh-mm-ss-SSS}.log";
+ private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
+ private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
String uniqueThreadName = input.getThread().getName();
Configuration config = loggerContext.getConfiguration();
String destination = logFeederProps.getCloudStorageDestination().getText();
- String activeLogDir = Paths.get(logFeederProps.getTmpDir(), ACTIVE_FOLDER, type).toFile().getAbsolutePath();
- String archiveLogDir = Paths.get(logFeederProps.getTmpDir(), destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
+ String baseDir = logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir();
+ String activeLogDir = Paths.get(baseDir, destination, ACTIVE_FOLDER, type).toFile().getAbsolutePath();
+ String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index ebb0cef..ea52de5 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -82,11 +82,11 @@ public class CloudStorageUploader extends Thread {
logger.debug("Not found any files to upload.");
} else {
for (File file : filesToUpload) {
- String basePath = uploadClient.getOutputConfig().getOutputBasePath();
- String outputPath = String.format("%s/%s/%s/%s", clusterName, hostName, file.getParentFile().getName(), file.getName())
+ String basePath = logFeederProps.getCloudBasePath();
+ String outputPath = String.format("%s/%s/%s/%s/%s", basePath, clusterName, hostName, file.getParentFile().getName(), file.getName())
.replaceAll("//", "/");
logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
- uploadClient.upload(file.getAbsolutePath(), outputPath, basePath);
+ uploadClient.upload(file.getAbsolutePath(), outputPath);
}
}
} else {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractS3CloudClient.java
similarity index 94%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractS3CloudClient.java
index d4a45de..db0a9af 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractS3CloudClient.java
@@ -20,7 +20,7 @@ package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.conf.output.BucketConfig;
-import org.apache.ambari.logfeeder.conf.output.CloudStorageOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
import org.apache.ambari.logfeeder.credential.CompositeSecretStore;
import org.apache.ambari.logfeeder.credential.EnvSecretStore;
import org.apache.ambari.logfeeder.credential.FileSecretStore;
@@ -38,9 +38,9 @@ import java.util.List;
/**
* Holds common cloud based client operations
*/
-abstract class AbstractCloudClient {
+abstract class AbstractS3CloudClient {
- private static final Logger logger = LogManager.getLogger(AbstractCloudClient.class);
+ private static final Logger logger = LogManager.getLogger(AbstractS3CloudClient.class);
/**
* Create a cloud specific bucket if it does not exists
@@ -85,7 +85,7 @@ abstract class AbstractCloudClient {
* @param config cloud based configuration
* @return secret key pair
*/
- SecretKeyPair getSecretKeyPair(LogFeederProps props, CloudStorageOutputConfig config) {
+ SecretKeyPair getSecretKeyPair(LogFeederProps props, S3OutputConfig config) {
String secretFile = config.isUseFileSecrets() ? config.getSecretKeyFileLocation() : null;
String secretRef = config.isUseHadoopCredentialStorage() ? config.getSecretKeyHadoopCredentialReference() : null;
CompositeSecretStore secretKeyStore = createCompositeSecretStore(props, config.getSecretKey(), config.getSecretKeyProperty(),
@@ -113,7 +113,7 @@ abstract class AbstractCloudClient {
* @param credentialRef credential provider referece to check for secret
* @return composite secret store that contains multiple way to get a secret
*/
- CompositeSecretStore createCompositeSecretStore(LogFeederProps props, String plainTextSecret, String property, String env,
+ private CompositeSecretStore createCompositeSecretStore(LogFeederProps props, String plainTextSecret, String property, String env,
String file, String credentialRef) {
List<SecretStore> secretStores = new ArrayList<>();
if (StringUtils.isNotBlank(plainTextSecret)) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
similarity index 70%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
index 0c4dce5..cabf004 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/ExternalHDFSUploadClient.java
@@ -19,9 +19,8 @@
package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.ExternalHdfsOutputConfig;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -30,48 +29,45 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
- * HDFS (on-prem) specific uploader client.
+ * HDFS (on-prem) specific uploader client that can work with an external HDFS.
*/
-public class HDFSUploadClient implements UploadClient<HdfsOutputConfig> {
+public class ExternalHDFSUploadClient implements UploadClient {
- private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
+ private static final Logger logger = LogManager.getLogger(ExternalHDFSUploadClient.class);
- private final HdfsOutputConfig hdfsOutputConfig;
+ private final ExternalHdfsOutputConfig hdfsOutputConfig;
private final FsPermission fsPermission;
private FileSystem fs;
- public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig) {
+ public ExternalHDFSUploadClient(ExternalHdfsOutputConfig hdfsOutputConfig) {
this.hdfsOutputConfig = hdfsOutputConfig;
this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
}
@Override
public void init(LogFeederProps logFeederProps) {
+ logger.info("Initialize external HDFS client ...");
if (StringUtils.isNotBlank(hdfsOutputConfig.getHdfsUser())) {
+ logger.info("Using HADOOP_USER_NAME: {}", hdfsOutputConfig.getHdfsUser());
System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getHdfsUser());
}
this.fs = LogFeederHDFSUtil.buildFileSystem(
hdfsOutputConfig.getHdfsHost(),
String.valueOf(hdfsOutputConfig.getHdfsPort()));
if (logFeederProps.getHdfsOutputConfig().isSecure()) {
+ logger.info("Kerberos is enabled for external HDFS.");
Configuration conf = fs.getConf();
conf.set("hadoop.security.authentication", "kerberos");
}
}
@Override
- public void upload(String source, String target, String basePath) throws Exception {
- String outputPath = String.format("%s/%s", basePath, target).replaceAll("//", "/");
- LogFeederHDFSUtil.copyFromLocal(source, outputPath, fs, true, true, fsPermission);
- }
-
- @Override
- public HdfsOutputConfig getOutputConfig() {
- return this.hdfsOutputConfig;
+ public void upload(String source, String target) throws Exception {
+ LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, fsPermission);
}
@Override
public void close() {
- IOUtils.closeQuietly(fs);
+ LogFeederHDFSUtil.closeFileSystem(fs);
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
index dd5a225..5405a10 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
@@ -30,7 +30,7 @@ import org.apache.logging.log4j.Logger;
import java.io.IOException;
-public class HDFSS3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+public class HDFSS3UploadClient extends AbstractS3CloudClient implements UploadClient {
private static final Logger logger = LogManager.getLogger(HDFSS3UploadClient.class);
@@ -44,14 +44,13 @@ public class HDFSS3UploadClient extends AbstractCloudClient implements UploadCli
@Override
void createBucketIfNeeded(String bucket) {
- logger.warn("HDFS based S3 client won't bootstrap default bucket ('{}')", s3OutputConfig.getBucket());
+ logger.warn("HDFS based S3 client won't bootstrap default bucket ('{}')", s3OutputConfig.getBucketConfig().getBucket());
}
@Override
public void init(LogFeederProps logFeederProps) {
SecretKeyPair keyPair = getSecretKeyPair(logFeederProps, s3OutputConfig);
- // TODO: load configuration from file
- Configuration conf = LogFeederHDFSUtil.buildHdfsConfiguration(s3OutputConfig.getBucket(), "s3a");
+ Configuration conf = LogFeederHDFSUtil.buildHdfsConfiguration(s3OutputConfig.getBucketConfig().getBucket(), "s3a");
conf.set("fs.s3a.access.key", new String(keyPair.getAccessKey()));
conf.set("fs.s3a.secret.key", new String(keyPair.getSecretKey()));
conf.set("fs.s3a.aws.credentials.provider", SimpleAWSCredentialsProvider.NAME);
@@ -62,17 +61,12 @@ public class HDFSS3UploadClient extends AbstractCloudClient implements UploadCli
}
@Override
- public void upload(String source, String target, String basePath) throws Exception {
+ public void upload(String source, String target) throws Exception {
LogFeederHDFSUtil.copyFromLocal(source, target, this.fs, true, true, null);
}
@Override
- public S3OutputConfig getOutputConfig() {
- return this.s3OutputConfig;
- }
-
- @Override
public void close() throws IOException {
- IOUtils.closeQuietly(fs);
+ LogFeederHDFSUtil.closeFileSystem(fs);
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 0c4dce5..9e0a136 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -19,59 +19,43 @@
package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.io.IOException;
+
/**
- * HDFS (on-prem) specific uploader client.
+ * HDFS client that uses the core-site.xml file from the classpath to load its configuration.
+ * Can connect to S3 / GCS / WASB / ADLS if core-site.xml is configured to use one of those cloud storage systems.
*/
-public class HDFSUploadClient implements UploadClient<HdfsOutputConfig> {
+public class HDFSUploadClient implements UploadClient {
private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
- private final HdfsOutputConfig hdfsOutputConfig;
- private final FsPermission fsPermission;
private FileSystem fs;
- public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig) {
- this.hdfsOutputConfig = hdfsOutputConfig;
- this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
- }
-
@Override
public void init(LogFeederProps logFeederProps) {
- if (StringUtils.isNotBlank(hdfsOutputConfig.getHdfsUser())) {
- System.setProperty("HADOOP_USER_NAME", hdfsOutputConfig.getHdfsUser());
- }
- this.fs = LogFeederHDFSUtil.buildFileSystem(
- hdfsOutputConfig.getHdfsHost(),
- String.valueOf(hdfsOutputConfig.getHdfsPort()));
- if (logFeederProps.getHdfsOutputConfig().isSecure()) {
- Configuration conf = fs.getConf();
- conf.set("hadoop.security.authentication", "kerberos");
+ logger.info("Initialize HDFS client (cloud mode), using core-site.xml from the classpath.");
+ Configuration configuration = new Configuration();
+ if (StringUtils.isNotBlank(logFeederProps.getCustomFs())) {
+ configuration.set("fs.defaultFS", logFeederProps.getCustomFs());
}
+ this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
}
@Override
- public void upload(String source, String target, String basePath) throws Exception {
- String outputPath = String.format("%s/%s", basePath, target).replaceAll("//", "/");
- LogFeederHDFSUtil.copyFromLocal(source, outputPath, fs, true, true, fsPermission);
+ public void upload(String source, String target) throws Exception {
+ LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
}
@Override
- public HdfsOutputConfig getOutputConfig() {
- return this.hdfsOutputConfig;
+ public void close() throws IOException {
+ LogFeederHDFSUtil.closeFileSystem(fs);
}
- @Override
- public void close() {
- IOUtils.closeQuietly(fs);
- }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
index 819a001..4d202a0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
@@ -37,7 +37,7 @@ import java.io.File;
/**
* S3 specific upload client
*/
-public class S3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+public class S3UploadClient extends AbstractS3CloudClient implements UploadClient {
private static final Logger logger = LogManager.getLogger(S3UploadClient.class);
@@ -61,11 +61,12 @@ public class S3UploadClient extends AbstractCloudClient implements UploadClient<
.withEndpointConfiguration(endpointConf)
.withPathStyleAccessEnabled(s3OutputConfig.isPathStyleAccess())
.build();
- bootstrapBucket(s3OutputConfig.getOutputBasePath(), s3OutputConfig.getBucketConfig());
+ bootstrapBucket(s3OutputConfig.getBucketConfig().getBucket(), s3OutputConfig.getBucketConfig());
}
@Override
- public void upload(String source, String target, String bucket) throws Exception {
+ public void upload(String source, String target) throws Exception {
+ String bucket = this.s3OutputConfig.getBucketConfig().getBucket();
File fileToUpload = new File(source);
logger.info("Starting S3 upload {} -> bucket: {}, key: {}", source, bucket, target);
s3Client.putObject(bucket, target, new File(source));
@@ -74,11 +75,6 @@ public class S3UploadClient extends AbstractCloudClient implements UploadClient<
}
@Override
- public S3OutputConfig getOutputConfig() {
- return this.s3OutputConfig;
- }
-
- @Override
public void close() {
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
index d8adf68..949ae1e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
@@ -19,15 +19,13 @@
package org.apache.ambari.logfeeder.output.cloud.upload;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.output.OutputConfig;
import java.io.Closeable;
/**
* Client that is responsible to upload files to cloud storage implementation.
- * @param <CONF_TYPE> specific cloud configuration type
*/
-public interface UploadClient<CONF_TYPE extends OutputConfig> extends Closeable {
+public interface UploadClient extends Closeable {
/**
* Initialize the client
@@ -39,14 +37,7 @@ public interface UploadClient<CONF_TYPE extends OutputConfig> extends Closeable
* Upload source file to cloud storage location
* @param source file that will be uploaded
* @param target file key/output on cloud storage
- * @param basePath act as a base directory or can be a bucket as well
* @throws Exception error during upload
*/
- void upload(String source, String target, String basePath) throws Exception;
-
- /**
- * Obtain cloud specific output configuration
- * @return output configuration holder
- */
- CONF_TYPE getOutputConfig();
+ void upload(String source, String target) throws Exception;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
index b86ec6d..2865b28 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -22,6 +22,7 @@ import org.apache.ambari.logfeeder.conf.CloudStorageDestination;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.springframework.core.io.ClassPathResource;
/**
* Factory class to create cloud specific data uploader client based on global Log Feeder settings.
@@ -38,16 +39,28 @@ public class UploadClientFactory {
public static UploadClient createUploadClient(LogFeederProps logFeederProps) {
CloudStorageDestination destType = logFeederProps.getCloudStorageDestination();
logger.info("Creating upload client for storage: {}", destType);
- if (CloudStorageDestination.HDFS.equals(destType)) {
- return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+ boolean useHdfsClient = logFeederProps.isUseCloudHdfsClient();
+ if (useHdfsClient && checkCoreSiteIsOnClasspath(logFeederProps)) {
+ logger.info("The core-site.xml from the classpath will be used to figure it out the cloud output settings.");
+ return new HDFSUploadClient();
+ }
+ else if (CloudStorageDestination.HDFS.equals(destType)) {
+ logger.info("External HDFS output will be used.");
+ return new ExternalHDFSUploadClient(logFeederProps.getHdfsOutputConfig());
} else if (CloudStorageDestination.S3.equals(destType)) {
- if (logFeederProps.isUseCloudHdfsClient()) {
+ if (useHdfsClient) {
+ logger.info("S3 cloud output will be used with HDFS client.");
return new HDFSS3UploadClient(logFeederProps.getS3OutputConfig());
} else {
+ logger.info("S3 cloud output will be used with AWS sdk client (core).");
return new S3UploadClient(logFeederProps.getS3OutputConfig());
}
} else {
throw new IllegalArgumentException(String.format("No cloud storage type is selected as destination: %s", destType));
}
}
+
+ private static boolean checkCoreSiteIsOnClasspath(LogFeederProps logFeederProps) {
+ return new ClassPathResource("core-site.xml").exists();
+ }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
index 61be819..3549e04 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
@@ -55,12 +55,24 @@ public class LogFeederHDFSUtil {
}
public static FileSystem buildFileSystem(Configuration configuration) {
- try {
- return FileSystem.get(configuration);
- } catch (Exception e) {
- logger.error("Exception during buildFileSystem call:", e);
+ return buildFileSystem(configuration, 5);
+ }
+
+ public static FileSystem buildFileSystem(Configuration configuration, int sleepSeconds) {
+ while (true) {
+ try {
+ return FileSystem.get(configuration);
+ } catch (Exception e) {
+ logger.error("Exception during buildFileSystem call:", e);
+ }
+ try {
+ Thread.sleep(1000 * sleepSeconds);
+ } catch (InterruptedException e) {
+ logger.error("Error during thread sleep (filesystem bootstrap)", e);
+ Thread.currentThread().interrupt();
+ return null;
+ }
}
- return null;
}
public static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort, String scheme) {
diff --git a/ambari-logsearch-logfeeder/src/main/resources/core-site.xml b/ambari-logsearch-logfeeder/src/main/resources/core-site.xml
new file mode 100644
index 0000000..9ec1d04
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/resources/core-site.xml
@@ -0,0 +1,42 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+ <property>
+ <name>fs.defaultFS</name>
+ <value>s3a://logfeeder</value>
+ </property>
+ <property>
+ <name>fs.s3a.endpoint</name>
+ <value>http://localhost:4569</value>
+ </property>
+ <property>
+ <name>fs.s3a.access.key</name>
+ <value>MyAccessKey</value>
+ </property>
+ <property>
+ <name>fs.s3a.secret.key</name>
+ <value>MySecretKey</value>
+ </property>
+ <property>
+ <name>fs.s3a.path.style.access</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>fs.s3a.multiobjectdelete.enable</name>
+ <value>false</value>
+ </property>
+</configuration>
\ No newline at end of file
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 127e1b3..daa9821 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -40,12 +40,18 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp
#logfeeder.configs.filter.solr.enabled=true
#logfeeder.docker.registry.enabled=true
-#logfeeder.cloud.storage.mode=cloud
logfeeder.cloud.storage.mode=default
+#logfeeder.cloud.storage.mode=cloud
logfeeder.cloud.storage.destination=s3
logfeeder.cloud.storage.uploader.interval.seconds=1
logfeeder.cloud.storage.upload.on.shutdown=true
+logfeeder.cloud.storage.base.path=/apps/logfeeder
logfeeder.cloud.storage.use.hdfs.client=true
+
+logfeeder.cloud.storage.bucket=logfeeder
+logfeeder.cloud.storage.bucket.bootstrap=true
+
+logfeeder.cloud.rollover.archive.base.dir=target/tmp
logfeeder.cloud.rollover.threshold.min=1000
logfeeder.cloud.rollover.threshold.size=1K
logfeeder.cloud.rollover.immediate.flush=true
@@ -53,12 +59,11 @@ logfeeder.cloud.rollover.immediate.flush=true
logfeeder.hdfs.host=c7401.ambari.apache.org
logfeeder.hdfs.port=8020
logfeeder.hdfs.user=hdfs
-logfeeder.hdfs.output.base.dir=/user/hdfs/logfeeder
+logfeeder.hdfs.output.base.dir=/apps/logfeeder
logfeeder.s3.endpoint=http://localhost:4569
logfeeder.s3.secret.key=MySecretKey
logfeeder.s3.access.key=MyAccessKey
-logfeeder.s3.bucket=logfeeder
logfeeder.s3.object.acl=public-read
logfeeder.s3.path.style.access=true
logfeeder.s3.multiobject.delete.enable=false