You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/12 13:52:44 UTC
[ambari-logsearch] branch master updated: AMBARI-24833.
Re-implement S3/HDFS outputs as global cloud outputs (#19)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
The following commit(s) were added to refs/heads/master by this push:
new 2bd96e1 AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs (#19)
2bd96e1 is described below
commit 2bd96e130795d1a1b7961ad5cc695487df05af07
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 12 14:52:40 2018 +0100
AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs (#19)
* AMBARI-24833. Re-implement S3/HDFS outputs as global cloud outputs
* AMBARI-24833. Use aws/hadoop versions as properties
* Fix typo
* AMBARI-24833. Change logfeeder docker config back to default
* AMBARI-24833. Review changes.
---
.../ambari/logfeeder/plugin/input/Input.java | 3 +
.../logfeeder/plugin/manager/OutputManager.java | 7 +
.../ambari/logfeeder/plugin/output/Output.java | 2 +-
ambari-logsearch-logfeeder/pom.xml | 40 ++-
.../logfeeder/common/LogFeederConstants.java | 35 ++
.../logfeeder/conf/CloudStorageDestination.java} | 31 +-
.../ambari/logfeeder/conf/LogFeederProps.java | 108 +++++-
.../ambari/logfeeder/conf/output/BucketConfig.java | 46 +++
.../output/CloudStorageOutputConfig.java} | 35 +-
.../logfeeder/conf/output/HdfsOutputConfig.java | 138 ++++++++
.../output/OutputConfig.java} | 18 +-
.../logfeeder/conf/output/RolloverConfig.java | 136 ++++++++
.../logfeeder/conf/output/S3OutputConfig.java | 327 ++++++++++++++++++
.../PlainTextSecretStore.java} | 20 +-
.../apache/ambari/logfeeder/input/InputS3File.java | 118 -------
.../ambari/logfeeder/output/OutputHDFSFile.java | 269 ---------------
.../ambari/logfeeder/output/OutputS3File.java | 279 ---------------
.../ambari/logfeeder/output/S3LogPathResolver.java | 54 ---
.../logfeeder/output/S3OutputConfiguration.java | 121 -------
.../apache/ambari/logfeeder/output/S3Uploader.java | 166 ---------
.../output/cloud/CloudStorageLoggerFactory.java | 107 ++++++
.../logfeeder/output/cloud/CloudStorageOutput.java | 133 +++++++-
.../output/cloud/CloudStorageOutputManager.java | 21 +-
.../output/cloud/CloudStorageUploader.java | 100 ++++++
.../cloud/CustomTimeBasedTriggeringPolicy.java | 85 +++++
.../ambari/logfeeder/output/cloud/HDFSOutput.java | 74 ----
.../output/cloud/upload/AbstractCloudClient.java | 135 ++++++++
.../output/cloud/upload/HDFSS3UploadClient.java | 78 +++++
.../output/cloud/upload/HDFSUploadClient.java | 77 +++++
.../output/cloud/upload/S3UploadClient.java | 91 +++++
.../SecretKeyPair.java} | 23 +-
.../output/cloud/upload/UploadClient.java | 52 +++
.../output/cloud/upload/UploadClientFactory.java | 53 +++
.../ambari/logfeeder/output/spool/LogSpooler.java | 210 ------------
.../logfeeder/output/spool/LogSpoolerContext.java | 85 -----
.../output/spool/LogSpoolerException.java | 29 --
.../logfeeder/output/spool/RolloverCondition.java | 36 --
.../logfeeder/output/spool/RolloverHandler.java | 40 ---
.../ambari/logfeeder/util/LogFeederHDFSUtil.java | 42 ++-
.../org/apache/ambari/logfeeder/util/S3Util.java | 140 --------
.../src/main/resources/alias_config.json | 9 -
.../src/main/resources/logfeeder.properties | 23 +-
.../ambari/logfeeder/output/OutputS3FileTest.java | 100 ------
.../logfeeder/output/S3LogPathResolverTest.java | 52 ---
.../ambari/logfeeder/output/S3UploaderTest.java | 164 ---------
.../logfeeder/output/spool/LogSpoolerTest.java | 374 ---------------------
ambari-logsearch-server/pom.xml | 2 +-
docker/cloud-docker-compose.yml | 3 +
docker/test-config/logfeeder/logfeeder.properties | 13 +-
pom.xml | 9 +-
50 files changed, 1892 insertions(+), 2421 deletions(-)
diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 9ee4533..be1e793 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -232,6 +232,9 @@ public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
*/
public void close() {
logger.info("Close called. " + getShortDescription());
+ if (getOutputManager() != null) {
+ getOutputManager().release(this);
+ }
try {
if (firstFilter != null) {
firstFilter.close();
diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
index 7ab8825..12f00b9 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.logfeeder.plugin.manager;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
@@ -63,4 +64,10 @@ public abstract class OutputManager implements BlockManager {
*/
public abstract List<Output> getOutputs();
+ /**
+ * Release an input (can be used for cleanup) - by default it won't do anything, override this if needed
+ * @param input holds input object - in order to gather unique details
+ */
+ public void release(Input input) {
+ }
}
diff --git a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
index 73caf68..cc2e5e5 100644
--- a/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
+++ b/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
@@ -166,7 +166,7 @@ public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER
}
public void close() {
- LOG.info("Calling base close()." + getShortDescription());
+ LOG.info("Calling base close() = " + getShortDescription());
isClosed = true;
}
}
diff --git a/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch-logfeeder/pom.xml
index 71cf853..e71b3cc 100644
--- a/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch-logfeeder/pom.xml
@@ -35,6 +35,8 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<spring.version>5.1.2.RELEASE</spring.version>
<spring-boot.version>2.1.0.RELEASE</spring-boot.version>
+ <aws-sdk.version>1.11.445</aws-sdk.version>
+ <gcs-connector.version>hadoop3-1.9.10</gcs-connector.version>
</properties>
<dependencies>
@@ -122,7 +124,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>25.0-jre</version>
+ <version>27.0-jre</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -145,9 +147,34 @@
<version>${ambari-metrics.version}</version>
</dependency>
<dependency>
- <groupId>io.minio</groupId>
- <artifactId>minio</artifactId>
- <version>5.0.1</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-core</artifactId>
+ <version>${aws-sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws-sdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure-datalake</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.cloud.bigdataoss</groupId>
+ <artifactId>gcs-connector</artifactId>
+ <version>${gcs-connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -253,13 +280,12 @@
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
- <!-- Exclude jars globally-->
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
- <version>1.7.0</version>
- <scope>provided</scope>
+ <version>1.9.3</version>
</dependency>
+ <!-- Exclude jars globally-->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a9790b2..f528c45 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -110,5 +110,40 @@ public class LogFeederConstants {
public static final String SOLR_URLS = "logfeeder.solr.urls";
public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
+ public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
+ public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
+ public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
+ public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
+ public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+
+ public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_MIN = "logfeeder.cloud.rollover.threshold.min";
+ public static final String CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE = "logfeeder.cloud.rollover.threshold.size";
+ public static final String CLOUD_ROLLOVER_USE_GZIP = "logfeeder.cloud.rollover.use.gzip";
+ public static final String CLOUD_ROLLOVER_IMMEDIATE_FLUSH = "logfeeder.cloud.rollover.immediate.flush";
+ public static final String CLOUD_ROLLOVER_ON_SHUTDOWN = "logfeeder.cloud.rollover.on.shutdown";
+ public static final String CLOUD_ROLLOVER_ON_STARTUP = "logfeeder.cloud.rollover.on.startup";
+
+ public static final String HDFS_HOST = "logfeeder.hdfs.host";
+ public static final String HDFS_PORT = "logfeeder.hdfs.port";
+ public static final String HDFS_USER = "logfeeder.hdfs.user";
+ public static final String HDFS_OUTPUT_BASE_DIR = "logfeeder.hdfs.output.base.dir";
+ public static final String HDFS_FILE_PERMISSIONS = "logfeeder.hdfs.file.permissions";
+ public static final String HDFS_KERBEROS = "logfeeder.hdfs.kerberos";
+
+ public static final String S3_ENDPOINT = "logfeeder.s3.endpoint";
+ public static final String S3_ENDPOINT_DEFAULT = "https://s3.amazonaws.com";
+ public static final String S3_REGION = "logfeeder.s3.region";
+ public static final String S3_BUCKET = "logfeeder.s3.bucket";
+ public static final String S3_OBJECT_ACL = "logfeeder.s3.object.acl";
+ public static final String S3_PATH_STYLE_ACCESS = "logfeeder.s3.path.style.access";
+ public static final String S3_MULTIOBJECT_DELETE_ENABLE = "logfeeder.s3.multiobject.delete.enable";
+ public static final String S3_SECRET_KEY = "logfeeder.s3.secret.key";
+ public static final String S3_ACCESS_KEY = "logfeeder.s3.access.key";
+ public static final String S3_SECRET_KEY_FILE = "logfeeder.s3.secret.key.file";
+ public static final String S3_ACCESS_KEY_FILE = "logfeeder.s3.access.key.file";
+ public static final String S3_USE_FILE = "logfeeder.s3.credentials.file.enabled";
+ public static final String S3_USE_HADOOP_CREDENTIAL_PROVIDER = "logfeeder.s3.credentials.hadoop.enabled";
+ public static final String S3_HADOOP_CREDENTIAL_SECRET_REF = "logfeeder.s3.credentials.hadoop.secret.ref";
+ public static final String S3_HADOOP_CREDENTIAL_ACCESS_REF = "logfeeder.s3.credentials.hadoop.access.ref";
}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
similarity index 52%
rename from ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
index 51c34a2..1a7eafa 100644
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/CloudStorageDestination.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -16,29 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.util;
+package org.apache.ambari.logfeeder.conf;
-import static org.junit.Assert.assertEquals;
+public enum CloudStorageDestination {
+ HDFS("hdfs"), S3("s3"), GCS("gcs"), ADLS("adls"), NONE("none");
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.junit.Test;
+ private String text;
-public class S3UtilTest {
-
- @Test
- public void testS3Util_pathToBucketName() throws Exception {
- String s3Path = "s3://bucket_name/path/file.txt";
- String expectedBucketName = "bucket_name";
- String actualBucketName = S3Util.getBucketName(s3Path);
- assertEquals(expectedBucketName, actualBucketName);
+ CloudStorageDestination(String text) {
+ this.text = text;
}
- @Test
- public void testS3Util_pathToS3Key() throws Exception {
- String s3Path = "s3://bucket_name/path/file.txt";
- String expectedS3key = "path/file.txt";
- String actualS3key = S3Util.getS3Key(s3Path);
- assertEquals(expectedS3key, actualS3key);
+ public String getText() {
+ return this.text;
}
-
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index dc1bfd2..e89f7f4 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,6 +19,9 @@
package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.apache.commons.lang.StringUtils;
@@ -43,6 +46,15 @@ public class LogFeederProps implements LogFeederProperties {
@Inject
private Environment env;
+ @Inject
+ private HdfsOutputConfig hdfsOutputConfig;
+
+ @Inject
+ private S3OutputConfig s3OutputConfig;
+
+ @Inject
+ private RolloverConfig rolloverConfig;
+
private Properties properties;
@LogSearchPropertyDescription(
@@ -209,6 +221,45 @@ public class LogFeederProps implements LogFeederProperties {
@Value("${" + LogFeederConstants.CLOUD_STORAGE_MODE + ":default}")
public LogFeederMode cloudStorageMode;
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_DESTINATION,
+ description = "Type of storage that is the destination for cloud output logs.",
+ examples = {"hdfs", "s3", "gcs", "adls", "none"},
+ sources = {LogFeederConstants.CLOUD_STORAGE_DESTINATION}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_DESTINATION + ":none}")
+ private CloudStorageDestination cloudStorageDestination;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN,
+ description = "Try to upload archived files on shutdown",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN + ":false}")
+ public boolean cloudStorageUploadOnShutdown;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS,
+ description = "Second interval, that is used to check against there are any files to upload to cloud storage or not.",
+ examples = {"10"},
+ defaultValue = "60",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS + ":60}")
+ public Integer cloudStorageUploaderIntervalSeconds;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
+ description = "Use hdfs client with cloud connectors instead of the core clients for shipping data to cloud storage",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT + ":false}")
+ public boolean useCloudHdfsClient;
+
@Inject
private LogEntryCacheConfig logEntryCacheConfig;
@@ -370,6 +421,62 @@ public class LogFeederProps implements LogFeederProperties {
this.cloudStorageMode = cloudStorageMode;
}
+ public HdfsOutputConfig getHdfsOutputConfig() {
+ return hdfsOutputConfig;
+ }
+
+ public S3OutputConfig getS3OutputConfig() {
+ return s3OutputConfig;
+ }
+
+ public void setS3OutputConfig(S3OutputConfig s3OutputConfig) {
+ this.s3OutputConfig = s3OutputConfig;
+ }
+
+ public RolloverConfig getRolloverConfig() {
+ return rolloverConfig;
+ }
+
+ public void setRolloverConfig(RolloverConfig rolloverConfig) {
+ this.rolloverConfig = rolloverConfig;
+ }
+
+ public void setHdfsOutputConfig(HdfsOutputConfig hdfsOutputConfig) {
+ this.hdfsOutputConfig = hdfsOutputConfig;
+ }
+
+ public CloudStorageDestination getCloudStorageDestination() {
+ return cloudStorageDestination;
+ }
+
+ public void setCloudStorageDestination(CloudStorageDestination cloudStorageDestination) {
+ this.cloudStorageDestination = cloudStorageDestination;
+ }
+
+ public boolean isCloudStorageUploadOnShutdown() {
+ return cloudStorageUploadOnShutdown;
+ }
+
+ public void setCloudStorageUploadOnShutdown(boolean cloudStorageUploadOnShutdown) {
+ this.cloudStorageUploadOnShutdown = cloudStorageUploadOnShutdown;
+ }
+
+ public Integer getCloudStorageUploaderIntervalSeconds() {
+ return cloudStorageUploaderIntervalSeconds;
+ }
+
+ public void setCloudStorageUploaderIntervalSeconds(Integer cloudStorageUploaderIntervalSeconds) {
+ this.cloudStorageUploaderIntervalSeconds = cloudStorageUploaderIntervalSeconds;
+ }
+
+ public boolean isUseCloudHdfsClient() {
+ return useCloudHdfsClient;
+ }
+
+ public void setUseCloudHdfsClient(boolean useCloudHdfsClient) {
+ this.useCloudHdfsClient = useCloudHdfsClient;
+ }
+
public String[] getSolrUrls() {
if (StringUtils.isNotBlank(this.solrUrlsStr)) {
return this.solrUrlsStr.split(",");
@@ -392,5 +499,4 @@ public class LogFeederProps implements LogFeederProperties {
throw new IllegalArgumentException("Cannot find logfeeder.properties on the classpath");
}
}
-
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
new file mode 100644
index 0000000..7432cdd
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/BucketConfig.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class BucketConfig { // holder for the cloud storage bucket bootstrap flag, bound from logfeeder properties
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP,
+ description = "Create bucket on startup.",
+ examples = {"false"},
+ defaultValue = "false", // kept in sync with the ":false" fallback of the @Value placeholder below
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_STORAGE_BUCKET_BOOTSTRAP + ":false}")
+ private boolean createBucketOnStartup;
+
+ public boolean isCreateBucketOnStartup() { // true when the bucket should be created on startup
+ return createBucketOnStartup;
+ }
+
+ public void setCreateBucketOnStartup(boolean createBucketOnStartup) { // overrides the property-bound value
+ this.createBucketOnStartup = createBucketOnStartup;
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
similarity index 58%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
index 871ae93..6603a5d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/CloudStorageOutputConfig.java
@@ -16,17 +16,32 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.output.cloud;
+package org.apache.ambari.logfeeder.conf.output;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+public interface CloudStorageOutputConfig extends OutputConfig {
+ String getSecretKey();
-/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
- */
-public class CloudStorageFactory {
+ String getAccessKey();
+
+ String getSecretKeyFileLocation();
+
+ String getAccessKeyFileLocation();
+
+ boolean isUseFileSecrets();
+
+ boolean isUseHadoopCredentialStorage();
+
+ String getSecretKeyHadoopCredentialReference();
+
+ String getAccessKeyHadoopCredentialReference();
+
+ String getAccessKeyProperty();
+
+ String getSecretKeyProperty();
+
+ String getAccessKeyEnvVariable();
+
+ String getSecretKeyEnvVariable();
- public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
- return new HDFSOutput();
- }
+ String getDescription();
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
new file mode 100644
index 0000000..e81ce7a
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/HdfsOutputConfig.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class HdfsOutputConfig implements OutputConfig { // holder for the logfeeder.hdfs.* output settings, bound through @Value placeholders
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_HOST,
+ description = "HDFS Name Node host.",
+ examples = {"mynamenodehost"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_HOST + ":}") // placeholder falls back to an empty value when the property is unset
+ private String hdfsHost;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_PORT,
+ description = "HDFS Name Node port",
+ examples = {"9000"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_PORT + ":}") // NOTE(review): empty fallback on an Integer field - confirm it binds to null instead of failing conversion
+ private Integer hdfsPort;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_FILE_PERMISSIONS,
+ description = "Default permissions for created files on HDFS",
+ examples = {"600"},
+ defaultValue = "640",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_FILE_PERMISSIONS + ":640}")
+ private String hdfsFilePermissions;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_USER,
+ description = "Overrides HADOOP_USER_NAME variable at runtime",
+ examples = {"hdfs"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_USER + ":}") // placeholder falls back to an empty value when the property is unset
+ private String hdfsUser;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_OUTPUT_BASE_DIR,
+ description = "HDFS base directory for uploading files",
+ examples = {"/my/path/on/hdfs"},
+ defaultValue = "/user/hdfs",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_OUTPUT_BASE_DIR + ":/user/hdfs}")
+ private String hdfsOutputDir;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.HDFS_KERBEROS,
+ description = "Enable kerberos support for HDFS",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.HDFS_KERBEROS + ":false}")
+ private boolean secure; // bound from the HDFS kerberos property, not a generic TLS switch
+
+ public String getHdfsHost() {
+ return hdfsHost;
+ }
+
+ public void setHdfsHost(String hdfsHost) {
+ this.hdfsHost = hdfsHost;
+ }
+
+ public Integer getHdfsPort() {
+ return hdfsPort;
+ }
+
+ public void setHdfsPort(Integer hdfsPort) {
+ this.hdfsPort = hdfsPort;
+ }
+
+ public String getHdfsFilePermissions() {
+ return hdfsFilePermissions;
+ }
+
+ public void setHdfsFilePermissions(String hdfsFilePermissions) {
+ this.hdfsFilePermissions = hdfsFilePermissions;
+ }
+
+ public String getHdfsUser() {
+ return hdfsUser;
+ }
+
+ public void setHdfsUser(String hdfsUser) {
+ this.hdfsUser = hdfsUser;
+ }
+
+ public String getHdfsOutputDir() {
+ return hdfsOutputDir;
+ }
+
+ public void setHdfsOutputDir(String hdfsOutputDir) {
+ this.hdfsOutputDir = hdfsOutputDir;
+ }
+
+ public boolean isSecure() {
+ return secure;
+ }
+
+ public void setSecure(boolean secure) {
+ this.secure = secure;
+ }
+
+ @Override
+ public String getOutputBasePath() { // the HDFS output base directory serves as the generic output base path
+ return this.hdfsOutputDir;
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
similarity index 68%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
index 871ae93..0b99e03 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/OutputConfig.java
@@ -16,17 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+package org.apache.ambari.logfeeder.conf.output;
/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
+ * Output config marker interface
*/
-public class CloudStorageFactory {
+public interface OutputConfig {
+
+ /**
+ * Holds output destination for a storage, it can act as a base output directory or a bucket
+ * @return output directory or output bucket
+ */
+ String getOutputBasePath();
- public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
- return new HDFSOutput();
- }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
new file mode 100644
index 0000000..d447838
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/RolloverConfig.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class RolloverConfig {
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_MIN,
+ description = "Rollover cloud log files after an interval (minutes)",
+ examples = {"1"},
+ defaultValue = "60",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_MIN + ":60}")
+ private int rolloverThresholdTimeMins;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE,
+ description = "Rollover cloud log files after the log file size reach this limit",
+ examples = {"1024KB"},
+ defaultValue = "80MB",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_THRESHOLD_TIME_SIZE + ":80MB}")
+ private String rolloverSize;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_USE_GZIP,
+ description = "Use GZip on archived logs.",
+ examples = {"false"},
+ defaultValue = "true",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_USE_GZIP + ":true}")
+ private boolean useGzip;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_IMMEDIATE_FLUSH,
+ description = "Immediately flush cloud logs (to active location).",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_IMMEDIATE_FLUSH + ":false}")
+ private boolean immediateFlush; // falls back to false when the property is unset; documented default above must match
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_ON_SHUTDOWN,
+ description = "Rollover log files on shutdown",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_ON_SHUTDOWN + ":false}")
+ private boolean rolloverOnShutdown; // falls back to false when the property is unset; documented default above must match
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_ROLLOVER_ON_STARTUP,
+ description = "Rollover log files on startup",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.CLOUD_ROLLOVER_ON_STARTUP + ":false}")
+ private boolean rolloverOnStartup; // falls back to false when the property is unset; documented default above must match
+
+ public int getRolloverThresholdTimeMins() {
+ return rolloverThresholdTimeMins;
+ }
+
+ public void setRolloverThresholdTimeMins(int rolloverThresholdTimeMins) {
+ this.rolloverThresholdTimeMins = rolloverThresholdTimeMins;
+ }
+
+ public String getRolloverSize() {
+ return rolloverSize;
+ }
+
+ public void setRolloverSize(String rolloverSize) {
+ this.rolloverSize = rolloverSize;
+ }
+
+ public boolean isUseGzip() {
+ return useGzip;
+ }
+
+ public void setUseGzip(boolean useGzip) {
+ this.useGzip = useGzip;
+ }
+
+ public boolean isImmediateFlush() {
+ return immediateFlush;
+ }
+
+ public void setImmediateFlush(boolean immediateFlush) {
+ this.immediateFlush = immediateFlush;
+ }
+
+ public boolean isRolloverOnShutdown() {
+ return rolloverOnShutdown;
+ }
+
+ public void setRolloverOnShutdown(boolean rolloverOnShutdown) {
+ this.rolloverOnShutdown = rolloverOnShutdown;
+ }
+
+ public boolean isRolloverOnStartup() {
+ return rolloverOnStartup;
+ }
+
+ public void setRolloverOnStartup(boolean rolloverOnStartup) {
+ this.rolloverOnStartup = rolloverOnStartup;
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
new file mode 100644
index 0000000..2da2698
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/output/S3OutputConfig.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.conf.output;
+
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+
+import javax.inject.Inject;
+
+@Configuration
+public class S3OutputConfig implements CloudStorageOutputConfig {
+
+ private final BucketConfig bucketConfig;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_ENDPOINT,
+ description = "Amazon S3 endpoint.",
+ examples = {"https://s3.amazonaws.com"},
+ defaultValue = LogFeederConstants.S3_ENDPOINT_DEFAULT,
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_ENDPOINT + ":" + LogFeederConstants.S3_ENDPOINT_DEFAULT +"}")
+ private String endpoint;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_SECRET_KEY,
+ description = "Amazon S3 secret key.",
+ examples = {"MySecretKey"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_SECRET_KEY + ":}")
+ private String secretKey;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_ACCESS_KEY,
+ description = "Amazon S3 access key.",
+ examples = {"MyAccessKey"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_ACCESS_KEY + ":}")
+ private String accessKey; // access key (not the secret key); empty string when the property is unset
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_SECRET_KEY_FILE,
+ description = "Amazon S3 secret key file (that contains only the key).",
+ examples = {"/my/path/secret_key"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_SECRET_KEY_FILE + ":}")
+ private String secretKeyFileLocation; // must bind the *_FILE property documented above, not the raw access key; empty when unset
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_ACCESS_KEY_FILE,
+ description = "Amazon S3 access key file (that contains only the key).",
+ examples = {"/my/path/access_key"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_ACCESS_KEY_FILE + ":}")
+ private String accessKeyFileLocation; // path to the file holding the access key; empty string when the property is unset
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_USE_FILE,
+ description = "Enable to get Amazon S3 secret/access keys from files.",
+ examples = {"true"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_USE_FILE + ":false}")
+ private boolean useFileSecrets;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_USE_HADOOP_CREDENTIAL_PROVIDER,
+ description = "Enable to get Amazon S3 secret/access keys from Hadoop credential store API.",
+ examples = {"true"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_USE_HADOOP_CREDENTIAL_PROVIDER + ":false}")
+ private boolean useHadoopCredentialStorage;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_HADOOP_CREDENTIAL_SECRET_REF,
+ description = "Amazon S3 secret access key reference in Hadoop credential store..",
+ examples = {"logfeeder.s3.secret.key"},
+ defaultValue = "logfeeder.s3.secret.key",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_HADOOP_CREDENTIAL_SECRET_REF + ":logfeeder.s3.secret.key}")
+ private String secretKeyHadoopCredentialReference;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_HADOOP_CREDENTIAL_ACCESS_REF,
+ description = "Amazon S3 access key reference in Hadoop credential store..",
+ examples = {"logfeeder.s3.access.key"},
+ defaultValue = "logfeeder.s3.access.key",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_HADOOP_CREDENTIAL_ACCESS_REF + ":logfeeder.s3.access.key}")
+ private String accessKeyHadoopCredentialReference;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_REGION,
+ description = "Amazon S3 region.",
+ examples = {"us-east-2"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_REGION + ":us-east-2}")
+ private String region;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_BUCKET,
+ description = "Amazon S3 bucket.",
+ examples = {"logs"},
+ defaultValue = "logfeeder",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_BUCKET + ":logfeeder}")
+ private String bucket;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_OBJECT_ACL,
+ description = "Amazon S3 ACLs for new objects.",
+ examples = {"public-read"},
+ defaultValue = "private",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_OBJECT_ACL + ":private}")
+ private String objectAcl; // must equal a CannedAccessControlList#toString() value, see calculateAcls
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_PATH_STYLE_ACCESS,
+ description = "Enable S3 path style access will disable the default virtual hosting behaviour (DNS).",
+ defaultValue = "false",
+ examples = {"true"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_PATH_STYLE_ACCESS + ":false}")
+ private boolean pathStyleAccess;
+
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.S3_MULTIOBJECT_DELETE_ENABLE,
+ description = "When enabled, multiple single-object delete requests are replaced by\n" +
+ " a single 'delete multiple objects'-request, reducing the number of requests.",
+ defaultValue = "true",
+ examples = {"false"},
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${"+ LogFeederConstants.S3_MULTIOBJECT_DELETE_ENABLE + ":true}")
+ private boolean multiobjectDeleteEnable;
+
+ @Inject
+ public S3OutputConfig(BucketConfig bucketConfig) {
+ this.bucketConfig = bucketConfig;
+ }
+
+ @Override
+ public String getOutputBasePath() {
+ return this.bucket;
+ }
+
+ public String getBucket() {
+ return this.bucket;
+ }
+
+ public void setBucket(String bucketName) {
+ this.bucket = bucketName;
+ }
+
+ public String getEndpoint() {
+ return endpoint;
+ }
+
+ public void setEndpoint(String endpoint) {
+ this.endpoint = endpoint;
+ }
+
+ public String getRegion() {
+ return region;
+ }
+
+ public void setRegion(String region) {
+ this.region = region;
+ }
+
+ public String getSecretKey() {
+ return secretKey;
+ }
+
+ public void setSecretKey(String secretKey) {
+ this.secretKey = secretKey;
+ }
+
+ public String getAccessKey() {
+ return accessKey;
+ }
+
+ public void setAccessKey(String accessKey) {
+ this.accessKey = accessKey;
+ }
+
+ public String getSecretKeyFileLocation() {
+ return secretKeyFileLocation;
+ }
+
+ public void setSecretKeyFileLocation(String secretKeyFileLocation) {
+ this.secretKeyFileLocation = secretKeyFileLocation;
+ }
+
+ public String getAccessKeyFileLocation() {
+ return accessKeyFileLocation;
+ }
+
+ public void setAccessKeyFileLocation(String accessKeyFileLocation) {
+ this.accessKeyFileLocation = accessKeyFileLocation;
+ }
+
+ public boolean isUseFileSecrets() {
+ return useFileSecrets;
+ }
+
+ public void setUseFileSecrets(boolean useFileSecrets) {
+ this.useFileSecrets = useFileSecrets;
+ }
+
+ public boolean isUseHadoopCredentialStorage() {
+ return useHadoopCredentialStorage;
+ }
+
+ public void setUseHadoopCredentialStorage(boolean useHadoopCredentialStorage) {
+ this.useHadoopCredentialStorage = useHadoopCredentialStorage;
+ }
+
+ public String getSecretKeyHadoopCredentialReference() {
+ return secretKeyHadoopCredentialReference;
+ }
+
+ public void setSecretKeyHadoopCredentialReference(String secretKeyHadoopCredentialReference) {
+ this.secretKeyHadoopCredentialReference = secretKeyHadoopCredentialReference;
+ }
+
+ public String getAccessKeyHadoopCredentialReference() {
+ return accessKeyHadoopCredentialReference;
+ }
+
+ @Override
+ public String getAccessKeyProperty() {
+ return LogFeederConstants.S3_ACCESS_KEY;
+ }
+
+ @Override
+ public String getSecretKeyProperty() {
+ return LogFeederConstants.S3_SECRET_KEY;
+ }
+
+ @Override
+ public String getAccessKeyEnvVariable() {
+ return "AWS_ACCESS_KEY_ID";
+ }
+
+ @Override
+ public String getSecretKeyEnvVariable() {
+ return "AWS_SECRET_ACCESS_KEY";
+ }
+
+ @Override
+ public String getDescription() {
+ return "AWS S3";
+ }
+
+ public void setAccessKeyHadoopCredentialReference(String accessKeyHadoopCredentialReference) {
+ this.accessKeyHadoopCredentialReference = accessKeyHadoopCredentialReference;
+ }
+
+ public String getObjectAcl() {
+ return objectAcl;
+ }
+
+ public void setObjectAcl(String objectAcl) {
+ this.objectAcl = objectAcl;
+ }
+
+ public boolean isPathStyleAccess() {
+ return pathStyleAccess;
+ }
+
+ public void setPathStyleAccess(boolean pathStyleAccess) {
+ this.pathStyleAccess = pathStyleAccess;
+ }
+
+ public boolean isMultiobjectDeleteEnable() {
+ return multiobjectDeleteEnable;
+ }
+
+ public void setMultiobjectDeleteEnable(boolean multiobjectDeleteEnable) {
+ this.multiobjectDeleteEnable = multiobjectDeleteEnable;
+ }
+
+ public BucketConfig getBucketConfig() {
+ return bucketConfig;
+ }
+
+ public CannedAccessControlList calculateAcls(String aclStr) {
+ for (CannedAccessControlList val : CannedAccessControlList.values()) {
+ if (val.toString().equals(aclStr)) {
+ return val;
+ }
+ }
+ throw new IllegalArgumentException(String.format("'%s' is not a valid ACL setting", aclStr));
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
similarity index 69%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
index 871ae93..abc462e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/credential/PlainTextSecretStore.java
@@ -16,17 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+package org.apache.ambari.logfeeder.credential;
/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
+ * Plain text secret store is not recommended for production
*/
-public class CloudStorageFactory {
+public class PlainTextSecretStore implements SecretStore {
+
+ private final String secret;
+
+ public PlainTextSecretStore(String secret) {
+ this.secret = secret;
+ }
- public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
- return new HDFSOutput();
+ @Override
+ public char[] getSecret() {
+ return secret.toCharArray();
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
deleted file mode 100644
index 5191045..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.input;
-
-import org.apache.ambari.logfeeder.output.S3OutputConfiguration;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
-import org.apache.commons.lang.ArrayUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-import java.io.BufferedReader;
-import java.io.File;
-
-/**
- * Download file from S3 then start processing it.
- */
-public class InputS3File extends InputFile {
-
- private static final Logger logger = LogManager.getLogger(InputS3File.class);
-
- private boolean ready = false;
-
- @Override
- public boolean isReady() {
- if (!ready) {
- // Let's try to check whether the file is available
- setLogFiles(getActualFiles(getLogPath()));
- if (!ArrayUtils.isEmpty(getLogFiles())) {
- if (isTail() && getLogFiles().length > 1) {
- logger.warn("Found multiple files (" + getLogFiles().length + ") for the file filter " + getFilePath() +
- ". Will use only the first one. Using " + getLogFiles()[0].getAbsolutePath());
- }
- logger.info("File filter " + getFilePath() + " expanded to " + getLogFiles()[0].getAbsolutePath());
- setReady(true);
- } else {
- logger.debug(getLogPath() + " file doesn't exist. Ignoring for now");
- }
- }
- return ready;
- }
-
- @Override
- public void setReady(boolean ready) {
- this.ready = ready;
- }
-
- private File[] getActualFiles(String searchPath) {
- // TODO search file on s3
- return new File[] { new File(searchPath) };
- }
-
- @Override
- public void start() throws Exception {
- if (ArrayUtils.isEmpty(getLogFiles())) {
- return;
- }
- for (int i = getLogFiles().length - 1; i >= 0; i--) {
- File file = getLogFiles()[i];
- if (i == 0 || !isTail()) {
- try {
- processFile(file, i == 0);
- if (isClosed() || isDrain()) {
- logger.info("isClosed or isDrain. Now breaking loop.");
- break;
- }
- } catch (Throwable t) {
- logger.error("Error processing file=" + file.getAbsolutePath(), t);
- }
- }
- }
- close();
- }
-
- @Override
- public BufferedReader openLogFile(File logPathFile) throws Exception {
- String s3AccessKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey();
- String s3SecretKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey();
- String s3Endpoint = ((InputS3FileDescriptor)getInputDescriptor()).getS3Endpoint();
- if (s3Endpoint == null) {
- s3Endpoint = S3OutputConfiguration.DEFAULT_S3_ENDPOINT;
- }
- BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3Endpoint, s3AccessKey, s3SecretKey);
- Object fileKey = getFileKey(logPathFile);
- setFileKey(fileKey);
- String base64FileKey = Base64.byteArrayToBase64(getFileKey().toString().getBytes());
- setBase64FileKey(base64FileKey);
- logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
- return br;
- }
-
- private Object getFileKey(File logFile) {
- return logFile.getPath();
- }
-
- @Override
- public void close() {
- super.close();
- setClosed(true);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
deleted file mode 100644
index 93a2643..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ /dev/null
@@ -1,269 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputFileMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
-import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.ambari.logfeeder.plugin.output.Output;
-import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * An {@link Output} that records logs to HDFS.
- *
- * The events are spooled on the local file system and uploaded in batches asynchronously.
- */
-public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
- private static final Logger logger = LogManager.getLogger(OutputHDFSFile.class);
-
- private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
-
- private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
-
- private final Object readyMonitor = new Object();
-
- private Thread hdfsCopyThread = null;
-
- private String filenamePrefix = "service-logs-";
- private long rolloverThresholdTimeMillis;
-
- private String hdfsOutDir = null;
- private String hdfsHost = null;
- private String hdfsPort = null;
- private FileSystem fileSystem = null;
-
- private LogSpooler logSpooler;
-
- private LogFeederProps logFeederProps;
-
- @Override
- public void init(LogFeederProps logFeederProps) throws Exception {
- this.logFeederProps = logFeederProps;
- hdfsOutDir = getStringValue("hdfs_out_dir");
- hdfsHost = getStringValue("hdfs_host");
- hdfsPort = getStringValue("hdfs_port");
- long rolloverThresholdTimeSeconds = getLongValue("rollover_sec", DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS);
- rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
- filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
- if (StringUtils.isEmpty(hdfsOutDir)) {
- logger.error("HDFS config property <hdfs_out_dir> is not set in config file.");
- return;
- }
- if (StringUtils.isEmpty(hdfsHost)) {
- logger.error("HDFS config property <hdfs_host> is not set in config file.");
- return;
- }
- if (StringUtils.isEmpty(hdfsPort)) {
- logger.error("HDFS config property <hdfs_port> is not set in config file.");
- return;
- }
- HashMap<String, String> contextParam = buildContextParam();
- hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
- logger.info("hdfs Output dir=" + hdfsOutDir);
- String localFileDir = logFeederProps.getTmpDir() + "hdfs/service/";
- logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
- this.startHDFSCopyThread();
- }
-
- @Override
- public void close() {
- logger.info("Closing file." + getShortDescription());
- logSpooler.rollover();
- this.stopHDFSCopyThread();
- shouldCloseOutput();
- }
-
- @Override
- public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
- if (block != null) {
- logSpooler.add(block);
- statMetric.value++;
- }
- }
-
- @Override
- public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
- String block = LogFeederUtil.getGson().toJson(jsonObj);
- write(block, inputMarker);
- }
-
-
- @Override
- public String getShortDescription() {
- return "output:destination=hdfs,hdfsOutDir=" + hdfsOutDir;
- }
-
- private void startHDFSCopyThread() {
-
- hdfsCopyThread = new Thread("hdfsCopyThread") {
- @Override
- public void run() {
- try {
- while (true) {
- Iterator<File> localFileIterator = localReadyFiles.iterator();
- while (localFileIterator.hasNext()) {
- File localFile = localFileIterator.next();
- fileSystem = LogFeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
- if (fileSystem != null && localFile.exists()) {
- String destFilePath = hdfsOutDir + "/" + localFile.getName();
- String localPath = localFile.getAbsolutePath();
- boolean overWrite = true;
- boolean delSrc = true;
- boolean isCopied = LogFeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
- overWrite, delSrc);
- if (isCopied) {
- logger.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
- } else {
- // TODO Need to write retry logic, in next release we can handle it
- logger.error("Hdfs file copy failed for hdfspath :" + destFilePath + " and localpath :" + localPath);
- }
- }
- localFileIterator.remove();
- }
- try {
- // wait till new file comes in reayList
- synchronized (readyMonitor) {
- if (localReadyFiles.isEmpty()) {
- readyMonitor.wait();
- }
- }
- } catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage(),e);
- }
- }
- } catch (Exception e) {
- logger.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
- }
- }
- };
- hdfsCopyThread.setDaemon(true);
- hdfsCopyThread.start();
- }
-
- private void stopHDFSCopyThread() {
- if (hdfsCopyThread != null) {
- logger.info("waiting till copy all local files to hdfs.......");
- while (!localReadyFiles.isEmpty()) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage(), e);
- }
- logger.debug("still waiting to copy all local files to hdfs.......");
- }
- logger.info("calling interrupt method for hdfsCopyThread to stop it.");
- try {
- hdfsCopyThread.interrupt();
- } catch (SecurityException exception) {
- logger.error(" Current thread : '" + Thread.currentThread().getName() +
- "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
- }
- LogFeederHDFSUtil.closeFileSystem(fileSystem);
- }
- }
-
- private HashMap<String, String> buildContextParam() {
- HashMap<String, String> contextParam = new HashMap<String, String>();
- contextParam.put("host", LogFeederUtil.hostName);
- return contextParam;
- }
-
- private void addFileInReadyList(File localFile) {
- localReadyFiles.add(localFile);
- try {
- synchronized (readyMonitor) {
- readyMonitor.notifyAll();
- }
- } catch (Exception e) {
- logger.error(e.getLocalizedMessage(),e);
- }
- }
-
- @Override
- public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
- throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
- }
-
- /**
- * Add the rollover file to a daemon thread for uploading to HDFS
- * @param rolloverFile the file to be uploaded to HDFS
- */
- @Override
- public void handleRollover(File rolloverFile) {
- addFileInReadyList(rolloverFile);
- }
-
- /**
- * Determines whether it is time to handleRollover the current spool file.
- *
- * The file will handleRollover if the time since creation of the file is more than
- * the timeout specified in rollover_sec configuration.
- * @param currentSpoolerContext {@link LogSpoolerContext} that holds state of active Spool file
- * @return true if time since creation is greater than value specified in rollover_sec,
- * false otherwise.
- */
- @Override
- public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
- long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
- boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
- if (shouldRollover) {
- logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
- " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
- }
- return shouldRollover;
- }
-
- @Override
- public String getOutputType() {
- throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
- }
-
- @Override
- public Long getPendingCount() {
- return 0L;
- }
-
- @Override
- public String getWriteBytesMetricName() {
- return "output.hdfs.write_bytes";
- }
-
- @Override
- public String getStatMetricName() {
- return "output.hdfs.write_logs";
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
deleted file mode 100644
index a2f6b08..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputFile;
-import org.apache.ambari.logfeeder.input.InputFileMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
-import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.plugin.filter.Filter;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigGson;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputConfigImpl;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.ambari.logsearch.config.json.model.inputconfig.impl.InputS3FileDescriptorImpl;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Write log file into s3 bucket.
- *
- * This class supports two modes of upload:
- * <ul>
- * <li>A one time upload of files matching a pattern</li>
- * <li>A batch mode, asynchronous, periodic upload of files</li>
- * </ul>
- */
-public class OutputS3File extends OutputFile implements RolloverCondition, RolloverHandler {
- private static final Logger logger = LogManager.getLogger(OutputS3File.class);
-
- public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
-
- private LogSpooler logSpooler;
- private S3OutputConfiguration s3OutputConfiguration;
- private S3Uploader s3Uploader;
- private LogFeederProps logFeederProps;
-
- @Override
- public void init(LogFeederProps logFeederProps) throws Exception {
- this.logFeederProps = logFeederProps;
- s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
- }
-
- private static boolean uploadedGlobalConfig = false;
-
- /**
- * Copy local log files and corresponding config to S3 bucket one time.
- * @param inputFile The file to be copied
- * @param inputMarker Contains information about the configuration to be uploaded.
- */
- @Override
- public void copyFile(File inputFile, InputMarker inputMarker) {
- String type = inputMarker.getInput().getInputDescriptor().getType();
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
- String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.getInput().getInputDescriptor().getType());
-
- uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
- }
-
- private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
- String resolvedPath) {
-
- ArrayList<FilterDescriptor> filters = new ArrayList<>();
- addFilters(filters, inputMarker.getInput().getFirstFilter());
- InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.getInput().getInputDescriptor();
- InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
- InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
- String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
- LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
- inputS3FileDescriptor.setPath(s3CompletePath);
-
- ArrayList<InputDescriptorImpl> inputConfigList = new ArrayList<>();
- inputConfigList.add(inputS3FileDescriptor);
- // set source s3_file
- // remove global config from input config
- removeS3GlobalConfig(inputS3FileDescriptor);
- // write config into s3 file
- InputConfigImpl inputConfig = new InputConfigImpl();
- inputConfig.setInput(inputConfigList);
-
- writeConfigToS3(inputConfig, getComponentConfigFileName(type), s3OutputConfiguration);
- // write global config
- writeGlobalConfig(s3OutputConfiguration);
- }
-
- private void addFilters(ArrayList<FilterDescriptor> filters, Filter filter) {
- if (filter != null) {
- FilterDescriptor filterDescriptorOriginal = filter.getFilterDescriptor();
- FilterDescriptor filterDescriptor = InputConfigGson.gson.fromJson(
- InputConfigGson.gson.toJson(filterDescriptorOriginal), filterDescriptorOriginal.getClass());
- filters.add(filterDescriptor);
- if (filter.getNextFilter() != null) {
- addFilters(filters, filter.getNextFilter());
- }
- }
- }
-
- private void writeConfigToS3(Object config, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
- String configJson = InputConfigGson.gson.toJson(config);
-
- String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
- s3OutputConfiguration.getCluster());
-
- S3Util.writeDataIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
- s3OutputConfiguration.getS3Endpoint(), s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
- }
-
- private String getComponentConfigFileName(String componentName) {
- return "input.config-" + componentName + ".json";
- }
-
- private void removeS3GlobalConfig(InputS3FileDescriptorImpl inputS3FileDescriptor) {
- inputS3FileDescriptor.setSource(null);
- inputS3FileDescriptor.setCopyFile(null);
- inputS3FileDescriptor.setProcessFile(null);
- inputS3FileDescriptor.setTail(null);
- inputS3FileDescriptor.getAddFields().remove("ip");
- inputS3FileDescriptor.getAddFields().remove("host");
- inputS3FileDescriptor.getAddFields().remove("bundle_id");
- }
-
- /**
- * write global config in s3 file Invoke only once
- */
- @SuppressWarnings("unchecked")
- private synchronized void writeGlobalConfig(S3OutputConfiguration s3OutputConfiguration) {
- if (!uploadedGlobalConfig) {
- Map<String, Object> globalConfig = new HashMap<>();
- //updating global config before write to s3
- globalConfig.put("source", "s3_file");
- globalConfig.put("copy_file", false);
- globalConfig.put("process_file", true);
- globalConfig.put("tail", false);
- Map<String, Object> addFields = (Map<String, Object>) globalConfig.get("add_fields");
- if (addFields == null) {
- addFields = new HashMap<>();
- }
- addFields.put("ip", LogFeederUtil.ipAddress);
- addFields.put("host", LogFeederUtil.hostName);
- // add bundle id same as cluster if its not there
- String bundle_id = (String) addFields.get("bundle_id");
- if (bundle_id == null || bundle_id.isEmpty()) {
- String cluster = (String) addFields.get("cluster");
- if (cluster != null && !cluster.isEmpty()) {
- addFields.put("bundle_id", bundle_id);
- }
- }
- globalConfig.put("add_fields", addFields);
- Map<String, Object> config = new HashMap<String, Object>();
- config.put("global", globalConfig);
- writeConfigToS3(config, GLOBAL_CONFIG_S3_PATH_SUFFIX, s3OutputConfiguration);
- uploadedGlobalConfig = true;
- }
- }
-
- /**
- * Write a log line to local file, to upload to S3 bucket asynchronously.
- *
- * This method uses a {@link LogSpooler} to spool the log lines to a local file.
-
- * @param block The log event to upload
- * @param inputMarker Contains information about the log file feeding the lines.
- */
- @Override
- public void write(String block, InputFileMarker inputMarker) {
- createLogSpoolerIfRequired(inputMarker);
- logSpooler.add(block);
- }
-
- @Override
- public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
- String block = LogFeederUtil.getGson().toJson(jsonObj);
- write(block, inputMarker);
- }
-
- private void createLogSpoolerIfRequired(InputFileMarker inputMarker) {
- if (logSpooler == null) {
- if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
- InputFile input = (InputFile) inputMarker.getInput();
- logSpooler = createSpooler(input.getFilePath());
- s3Uploader = createUploader(input.getInputDescriptor().getType());
- } else {
- logger.error("Cannot write from non local file...");
- }
- }
- }
-
- @VisibleForTesting
- protected S3Uploader createUploader(String logType) {
- S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType);
- uploader.startUploaderThread();
- return uploader;
- }
-
- @VisibleForTesting
- protected LogSpooler createSpooler(String filePath) {
- String spoolDirectory = logFeederProps.getTmpDir() + "/s3/service";
- logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
- return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
- s3OutputConfiguration.getRolloverTimeThresholdSecs());
- }
-
- /**
- * Check whether the locally spooled file should be rolled over, based on file size.
- *
- * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
- * for rollover.
- * @return true if sufficient size has been reached based on {@link S3OutputConfiguration#getRolloverSizeThresholdBytes()},
- * false otherwise
- */
- @Override
- public boolean shouldRollover(LogSpoolerContext currentSpoolerContext) {
- File spoolFile = currentSpoolerContext.getActiveSpoolFile();
- long currentSize = spoolFile.length();
- boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
- if (result) {
- logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
- s3OutputConfiguration.getRolloverSizeThresholdBytes()));
- }
- return result;
- }
-
- /**
- * Stops dependent objects that consume resources.
- */
- @Override
- public void close() {
- if (s3Uploader != null) {
- s3Uploader.stopUploaderThread();
- }
- if (logSpooler != null) {
- logSpooler.close();
- }
- }
-
- /**
- * Adds the locally spooled file to the {@link S3Uploader} to be uploaded asynchronously.
- *
- * @param rolloverFile The file that has been rolled over.
- */
- @Override
- public void handleRollover(File rolloverFile) {
- s3Uploader.addFileForUpload(rolloverFile.getAbsolutePath());
- }
-
- @Override
- public String getShortDescription() {
- return "output:destination=s3,bucket=" + s3OutputConfiguration.getS3BucketName();
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
deleted file mode 100644
index 8c544cf..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-
-import java.util.HashMap;
-
-/**
- * A utility class that resolves variables like hostname, IP address and cluster name in S3 paths.
- */
-public class S3LogPathResolver {
-
- /**
- * Construct a full S3 path by resolving variables in the path name including hostname, IP address
- * and cluster name
- * @param baseKeyPrefix The prefix which can contain the variables.
- * @param keySuffix The suffix appended to the prefix after variable expansion
- * @param cluster The name of the cluster
- * @return full S3 path.
- */
- public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
- HashMap<String, String> contextParam = buildContextParam(cluster);
- String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
- return resolvedKeyPrefix + LogFeederConstants.S3_PATH_SEPARATOR + keySuffix;
- }
-
- private HashMap<String, String> buildContextParam(String cluster) {
- HashMap<String, String> contextParam = new HashMap<>();
- contextParam.put("host", LogFeederUtil.hostName);
- contextParam.put("ip", LogFeederUtil.ipAddress);
- contextParam.put("cluster", cluster);
- return contextParam;
- }
-
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
deleted file mode 100644
index 293f011..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Holds all configuration relevant for S3 upload.
- */
-public class S3OutputConfiguration {
-
- public static final String SPOOL_DIR_KEY = "spool_dir";
- public static final String ROLLOVER_SIZE_THRESHOLD_BYTES_KEY = "rollover_size_threshold_bytes";
- public static final Long DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES = 10 * 1024 * 1024L;
- public static final String ROLLOVER_TIME_THRESHOLD_SECS_KEY = "rollover_time_threshold_secs";
- public static final Long DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS = 3600L;
- public static final String S3_BUCKET_NAME_KEY = "s3_bucket";
- public static final String S3_LOG_DIR_KEY = "s3_log_dir";
- public static final String S3_ACCESS_KEY = "s3_access_key";
- public static final String S3_SECRET_KEY = "s3_secret_key";
- public static final String S3_ENDPOINT = "s3_endpoint";
- public static final String DEFAULT_S3_ENDPOINT = "https://s3.amazonaws.com";
- public static final String COMPRESSION_ALGO_KEY = "compression_algo";
- public static final String ADDITIONAL_FIELDS_KEY = "add_fields";
- public static final String CLUSTER_KEY = "cluster";
-
- private Map<String, Object> configs;
-
- S3OutputConfiguration(Map<String, Object> configs) {
- this.configs = configs;
- }
-
- public String getS3BucketName() {
- return (String) configs.get(S3_BUCKET_NAME_KEY);
- }
-
- public String getS3Endpoint() {
- return (String) configs.getOrDefault(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
- }
-
- public String getS3Path() {
- return (String) configs.get(S3_LOG_DIR_KEY);
- }
-
- public String getS3AccessKey() {
- return (String) configs.get(S3_ACCESS_KEY);
- }
-
- public String getS3SecretKey() {
- return (String) configs.get(S3_SECRET_KEY);
- }
-
- public String getCompressionAlgo() {
- return (String) configs.get(COMPRESSION_ALGO_KEY);
- }
-
- public Long getRolloverSizeThresholdBytes() {
- return (Long) configs.get(ROLLOVER_SIZE_THRESHOLD_BYTES_KEY);
- }
-
- public Long getRolloverTimeThresholdSecs() {
- return (Long) configs.get(ROLLOVER_TIME_THRESHOLD_SECS_KEY);
- }
-
- @SuppressWarnings("unchecked")
- public String getCluster() {
- return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
- }
-
- public static S3OutputConfiguration fromConfigBlock(ConfigItem configItem) {
- Map<String, Object> configs = new HashMap<>();
- String[] stringValuedKeysToCopy = new String[] {
- SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
- S3_ACCESS_KEY, S3_SECRET_KEY, COMPRESSION_ALGO_KEY, S3_ENDPOINT
- };
-
- for (String key : stringValuedKeysToCopy) {
- String value = configItem.getStringValue(key);
- if (value != null) {
- configs.put(key, value);
- }
- }
-
- String[] longValuedKeysToCopy = new String[] {
- ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, ROLLOVER_TIME_THRESHOLD_SECS_KEY
- };
-
- Long[] defaultValuesForLongValuedKeys = new Long[] {
- DEFAULT_ROLLOVER_SIZE_THRESHOLD_BYTES, DEFAULT_ROLLOVER_TIME_THRESHOLD_SECS
- };
-
- for (int i = 0; i < longValuedKeysToCopy.length; i++) {
- configs.put(longValuedKeysToCopy[i], configItem.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
- }
-
- configs.put(ADDITIONAL_FIELDS_KEY, configItem.getNVList(ADDITIONAL_FIELDS_KEY));
-
- configs.putIfAbsent(S3_ENDPOINT, DEFAULT_S3_ENDPOINT);
-
- return new S3OutputConfiguration(configs);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
deleted file mode 100644
index 4273cc7..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.CompressionUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-import java.util.Date;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A class that handles the uploading of files to S3.
- *
- * This class can be used to upload a file one time, or start a daemon thread that can
- * be used to upload files added to a queue one after the other. When used to upload
- * files via a queue, one instance of this class is created for each file handled in
- * {@link org.apache.ambari.logfeeder.input.InputFile}.
- */
-public class S3Uploader implements Runnable {
- private static final Logger logger = LogManager.getLogger(S3Uploader.class);
-
- public static final String POISON_PILL = "POISON-PILL";
-
- private final S3OutputConfiguration s3OutputConfiguration;
- private final boolean deleteOnEnd;
- private final String logType;
- private final BlockingQueue<String> fileContextsToUpload;
- private final AtomicBoolean stopRunningThread = new AtomicBoolean(false);
-
- public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean deleteOnEnd, String logType) {
- this.s3OutputConfiguration = s3OutputConfiguration;
- this.deleteOnEnd = deleteOnEnd;
- this.logType = logType;
- this.fileContextsToUpload = new LinkedBlockingQueue<>();
- }
-
- /**
- * Starts a thread that can be used to upload files from a queue.
- *
- * Add files to be uploaded using the method {@link #addFileForUpload(String)}.
- * If this thread is started, it must be stopped using the method {@link #stopUploaderThread()}.
- */
- void startUploaderThread() {
- Thread s3UploaderThread = new Thread(this, "s3-uploader-thread-"+logType);
- s3UploaderThread.setDaemon(true);
- s3UploaderThread.start();
- }
-
- /**
- * Stops the thread used to upload files from a queue.
- *
- * This method must be called to cleanly free up resources, typically on shutdown of the process.
- * Note that this method does not drain any remaining files, and instead stops the thread
- * as soon as any file being currently uploaded is complete.
- */
- void stopUploaderThread() {
- stopRunningThread.set(true);
- boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
- if (!offerStatus) {
- logger.warn("Could not add poison pill to interrupt uploader thread.");
- }
- }
-
- /**
- * Add a file to a queue to upload asynchronously.
- * @param fileToUpload Full path to the local file which must be uploaded.
- */
- void addFileForUpload(String fileToUpload) {
- boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
- if (!offerStatus) {
- logger.error("Could not add file " + fileToUpload + " for upload.");
- }
- }
-
- @Override
- public void run() {
- while (!stopRunningThread.get()) {
- try {
- String fileNameToUpload = fileContextsToUpload.take();
- if (POISON_PILL.equals(fileNameToUpload)) {
- logger.warn("Found poison pill while waiting for files to upload, exiting");
- return;
- }
- uploadFile(new File(fileNameToUpload), logType);
- } catch (InterruptedException e) {
- logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
- return;
- }
- }
- }
-
- /**
- * Upload the given file to S3.
- *
- * The file which should be available locally, is first compressed using the compression
- * method specified by {@link S3OutputConfiguration#getCompressionAlgo()}. This compressed
- * file is what is uploaded to S3.
- * @param fileToUpload the file to upload
- * @param logType the name of the log which is used in the S3 path constructed.
- * @return
- */
- String uploadFile(File fileToUpload, String logType) {
- String bucketName = s3OutputConfiguration.getS3BucketName();
- String s3AccessKey = s3OutputConfiguration.getS3AccessKey();
- String s3SecretKey = s3OutputConfiguration.getS3SecretKey();
- String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
- String s3Endpoint = s3OutputConfiguration.getS3Endpoint();
-
- String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
- String s3Path = new S3LogPathResolver().getResolvedPath(
- s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + logType, keySuffix,
- s3OutputConfiguration.getCluster());
- logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
- File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
-
- logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
- writeFileIntoS3File(sourceFile, bucketName, s3Path, s3Endpoint, s3AccessKey, s3SecretKey);
-
- // delete local compressed file
- sourceFile.delete();
- if (deleteOnEnd) {
- logger.info("Deleting input file as required");
- if (!fileToUpload.delete()) {
- logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
- }
- }
- return s3Path;
- }
-
- @VisibleForTesting
- protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path,
- String s3Endpoint, String s3AccessKey, String s3SecretKey) {
- S3Util.writeFileIntoS3File(sourceFile.getAbsolutePath(), bucketName, s3Path, s3Endpoint, s3AccessKey, s3SecretKey);
- }
-
- @VisibleForTesting
- protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() +
- "." + compressionAlgo);
- outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, compressionAlgo);
- return outputFile;
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
new file mode 100644
index 0000000..6cf0c7c
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.appender.rolling.CompositeTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.OnStartupTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.SizeBasedTriggeringPolicy;
+import org.apache.logging.log4j.core.config.AppenderRef;
+import org.apache.logging.log4j.core.config.Configuration;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.layout.PatternLayout;
+
+import java.io.File;
+import java.nio.file.Paths;
+
+/**
+ * Creates custom log4j2 loggers that are used to ship inputs into specific local files.
+ * Every logger writes into an "active" file and rolls it over into an "archived" folder that
+ * will be monitored by a thread which uploads the archived files to a cloud storage.
+ */
+public class CloudStorageLoggerFactory {
+
+ private static final String ACTIVE_FOLDER = "active";
+ private static final String ARCHIVED_FOLDER = "archived";
+ // 'HH' (0-23) is intentional: 'hh' is the 12-hour clock, and without an am/pm marker the archive
+ // names created in the morning and in the afternoon would be ambiguous and would sort incorrectly.
+ private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
+ private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
+
+ /**
+ * Registers a rolling file appender and a non-additive logger for the given input in the logger context.
+ *
+ * @param input input whose log type (without the cloud prefix) names the files and whose thread name names the logger
+ * @param loggerContext log4j2 context whose configuration is extended with the new appender and logger
+ * @param logFeederProps source of the tmp dir, the cloud storage destination and the rollover settings
+ * @return the logger registered under the name of the input thread
+ */
+ public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
+ String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
+ String uniqueThreadName = input.getThread().getName();
+ Configuration config = loggerContext.getConfiguration();
+ String destination = logFeederProps.getCloudStorageDestination().getText();
+ String activeLogDir = Paths.get(logFeederProps.getTmpDir(), ACTIVE_FOLDER, type).toFile().getAbsolutePath();
+ String archiveLogDir = Paths.get(logFeederProps.getTmpDir(), destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
+
+ boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
+ String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
+ String fileName = String.join(File.separator, activeLogDir, type + ".log");
+ String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern);
+ PatternLayout layout = PatternLayout.newBuilder().withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
+
+ SizeBasedTriggeringPolicy sizePolicy = SizeBasedTriggeringPolicy.createPolicy(
+ logFeederProps.getRolloverConfig().getRolloverSize());
+ CustomTimeBasedTriggeringPolicy timePolicy = CustomTimeBasedTriggeringPolicy
+ .createPolicy(String.valueOf(logFeederProps.getRolloverConfig().getRolloverThresholdTimeMins()));
+
+ final CompositeTriggeringPolicy compositeTriggeringPolicy = logFeederProps.getRolloverConfig().isRolloverOnStartup()
+ ? CompositeTriggeringPolicy.createPolicy(sizePolicy, timePolicy, OnStartupTriggeringPolicy.createPolicy(1))
+ : CompositeTriggeringPolicy.createPolicy(sizePolicy, timePolicy);
+
+ boolean immediateFlush = logFeederProps.getRolloverConfig().isImmediateFlush();
+ RollingFileAppender appender = RollingFileAppender.newBuilder()
+ .withFileName(fileName)
+ .withFilePattern(filePattern)
+ .withLayout(layout)
+ .withName(type)
+ .withPolicy(compositeTriggeringPolicy)
+ .withImmediateFlush(immediateFlush)
+ .build();
+
+ appender.start();
+ config.addAppender(appender);
+
+ AppenderRef[] refs = new AppenderRef[] {AppenderRef.createAppenderRef(type, null, null)};
+ // additivity is off (first argument): events must reach only the appender attached below, not the parent loggers
+ LoggerConfig loggerConfig = LoggerConfig
+ .createLogger(false, Level.ALL, uniqueThreadName, "true", refs, null, config, null);
+ loggerConfig.addAppender(appender, null, null);
+ config.addLogger(uniqueThreadName, loggerConfig);
+ loggerContext.updateLoggers();
+ return loggerContext.getLogger(uniqueThreadName);
+ }
+
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index 561b141..fbbffe6 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -19,12 +19,139 @@
package org.apache.ambari.logfeeder.output.cloud;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.RolloverConfig;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClientFactory;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.Appender;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.appender.RollingFileAppender;
+import org.apache.logging.log4j.core.config.Configuration;
+
+import java.io.File;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
- * Class to handle common operations for cloud storage outputs
- * TODO !!!
+ * Output class for cloud outputs.
+ * Holds loggers - those will ship logs into specific folders, those files can be rolled out to an archive folder,
+ * from there an upload client will be able to ship the log archives to a cloud storage
*/
-public abstract class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageOutput.class);
+
+  /** Per-input loggers, keyed by the name of the thread that runs the input. */
+  private final Map<String, Logger> cloudOutputLoggers = new ConcurrentHashMap<>();
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final LoggerContext loggerContext;
+  private final CloudStorageUploader uploader;
+  private final RolloverConfig rolloverConfig;
+
+  /**
+   * Creates the upload client for the configured destination and a daemon uploader thread
+   * (not started here, see {@link #init(LogFeederProps)}).
+   *
+   * @param logFeederProps global configuration holder
+   */
+  public CloudStorageOutput(LogFeederProps logFeederProps) {
+    this.uploadClient = UploadClientFactory.createUploadClient(logFeederProps);
+    this.logFeederProps = logFeederProps;
+    this.rolloverConfig = logFeederProps.getRolloverConfig();
+    this.loggerContext = (LoggerContext) LogManager.getContext(false);
+    final String uploaderThreadName =
+      String.format("%s-uploader", logFeederProps.getCloudStorageDestination().getText());
+    this.uploader = new CloudStorageUploader(uploaderThreadName, uploadClient, logFeederProps);
+    this.uploader.setDaemon(true);
+  }
+
+  /**
+   * Initializes the upload client and starts the uploader thread.
+   */
+  @Override
+  public void init(LogFeederProps logFeederProperties) throws Exception {
+    logger.info("Initialize cloud output.");
+    uploadClient.init(logFeederProperties);
+    uploader.start();
+  }
+
+  @Override
+  public String getOutputType() {
+    return "cloud";
+  }
+
+  /**
+   * Not supported by the cloud output.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
+    throw new UnsupportedOperationException("Copy file is not supported yet");
+  }
+
+  /**
+   * Writes one line through the logger that belongs to the input's thread, creating and registering
+   * that logger on first use, then checks the marker in on its input.
+   */
+  @Override
+  public void write(String jsonStr, InputMarker inputMarker) throws Exception {
+    final String uniqueThreadName = inputMarker.getInput().getThread().getName();
+    Logger cloudLogger = cloudOutputLoggers.get(uniqueThreadName);
+    if (cloudLogger == null) {
+      logger.info("New cloud input source found. Register: {}", uniqueThreadName);
+      cloudLogger = CloudStorageLoggerFactory.createLogger(inputMarker.getInput(), loggerContext, logFeederProps);
+      cloudOutputLoggers.put(uniqueThreadName, cloudLogger);
+    }
+    cloudLogger.info(jsonStr);
+    inputMarker.getInput().checkIn(inputMarker);
+  }
+
+  @Override
+  public Long getPendingCount() {
+    return 0L;
+  }
+
+  @Override
+  public String getWriteBytesMetricName() {
+    return "write:cloud";
+  }
+
+  @Override
+  public String getShortDescription() {
+    return "cloud";
+  }
+
+  @Override
+  public String getStatMetricName() {
+    return null;
+  }
+
+  /**
+   * Unregisters the logger of the given input: optionally rolls its files over first
+   * (rollover-on-shutdown), then removes the logger from the log4j2 configuration and from the local registry.
+   *
+   * @param input input whose thread name identifies the logger
+   */
+  void removeWorker(Input input) {
+    final String uniqueName = input.getThread().getName();
+    if (rolloverConfig.isRolloverOnShutdown()) {
+      final Logger registered = cloudOutputLoggers.get(uniqueName);
+      if (registered != null) {
+        rollover(registered);
+      }
+    }
+    logger.info("Remove logger: {}", uniqueName);
+    final Configuration config = loggerContext.getConfiguration();
+    config.removeLogger(uniqueName);
+    loggerContext.updateLoggers();
+    cloudOutputLoggers.remove(uniqueName);
+  }
+
+  /**
+   * Interrupts the uploader thread and, when upload-on-shutdown is enabled, runs one more upload pass
+   * on the calling thread.
+   */
+  void stopUploader() {
+    uploader.interrupt();
+    if (!logFeederProps.isCloudStorageUploadOnShutdown()) {
+      return;
+    }
+    logger.info("Do last upload before shutdown.");
+    uploader.doUpload();
+  }
+
+  /**
+   * Closes the output and then the upload client; a failure while closing the client is only logged.
+   */
+  @Override
+  public void close() {
+    super.close();
+    try {
+      if (uploadClient != null) {
+        uploadClient.close();
+      }
+    } catch (Exception e) {
+      logger.error("Error during closing uploader client", e);
+    }
+  }
+
+  /**
+   * Forces a rollover on every rolling file appender attached to the given logger.
+   */
+  private void rollover(Logger cloudLogger) {
+    final Map<String, Appender> appenders = ((org.apache.logging.log4j.core.Logger) cloudLogger).getAppenders();
+    for (Appender appender : appenders.values()) {
+      if (appender instanceof RollingFileAppender) {
+        ((RollingFileAppender) appender).getManager().rollover();
+      }
+    }
+  }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 4994eb7..16b7e55 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -20,9 +20,11 @@ package org.apache.ambari.logfeeder.output.cloud;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -34,7 +36,6 @@ import java.util.Map;
/**
* Handle output operations for sending cloud inputs to a cloud storage destination
- * TODO !!!
*/
public class CloudStorageOutputManager extends OutputManager {
@@ -49,22 +50,20 @@ public class CloudStorageOutputManager extends OutputManager {
@Override
public void write(Map<String, Object> jsonObj, InputMarker marker) {
- // TODO: make sense to implement this if we will support filters before calling cloud outputs
+ write(LogFeederUtil.getGson().toJson(jsonObj), marker); // serialize the event map to JSON and reuse the String overload
}
@Override
public void write(String line, InputMarker marker) {
- logger.info("Output: {}", line);
try {
storageOutput.write(line, marker);
} catch (Exception e) {
-
+ logger.error("Error during cloud output write.", e); // failure is logged and swallowed; nothing is rethrown to the caller
}
}
@Override
public void copyFile(File file, InputMarker marker) {
-
}
@Override
@@ -80,23 +79,29 @@ public class CloudStorageOutputManager extends OutputManager {
@Override
public void init() throws Exception {
logger.info("Called init with cloud storage output manager.");
- storageOutput = CloudStorageFactory.createCloudStorageOutput(logFeederProps);
+ storageOutput = new CloudStorageOutput(logFeederProps); // one generic cloud output replaces the former factory-created, destination-specific outputs
storageOutput.init(logFeederProps);
add(storageOutput);
}
@Override
public void close() {
-
+ logger.info("Close called for cloud outputs.");
+ storageOutput.stopUploader(); // interrupts the uploader thread and, if upload-on-shutdown is enabled, runs one last upload pass
+ storageOutput.setDrain(true); // NOTE(review): drain semantics are defined in the Output base class - confirm
+ storageOutput.close(); // closes the output and then its upload client
}
@Override
public void logStats() {
-
}
@Override
public void addMetricsContainers(List<MetricData> metricsList) {
+ }
+ @Override
+ public void release(Input input) {
+ storageOutput.removeWorker(input); // unregisters (and, if configured, rolls over) the logger kept for this input's thread
}
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
new file mode 100644
index 0000000..ebb0cef
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.cloud.upload.UploadClient;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.FileUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.File;
+import java.util.Collection;
+
+/**
+ * Periodically checks the folder that contains archived logs and, if it finds any .log or .gz files,
+ * tries to upload them to cloud storage through a (cloud specific) upload client.
+ * <p>
+ * {@link #doUpload()} can be invoked both by this thread's own loop and directly by another thread
+ * (the last upload before shutdown), therefore upload passes are serialized by an internal lock.
+ */
+public class CloudStorageUploader extends Thread {
+
+  private static final Logger logger = LogManager.getLogger(CloudStorageUploader.class);
+
+  private final UploadClient uploadClient;
+  private final LogFeederProps logFeederProps;
+  private final String clusterName;
+  private final String hostName;
+  private final String uploaderType;
+  /** Guards an upload pass so two threads never list and upload the same archived files at once. */
+  private final Object uploadLock = new Object();
+
+  /**
+   * @param name name of the uploader thread
+   * @param uploadClient client that performs the actual upload
+   * @param logFeederProps global configuration holder (tmp dir, cluster name, destination, upload interval)
+   */
+  public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
+    super(name);
+    this.uploadClient = uploadClient;
+    this.logFeederProps = logFeederProps;
+    this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
+    this.clusterName = logFeederProps.getClusterName();
+    this.hostName = LogFeederUtil.hostName;
+  }
+
+  /**
+   * Runs an upload pass, sleeps for the configured interval and repeats until the thread is interrupted.
+   */
+  @Override
+  public void run() {
+    logger.info("Start '{}' uploader", uploaderType);
+    boolean stop = false;
+    do {
+      try {
+        try {
+          doUpload();
+        } catch (Exception e) {
+          logger.error("An error occurred during Uploader operation - " + uploaderType, e);
+        }
+        // long arithmetic: the millisecond value must not be computed in int range
+        Thread.sleep(1000L * logFeederProps.getCloudStorageUploaderIntervalSeconds());
+      } catch (InterruptedException ie) {
+        logger.info("Uploader ({}) thread interrupted", uploaderType);
+        stop = true;
+      }
+    } while (!stop && !Thread.currentThread().isInterrupted());
+  }
+
+  /**
+   * Finds .log and .gz files (recursively) under the archive folder and uploads them to cloud storage
+   * by the uploader client. Any exception is logged and ends the current pass; it is never propagated.
+   * A caller blocks until a pass that is already running on another thread has finished.
+   */
+  void doUpload() {
+    synchronized (uploadLock) {
+      try {
+        final String archiveLogDir = String.join(File.separator, logFeederProps.getTmpDir(), uploaderType, "archived");
+        final File archiveDir = new File(archiveLogDir);
+        if (!archiveDir.exists()) {
+          logger.debug("Directory {} does not exist.", archiveLogDir);
+          return;
+        }
+        final String[] extensions = {"log", "gz"};
+        final Collection<File> filesToUpload = FileUtils.listFiles(archiveDir, extensions, true);
+        if (filesToUpload.isEmpty()) {
+          logger.debug("Not found any files to upload.");
+          return;
+        }
+        // the base path does not depend on the file, look it up once per pass
+        final String basePath = uploadClient.getOutputConfig().getOutputBasePath();
+        for (File file : filesToUpload) {
+          final String outputPath = String.format("%s/%s/%s/%s", clusterName, hostName, file.getParentFile().getName(), file.getName())
+            .replace("//", "/");
+          logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
+          uploadClient.upload(file.getAbsolutePath(), outputPath, basePath);
+        }
+      } catch (Exception e) {
+        logger.error("Exception during cloud upload", e);
+      }
+    }
+  }
+
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java
new file mode 100644
index 0000000..41bb5ba
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CustomTimeBasedTriggeringPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud;
+
+import org.apache.logging.log4j.core.Core;
+import org.apache.logging.log4j.core.LogEvent;
+import org.apache.logging.log4j.core.appender.rolling.AbstractTriggeringPolicy;
+import org.apache.logging.log4j.core.appender.rolling.RollingFileManager;
+import org.apache.logging.log4j.core.config.plugins.Plugin;
+import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
+import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Time based rollover trigger that works with a fixed interval (in minutes) instead of the file date pattern
+ * of the log4j2 configuration.
+ */
+@Plugin(name = "CustomTimeBasedTriggeringPolicy", category = Core.CATEGORY_NAME, printObject = true)
+public final class CustomTimeBasedTriggeringPolicy extends AbstractTriggeringPolicy {
+
+  private final long intervalMin;
+
+  private RollingFileManager manager;
+  private long nextRolloverMillis;
+
+  private CustomTimeBasedTriggeringPolicy(final long intervalMin) {
+    this.intervalMin = intervalMin;
+  }
+
+  /**
+   * @return rollover interval in minutes
+   */
+  public long getIntervalMin() {
+    return intervalMin;
+  }
+
+  /**
+   * Computes the first rollover time from the file time reported by the manager: a file older than the
+   * interval is due immediately, otherwise the rollover is due once the rest of the interval has elapsed.
+   */
+  @Override
+  public void initialize(RollingFileManager manager) {
+    this.manager = manager;
+    final long fileTime = manager.getFileTime();
+    final long now = System.currentTimeMillis();
+    final long elapsedMillis = now - fileTime;
+    final long intervalMillis = TimeUnit.MINUTES.toMillis(intervalMin);
+    this.nextRolloverMillis = elapsedMillis > intervalMillis
+      ? now
+      : now + (intervalMillis - elapsedMillis);
+  }
+
+  /**
+   * Triggers when the event time has reached the next rollover time; an empty file is never rolled over.
+   * On trigger, the next rollover time is moved one interval past the event time.
+   */
+  @Override
+  public boolean isTriggeringEvent(LogEvent event) {
+    if (manager.getFileSize() == 0L) {
+      return false;
+    }
+    final long eventMillis = event.getTimeMillis();
+    if (eventMillis < nextRolloverMillis) {
+      return false;
+    }
+    nextRolloverMillis = eventMillis + TimeUnit.MINUTES.toMillis(intervalMin);
+    return true;
+  }
+
+  /**
+   * Plugin factory.
+   *
+   * @param intervalMins rollover interval in minutes, as a decimal string
+   * @return policy that uses the parsed interval
+   */
+  @PluginFactory
+  public static CustomTimeBasedTriggeringPolicy createPolicy(@PluginAttribute("intervalMins") final String intervalMins) {
+    final long parsedIntervalMins = Long.parseLong(intervalMins);
+    return new CustomTimeBasedTriggeringPolicy(parsedIntervalMins);
+  }
+
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
deleted file mode 100644
index 24edb41..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/HDFSOutput.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.output.cloud;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.File;
-
-/**
- * HDFS cloud storage output (on-prem)
- * TODO !!!
- */
-public class HDFSOutput extends CloudStorageOutput {
-
- private Logger logger = LogManager.getLogger(HDFSOutput.class);
-
- @Override
- public String getOutputType() {
- return null;
- }
-
- @Override
- public void copyFile(File inputFile, InputMarker inputMarker) throws Exception {
- }
-
- @Override
- public void write(String line, InputMarker inputMarker) throws Exception {
- inputMarker.getInput().checkIn(inputMarker);
- }
-
- @Override
- public Long getPendingCount() {
- return null;
- }
-
- @Override
- public String getWriteBytesMetricName() {
- return null;
- }
-
- @Override
- public void init(LogFeederProps logFeederProperties) throws Exception {
- logger.info("Initialize on-prem HDFS output");
- }
-
- @Override
- public String getShortDescription() {
- return null;
- }
-
- @Override
- public String getStatMetricName() {
- return null;
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
new file mode 100644
index 0000000..d4a45de
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/AbstractCloudClient.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.BucketConfig;
+import org.apache.ambari.logfeeder.conf.output.CloudStorageOutputConfig;
+import org.apache.ambari.logfeeder.credential.CompositeSecretStore;
+import org.apache.ambari.logfeeder.credential.EnvSecretStore;
+import org.apache.ambari.logfeeder.credential.FileSecretStore;
+import org.apache.ambari.logfeeder.credential.HadoopCredentialSecretStore;
+import org.apache.ambari.logfeeder.credential.PlainTextSecretStore;
+import org.apache.ambari.logfeeder.credential.PropertySecretStore;
+import org.apache.ambari.logfeeder.credential.SecretStore;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Holds common cloud based client operations
+ */
+abstract class AbstractCloudClient {
+
+  private static final Logger logger = LogManager.getLogger(AbstractCloudClient.class);
+
+  /**
+   * Create a cloud specific bucket if it does not exist
+   * @param bucket name of the bucket
+   */
+  abstract void createBucketIfNeeded(String bucket);
+
+  /**
+   * Try to bootstrap the default bucket, retrying every 5 seconds until it succeeds or the thread is interrupted.
+   * Does nothing when bucket creation on startup is disabled.
+   * @param bucket name of the bucket
+   * @param bucketConfig bucket config holder
+   */
+  void bootstrapBucket(String bucket, BucketConfig bucketConfig) {
+    if (!bucketConfig.isCreateBucketOnStartup()) {
+      return;
+    }
+    boolean ready = false;
+    while (!ready) {
+      try {
+        createBucketIfNeeded(bucket);
+        ready = true;
+      } catch (Exception e) {
+        logger.error("Error during bucket creation - bucket : " + bucket, e);
+      }
+      if (!ready) {
+        // announce the retry on the failure path only, then back off before the next attempt
+        logger.warn("Bucket ('{}') creation failed. Retry ...", bucket);
+        try {
+          Thread.sleep(5000);
+        } catch (InterruptedException e) {
+          logger.error("Error during thread sleep (bucket bootstrap)", e);
+          // keep the interrupt status for the caller and give up retrying
+          Thread.currentThread().interrupt();
+          ready = true;
+        }
+      }
+    }
+  }
+
+  /**
+   * Get secret key pair (access / secret) key
+   *
+   * @param props global configuration holder
+   * @param config cloud based configuration
+   * @return secret key pair
+   * @throws RuntimeException if the secret key or the access key cannot be resolved from any configured store
+   */
+  SecretKeyPair getSecretKeyPair(LogFeederProps props, CloudStorageOutputConfig config) {
+    // file and hadoop credential sources are only considered when they are explicitly enabled
+    String secretFile = config.isUseFileSecrets() ? config.getSecretKeyFileLocation() : null;
+    String secretRef = config.isUseHadoopCredentialStorage() ? config.getSecretKeyHadoopCredentialReference() : null;
+    CompositeSecretStore secretKeyStore = createCompositeSecretStore(props, config.getSecretKey(), config.getSecretKeyProperty(),
+      config.getSecretKeyEnvVariable(), secretFile, secretRef);
+
+    String accessFile = config.isUseFileSecrets() ? config.getAccessKeyFileLocation() : null;
+    String accessRef = config.isUseHadoopCredentialStorage() ? config.getAccessKeyHadoopCredentialReference() : null;
+    CompositeSecretStore accessKeyStore = createCompositeSecretStore(props, config.getAccessKey(), config.getAccessKeyProperty(),
+      config.getAccessKeyEnvVariable(), accessFile, accessRef);
+    if (secretKeyStore.getSecret() == null) {
+      throw new RuntimeException(String.format("No any %s credentials found for secret access key.", config.getDescription()));
+    }
+    if (accessKeyStore.getSecret() == null) {
+      throw new RuntimeException(String.format("No any %s credentials found for access key id.", config.getDescription()));
+    }
+    return new SecretKeyPair(accessKeyStore.getSecret(), secretKeyStore.getSecret());
+  }
+
+  /**
+   * Common operation to create secret stores for cloud secrets. Stores are registered in this order:
+   * plain text, hadoop credential, file, env variable, java property; all but the property store are
+   * skipped when their input is blank.
+   * NOTE(review): presumably the composite store consults them in registration order - confirm in CompositeSecretStore.
+   * @param props global property holder
+   * @param plainTextSecret secret given directly in the configuration
+   * @param property java property to check for secret
+   * @param env env variable to check for secret
+   * @param file file to check for secret
+   * @param credentialRef credential provider reference to check for secret
+   * @return composite secret store that contains multiple way to get a secret
+   */
+  CompositeSecretStore createCompositeSecretStore(LogFeederProps props, String plainTextSecret, String property, String env,
+                                                  String file, String credentialRef) {
+    List<SecretStore> secretStores = new ArrayList<>();
+    if (StringUtils.isNotBlank(plainTextSecret)) {
+      secretStores.add(new PlainTextSecretStore(plainTextSecret));
+    }
+    if (StringUtils.isNotBlank(credentialRef)) {
+      secretStores.add(new HadoopCredentialSecretStore(credentialRef, props.getLogFeederSecurityConfig().getCredentialStoreProviderPath()));
+    }
+    if (StringUtils.isNotBlank(file)) {
+      secretStores.add(new FileSecretStore(file));
+    }
+    if (StringUtils.isNotBlank(env)) {
+      secretStores.add(new EnvSecretStore(env));
+    }
+    secretStores.add(new PropertySecretStore(property));
+    return new CompositeSecretStore(secretStores.toArray(new SecretStore[0]));
+  }
+
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
new file mode 100644
index 0000000..dd5a225
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSS3UploadClient.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
+import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * S3 upload client that goes through the Hadoop FileSystem API (s3a).
+ */
+public class HDFSS3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(HDFSS3UploadClient.class);
+
+  private final S3OutputConfig s3OutputConfig;
+
+  private FileSystem fs;
+
+  public HDFSS3UploadClient(S3OutputConfig s3OutputConfig) {
+    this.s3OutputConfig = s3OutputConfig;
+  }
+
+  /**
+   * No bucket is created by this client, only a warning is logged (with the configured bucket name).
+   */
+  @Override
+  void createBucketIfNeeded(String bucket) {
+    logger.warn("HDFS based S3 client won't bootstrap default bucket ('{}')", s3OutputConfig.getBucket());
+  }
+
+  /**
+   * Resolves the access / secret key pair and builds the s3a file system from the S3 output configuration.
+   */
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    final SecretKeyPair credentials = getSecretKeyPair(logFeederProps, s3OutputConfig);
+    // TODO: load configuration from file
+    final Configuration s3aConf = LogFeederHDFSUtil.buildHdfsConfiguration(s3OutputConfig.getBucket(), "s3a");
+    // credentials
+    s3aConf.set("fs.s3a.aws.credentials.provider", SimpleAWSCredentialsProvider.NAME);
+    s3aConf.set("fs.s3a.access.key", new String(credentials.getAccessKey()));
+    s3aConf.set("fs.s3a.secret.key", new String(credentials.getSecretKey()));
+    // endpoint and request behaviour
+    s3aConf.set("fs.s3a.endpoint", s3OutputConfig.getEndpoint());
+    s3aConf.set("fs.s3a.path.style.access", String.valueOf(s3OutputConfig.isPathStyleAccess()));
+    s3aConf.set("fs.s3a.multiobjectdelete.enable", String.valueOf(s3OutputConfig.isMultiobjectDeleteEnable()));
+    this.fs = LogFeederHDFSUtil.buildFileSystem(s3aConf);
+  }
+
+  /**
+   * Copies the local source file to the target path on the s3a file system.
+   * NOTE(review): basePath is not used here, unlike in HDFSUploadClient which prefixes the target with it - confirm this is intended.
+   */
+  @Override
+  public void upload(String source, String target, String basePath) throws Exception {
+    LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, null);
+  }
+
+  @Override
+  public S3OutputConfig getOutputConfig() {
+    return s3OutputConfig;
+  }
+
+  /**
+   * Closes the file system, ignoring any failure.
+   */
+  @Override
+  public void close() throws IOException {
+    IOUtils.closeQuietly(fs);
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
new file mode 100644
index 0000000..0c4dce5
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.HdfsOutputConfig;
+import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Upload client for an on-prem HDFS destination.
+ */
+public class HDFSUploadClient implements UploadClient<HdfsOutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(HDFSUploadClient.class);
+
+  private final HdfsOutputConfig hdfsOutputConfig;
+  private final FsPermission fsPermission;
+  private FileSystem fs;
+
+  /**
+   * @param hdfsOutputConfig HDFS output configuration; the file permissions for uploaded files are read from it
+   */
+  public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig) {
+    this.hdfsOutputConfig = hdfsOutputConfig;
+    this.fsPermission = new FsPermission(hdfsOutputConfig.getHdfsFilePermissions());
+  }
+
+  /**
+   * Sets the HADOOP_USER_NAME system property when an HDFS user is configured, then builds the file system
+   * from the configured host and port.
+   */
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    final String hdfsUser = hdfsOutputConfig.getHdfsUser();
+    if (StringUtils.isNotBlank(hdfsUser)) {
+      System.setProperty("HADOOP_USER_NAME", hdfsUser);
+    }
+    final String host = hdfsOutputConfig.getHdfsHost();
+    final String port = String.valueOf(hdfsOutputConfig.getHdfsPort());
+    this.fs = LogFeederHDFSUtil.buildFileSystem(host, port);
+    // NOTE(review): the secure flag is read through logFeederProps, not through the injected hdfsOutputConfig,
+    // and kerberos is set on the configuration of the already built file system - confirm both are intended.
+    if (logFeederProps.getHdfsOutputConfig().isSecure()) {
+      fs.getConf().set("hadoop.security.authentication", "kerberos");
+    }
+  }
+
+  /**
+   * Copies the local source file to {@code basePath/target} (duplicated slashes collapsed) with the configured permissions.
+   */
+  @Override
+  public void upload(String source, String target, String basePath) throws Exception {
+    final String destination = (basePath + "/" + target).replace("//", "/");
+    LogFeederHDFSUtil.copyFromLocal(source, destination, fs, true, true, fsPermission);
+  }
+
+  @Override
+  public HdfsOutputConfig getOutputConfig() {
+    return hdfsOutputConfig;
+  }
+
+  /**
+   * Closes the file system, ignoring any failure.
+   */
+  @Override
+  public void close() {
+    IOUtils.closeQuietly(fs);
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
new file mode 100644
index 0000000..819a001
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/S3UploadClient.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.S3OutputConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.tools.ant.util.FileUtils;
+
+import java.io.File;
+
+/**
+ * S3 specific upload client
+ */
+public class S3UploadClient extends AbstractCloudClient implements UploadClient<S3OutputConfig> {
+
+  private static final Logger logger = LogManager.getLogger(S3UploadClient.class);
+
+  private final S3OutputConfig s3OutputConfig;
+  private final CannedAccessControlList acl;
+  private AmazonS3 s3Client;
+
+  /**
+   * Create the client from the S3 output settings.
+   * @param s3OutputConfig S3 output configuration; the canned ACL applied to uploaded objects is
+   *                       derived from its object ACL setting once, at construction time
+   */
+  public S3UploadClient(S3OutputConfig s3OutputConfig) {
+    this.s3OutputConfig = s3OutputConfig;
+    this.acl = s3OutputConfig.calculateAcls(s3OutputConfig.getObjectAcl());
+  }
+
+  /**
+   * Build the S3 client with static credentials, the configured endpoint/region and path style
+   * access flag, then bootstrap the output bucket.
+   * @param logFeederProps global properties holder, used to obtain the access/secret key pair
+   */
+  @Override
+  public void init(LogFeederProps logFeederProps) {
+    // getSecretKeyPair / bootstrapBucket are presumably provided by AbstractCloudClient - not visible here
+    SecretKeyPair keyPair = getSecretKeyPair(logFeederProps, s3OutputConfig);
+    AWSCredentials awsCredentials = new BasicAWSCredentials(new String(keyPair.getAccessKey()), new String(keyPair.getSecretKey()));
+    AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(awsCredentials);
+    AwsClientBuilder.EndpointConfiguration endpointConf = new AwsClientBuilder.EndpointConfiguration(s3OutputConfig.getEndpoint(), s3OutputConfig.getRegion());
+    s3Client = AmazonS3ClientBuilder.standard()
+      .withCredentials(credentialsProvider)
+      .withEndpointConfiguration(endpointConf)
+      .withPathStyleAccessEnabled(s3OutputConfig.isPathStyleAccess())
+      .build();
+    bootstrapBucket(s3OutputConfig.getOutputBasePath(), s3OutputConfig.getBucketConfig());
+  }
+
+  /**
+   * Put a local file into the bucket, set the configured canned ACL on the new object and
+   * remove the local file afterwards.
+   * @param source local file that will be uploaded
+   * @param target object key in the bucket
+   * @param bucket name of the destination bucket
+   * @throws Exception error during upload; the local file is kept in that case
+   */
+  @Override
+  public void upload(String source, String target, String bucket) throws Exception {
+    File fileToUpload = new File(source);
+    logger.info("Starting S3 upload {} -> bucket: {}, key: {}", source, bucket, target);
+    // one File instance is used for both the upload and the local cleanup
+    s3Client.putObject(bucket, target, fileToUpload);
+    s3Client.setObjectAcl(bucket, target, acl);
+    FileUtils.delete(fileToUpload);
+  }
+
+  @Override
+  public S3OutputConfig getOutputConfig() {
+    return this.s3OutputConfig;
+  }
+
+  /**
+   * Release the resources held by the S3 client; a no-op when {@link #init(LogFeederProps)}
+   * was never called.
+   */
+  @Override
+  public void close() {
+    if (s3Client != null) {
+      s3Client.shutdown();
+    }
+  }
+
+  /**
+   * Create the bucket unless it already exists.
+   * @param bucket name of the bucket to check/create
+   */
+  @Override
+  void createBucketIfNeeded(String bucket) {
+    if (!s3Client.doesBucketExistV2(bucket)) {
+      s3Client.createBucket(bucket);
+    }
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
similarity index 67%
rename from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
rename to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
index 871ae93..5321b81 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/SecretKeyPair.java
@@ -16,17 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.output.cloud;
+package org.apache.ambari.logfeeder.output.cloud.upload;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
+final class SecretKeyPair {
+ private final char[] accessKey;
+ private final char[] secretKey;
-/**
- * Class for creating the right cloud storage outputs based on global Log Feeder configurations
- * TODO !!!
- */
-public class CloudStorageFactory {
+ SecretKeyPair(char[] accessKey, char[] secretKey) {
+ this.accessKey = accessKey;
+ this.secretKey = secretKey;
+ }
+
+ final char[] getAccessKey() {
+ return accessKey;
+ }
- public static CloudStorageOutput createCloudStorageOutput(LogFeederProps properties) {
- return new HDFSOutput();
+ final char[] getSecretKey() {
+ return secretKey;
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
new file mode 100644
index 0000000..d8adf68
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClient.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.conf.output.OutputConfig;
+
+import java.io.Closeable;
+
+/**
+ * Client that is responsible to upload files to cloud storage implementation.
+ * @param <CONF_TYPE> specific cloud configuration type
+ */
+public interface UploadClient<CONF_TYPE extends OutputConfig> extends Closeable {
+
+  /**
+   * Initialize the client. Has to be called before the first {@link #upload(String, String, String)} call.
+   * @param logFeederProps global properties holder
+   */
+  void init(LogFeederProps logFeederProps);
+
+  /**
+   * Upload a source file to the cloud storage location.
+   * @param source local file that will be uploaded
+   * @param target file key / output path on the cloud storage
+   * @param basePath acts as the base directory, or as the bucket name for bucket based storages
+   * @throws Exception error during upload
+   */
+  void upload(String source, String target, String basePath) throws Exception;
+
+  /**
+   * Obtain the cloud specific output configuration.
+   * @return output configuration holder
+   */
+  CONF_TYPE getOutputConfig();
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
new file mode 100644
index 0000000..b86ec6d
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/UploadClientFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output.cloud.upload;
+
+import org.apache.ambari.logfeeder.conf.CloudStorageDestination;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Factory class to create cloud specific data uploader client based on global Log Feeder settings.
+ */
+public class UploadClientFactory {
+
+  private static final Logger logger = LogManager.getLogger(UploadClientFactory.class);
+
+  /**
+   * Pick and create the uploader client that matches the globally configured cloud storage destination.
+   * HDFS gets the HDFS client; S3 gets either the HDFS based S3 client or the native S3 client,
+   * depending on the "use cloud HDFS client" setting.
+   * @param logFeederProps global settings
+   * @return created cloud specific uploader client
+   * @throws IllegalArgumentException if the destination is neither HDFS nor S3
+   */
+  public static UploadClient createUploadClient(LogFeederProps logFeederProps) {
+    final CloudStorageDestination destination = logFeederProps.getCloudStorageDestination();
+    logger.info("Creating upload client for storage: {}", destination);
+    if (CloudStorageDestination.HDFS.equals(destination)) {
+      return new HDFSUploadClient(logFeederProps.getHdfsOutputConfig());
+    }
+    // anything other than S3 (including a missing destination) is rejected at this point
+    if (!CloudStorageDestination.S3.equals(destination)) {
+      throw new IllegalArgumentException(String.format("No cloud storage type is selected as destination: %s", destination));
+    }
+    return logFeederProps.isUseCloudHdfsClient()
+      ? new HDFSS3UploadClient(logFeederProps.getS3OutputConfig())
+      : new S3UploadClient(logFeederProps.getS3OutputConfig());
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
deleted file mode 100644
index 82a3f1b..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.util.DateUtil;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Date;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * A class that manages local storage of log events before they are uploaded to the output destinations.
- *
- * This class should be used by any {@link org.apache.ambari.logfeeder.plugin.output.Output}s that wish to upload log files to an
- * output destination on a periodic batched basis. Log events should be added to an instance
- * of this class to be stored locally. This class determines when to
- * rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface
- * {@link RolloverHandler} to trigger the handling of the rolled over file.
- */
-public class LogSpooler {
-
- private static final Logger logger = LogManager.getLogger(LogSpooler.class);
-
- private static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
- private static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
-
- private String spoolDirectory;
- private String sourceFileNamePrefix;
- private RolloverCondition rolloverCondition;
- private RolloverHandler rolloverHandler;
- private PrintWriter currentSpoolBufferedWriter;
- private File currentSpoolFile;
- private LogSpoolerContext currentSpoolerContext;
- private Timer rolloverTimer;
- private AtomicBoolean rolloverInProgress = new AtomicBoolean(false);
-
- /**
- * Create an instance of the LogSpooler.
- * @param spoolDirectory The directory under which spooler files are created.
- * Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
- * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
- * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
- * determine when to rollover.
- * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
- * there should be a rollover.
- */
- public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
- RolloverHandler rolloverHandler) {
- this(spoolDirectory, sourceFileNamePrefix, rolloverCondition, rolloverHandler,
- TIME_BASED_ROLLOVER_DISABLED_THRESHOLD);
- }
-
- /**
- * Create an instance of the LogSpooler.
- * @param spoolDirectory The directory under which spooler files are created.
- * Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
- * @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
- * @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
- * determine when to rollover.
- * @param rolloverHandler An object of type {@link RolloverHandler} that will be called when
- * there should be a rollover.
- * @param rolloverTimeThresholdSecs Setting a non-zero value enables time based rollover of
- * spool files. Sending a 0 value disables this functionality.
- */
- public LogSpooler(String spoolDirectory, String sourceFileNamePrefix, RolloverCondition rolloverCondition,
- RolloverHandler rolloverHandler, long rolloverTimeThresholdSecs) {
- this.spoolDirectory = spoolDirectory;
- this.sourceFileNamePrefix = sourceFileNamePrefix;
- this.rolloverCondition = rolloverCondition;
- this.rolloverHandler = rolloverHandler;
- if (rolloverTimeThresholdSecs != TIME_BASED_ROLLOVER_DISABLED_THRESHOLD) {
- rolloverTimer = new Timer("log-spooler-timer-" + sourceFileNamePrefix, true);
- rolloverTimer.scheduleAtFixedRate(new LogSpoolerRolloverTimerTask(),
- rolloverTimeThresholdSecs*1000, rolloverTimeThresholdSecs*1000);
- }
- initializeSpoolState();
- }
-
- private void initializeSpoolDirectory() {
- File spoolDir = new File(spoolDirectory);
- if (!spoolDir.exists()) {
- logger.info("Creating spool directory: " + spoolDir);
- boolean result = spoolDir.mkdirs();
- if (!result) {
- throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
- }
- }
- }
-
- private void initializeSpoolState() {
- initializeSpoolDirectory();
- currentSpoolFile = initializeSpoolFile();
- try {
- currentSpoolBufferedWriter = initializeSpoolWriter(currentSpoolFile);
- } catch (IOException e) {
- throw new LogSpoolerException("Could not create buffered writer for spool file: " + currentSpoolFile
- + ", error message: " + e.getLocalizedMessage(), e);
- }
- currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
- logger.info("Initialized spool file at path: " + currentSpoolFile);
- }
-
- @VisibleForTesting
- protected File initializeSpoolFile() {
- return new File(spoolDirectory, getCurrentFileName());
- }
-
- @VisibleForTesting
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return new PrintWriter(new BufferedWriter(new FileWriter(spoolFile)));
- }
-
- /**
- * Add an event for spooling.
- *
- * This method adds the event to the current spool file's buffer. On completion, it
- * calls the {@link RolloverCondition#shouldRollover(LogSpoolerContext)} method to determine if
- * it is ready to rollover the file.
- * @param logEvent The log event to spool.
- */
- public synchronized void add(String logEvent) {
- currentSpoolBufferedWriter.println(logEvent);
- currentSpoolerContext.logEventSpooled();
- if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
- logger.info("Trying to rollover based on rollover condition");
- tryRollover();
- }
- }
-
- /**
- * Trigger a rollover of the current spool file.
- *
- * This method manages the rollover of the spool file, and then invokes the
- * {@link RolloverHandler#handleRollover(File)} to handle what should be done with the
- * rolled over file.
- */
- public void rollover() {
- logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
- currentSpoolBufferedWriter.flush();
- if (currentSpoolFile.length()==0) {
- logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
- } else {
- currentSpoolBufferedWriter.close();
- rolloverHandler.handleRollover(currentSpoolFile);
- logger.info("Invoked rollover handler with file: " + currentSpoolFile);
- initializeSpoolState();
- }
- boolean status = rolloverInProgress.compareAndSet(true, false);
- if (!status) {
- logger.error("Should have reset rollover flag!!");
- }
- }
-
- private synchronized void tryRollover() {
- if (rolloverInProgress.compareAndSet(false, true)) {
- rollover();
- } else {
- logger.warn("Ignoring rollover call as rollover already in progress for file " +
- currentSpoolFile);
- }
- }
-
- private String getCurrentFileName() {
- Date currentDate = new Date();
- String dateStr = DateUtil.dateToString(currentDate, fileDateFormat);
- return sourceFileNamePrefix + dateStr;
- }
-
- /**
- * Cancel's any time based rollover task, if started.
- */
- public void close() {
- if (rolloverTimer != null) {
- rolloverTimer.cancel();
- }
- }
-
- private class LogSpoolerRolloverTimerTask extends TimerTask {
- @Override
- public void run() {
- logger.info("Trying rollover based on time");
- tryRollover();
- }
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
deleted file mode 100644
index 616300f..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import java.io.File;
-import java.util.Date;
-
-/**
- * A class that holds the state of an spool file.
- *
- * The state in this class can be used by a {@link RolloverCondition} to determine
- * if an active spool file should be rolled over.
- */
-public class LogSpoolerContext {
-
- private File activeSpoolFile;
- private long numEventsSpooled;
- private Date activeLogCreationTime;
-
- /**
- * Create a new LogSpoolerContext
- * @param activeSpoolFile the spool file for which to hold state
- */
- public LogSpoolerContext(File activeSpoolFile) {
- this.activeSpoolFile = activeSpoolFile;
- this.numEventsSpooled = 0;
- this.activeLogCreationTime = new Date();
- }
-
- /**
- * Increment number of spooled events by one.
- */
- public void logEventSpooled() {
- numEventsSpooled++;
- }
-
- public File getActiveSpoolFile() {
- return activeSpoolFile;
- }
-
- public long getNumEventsSpooled() {
- return numEventsSpooled;
- }
-
- public Date getActiveLogCreationTime() {
- return activeLogCreationTime;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- LogSpoolerContext that = (LogSpoolerContext) o;
-
- if (numEventsSpooled != that.numEventsSpooled) return false;
- if (!activeSpoolFile.equals(that.activeSpoolFile)) return false;
- return activeLogCreationTime.equals(that.activeLogCreationTime);
-
- }
-
- @Override
- public int hashCode() {
- int result = activeSpoolFile.hashCode();
- result = 31 * result + (int) (numEventsSpooled ^ (numEventsSpooled >>> 32));
- result = 31 * result + activeLogCreationTime.hashCode();
- return result;
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
deleted file mode 100644
index 14bb139..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-public class LogSpoolerException extends RuntimeException {
- public LogSpoolerException(String message, Exception cause) {
- super(message, cause);
- }
-
- public LogSpoolerException(String message) {
- super(message);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
deleted file mode 100644
index 48ace11..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-/**
- * An interface that is used to determine whether a rollover of a locally spooled log file should be triggered.
- */
-public interface RolloverCondition {
-
- /**
- * Check if the active spool file should be rolled over.
- *
- * If this returns true, the {@link LogSpooler} will initiate activities related
- * to rollover of the file
- * @param currentSpoolerContext {@link LogSpoolerContext} that holds state about the file being checked
- * for rollover.
- * @return true if active spool file should be rolled over, false otherwise
- */
- boolean shouldRollover(LogSpoolerContext currentSpoolerContext);
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
deleted file mode 100644
index 2ec2708..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import java.io.File;
-
-/**
- * An interface that is used to trigger the handling of a rolled over file.
- *
- * Implementations of this interface will typically upload the rolled over file to
- * a target destination, like HDFS.
- */
-public interface RolloverHandler {
- /**
- * Handle a rolled over file.
- *
- * This method is called inline from the {@link LogSpooler#rollover()} method.
- * Hence implementations should either complete the handling fast, or do so
- * asynchronously. The cleanup of the file is left to implementors, but should
- * typically be done once the upload the file to the target destination is complete.
- * @param rolloverFile The file that has been rolled over.
- */
- void handleRollover(File rolloverFile);
-}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
index a225a12..61be819 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederHDFSUtil.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -33,36 +34,43 @@ public class LogFeederHDFSUtil {
throw new UnsupportedOperationException();
}
- public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
- boolean delSrc) {
+ public static void copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
+ boolean delSrc, FsPermission fsPermission) throws Exception {
Path src = new Path(sourceFilepath);
Path dst = new Path(destFilePath);
- boolean isCopied = false;
- try {
- logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
- fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
- isCopied = true;
- } catch (Exception e) {
- logger.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e);
+ logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
+ fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
+ if (fsPermission != null) {
+ fileSystem.setPermission(dst, fsPermission);
}
- return isCopied;
}
public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+ return buildFileSystem(hdfsHost, hdfsPort, "hdfs");
+ }
+
+ public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort, String scheme) {
+ Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort, scheme);
+ return buildFileSystem(configuration);
+ }
+
+ public static FileSystem buildFileSystem(Configuration configuration) {
try {
- Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
- FileSystem fs = FileSystem.get(configuration);
- return fs;
+ return FileSystem.get(configuration);
} catch (Exception e) {
- logger.error("Exception is buildFileSystem :", e);
+ logger.error("Exception during buildFileSystem call:", e);
}
return null;
}
- private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
- String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+ public static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort, String scheme) {
+ return buildHdfsConfiguration(String.format("%s:%s", hdfsHost, hdfsPort), scheme);
+ }
+
+ public static Configuration buildHdfsConfiguration(String address, String scheme) {
+ String url = String.format("%s://%s/", scheme, address);
Configuration configuration = new Configuration();
- configuration.set("fs.default.name", url);
+ configuration.set("fs.defaultFS", url);
return configuration;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
deleted file mode 100644
index 1135eea..0000000
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.util;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
-
-import io.minio.MinioClient;
-import io.minio.errors.InvalidEndpointException;
-import io.minio.errors.InvalidPortException;
-import org.apache.ambari.logfeeder.common.LogFeederConstants;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * Utility to connect to s3
- */
-public class S3Util {
- private static final Logger logger = LogManager.getLogger(S3Util.class);
-
- private S3Util() {
- throw new UnsupportedOperationException();
- }
-
- public static MinioClient getS3Client(String endpoint, String accessKey, String secretKey) throws InvalidPortException, InvalidEndpointException {
- return new MinioClient(endpoint, accessKey, secretKey);
- }
-
- public static String getBucketName(String s3Path) {
- String bucketName = null;
- // s3path
- if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
- bucketName = s3PathParts[0];
- }
- return bucketName;
- }
-
- public static String getS3Key(String s3Path) {
- StringBuilder s3Key = new StringBuilder();
- if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
- ArrayList<String> s3PathList = new ArrayList<>(Arrays.asList(s3PathParts));
- s3PathList.remove(0);// remove bucketName
- for (int index = 0; index < s3PathList.size(); index++) {
- if (index > 0) {
- s3Key.append(LogFeederConstants.S3_PATH_SEPARATOR);
- }
- s3Key.append(s3PathList.get(index));
- }
- }
- return s3Key.toString();
- }
-
- /**
- * Get the buffer reader to read s3 file as a stream
- * @param s3Path s3 specific path
- * @param s3Endpoint url of an s3 server
- * @param accessKey s3 access key - pass by an input shipper configuration
- * @param secretKey s3 secret key - pass by an input shipper configuration
- * @return buffered reader object which can be used to read s3 file object
- * @throws Exception error that happens during reading s3 file
- */
- public static BufferedReader getReader(String s3Path, String s3Endpoint, String accessKey, String secretKey) throws Exception {
- // TODO error handling
- // Compression support
- // read header and decide the compression(auto detection)
- // For now hard-code GZIP compression
- String s3Bucket = getBucketName(s3Path);
- String s3Key = getS3Key(s3Path);
- GZIPInputStream objectInputStream = null;
- InputStreamReader inputStreamReader = null;
- BufferedReader bufferedReader = null;
- try {
- MinioClient s3Client = getS3Client(s3Endpoint, accessKey, secretKey);
- s3Client.statObject(s3Bucket, s3Key);
- objectInputStream = new GZIPInputStream(s3Client.getObject(s3Bucket, s3Key));
- inputStreamReader = new InputStreamReader(objectInputStream);
- bufferedReader = new BufferedReader(inputStreamReader);
- return bufferedReader;
- } catch (Exception e) {
- logger.error("Error in creating stream reader for s3 file :" + s3Path, e.getCause());
- throw e;
- } finally {
- try {
- if (inputStreamReader != null) {
- inputStreamReader.close();
- }
- if (bufferedReader != null) {
- bufferedReader.close();
- }
- if (objectInputStream != null) {
- objectInputStream.close();
- }
- } catch (Exception e) {
- // do nothing
- }
- }
- }
-
- public static void writeFileIntoS3File(String filename, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) {
- try {
- MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
- s3Client.putObject(bucketName, s3Key, filename);
- } catch (Exception e) {
- logger.error("Could not write file to s3", e);
- }
- }
-
- public static void writeDataIntoS3File(String data, String bucketName, String s3Key, String endpoint, String accessKey, String secretKey) {
- try (ByteArrayInputStream bai = new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))) {
- MinioClient s3Client = getS3Client(endpoint, accessKey, secretKey);
- s3Client.putObject(bucketName, s3Key, bai, bai.available(), "application/octet-stream");
- } catch (Exception e) {
- logger.error("Could not write data to s3", e);
- }
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/main/resources/alias_config.json b/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
index 229a9b6..163aad3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
+++ b/ambari-logsearch-logfeeder/src/main/resources/alias_config.json
@@ -3,9 +3,6 @@
"file": {
"klass": "org.apache.ambari.logfeeder.input.InputFile"
},
- "s3_file": {
- "klass": "org.apache.ambari.logfeeder.input.InputS3File"
- },
"simulate": {
"klass": "org.apache.ambari.logfeeder.input.InputSimulate"
},
@@ -53,12 +50,6 @@
},
"dev_null": {
"klass": "org.apache.ambari.logfeeder.output.OutputDevNull"
- },
- "s3_file": {
- "klass": "org.apache.ambari.logfeeder.output.OutputS3File"
- },
- "hdfs": {
- "klass": "org.apache.ambari.logfeeder.output.OutputHDFSFile"
}
}
}
\ No newline at end of file
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index 06c95f3..127e1b3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -39,5 +39,26 @@ logfeeder.tmp.dir=${LOGFEEDER_RELATIVE_LOCATION:}target/tmp
#logfeeder.configs.local.enabled=true
#logfeeder.configs.filter.solr.enabled=true
#logfeeder.docker.registry.enabled=true
-logfeeder.cloud.storage.mode=default
+
#logfeeder.cloud.storage.mode=cloud
+logfeeder.cloud.storage.mode=default
+logfeeder.cloud.storage.destination=s3
+logfeeder.cloud.storage.uploader.interval.seconds=1
+logfeeder.cloud.storage.upload.on.shutdown=true
+logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.rollover.threshold.min=1000
+logfeeder.cloud.rollover.threshold.size=1K
+logfeeder.cloud.rollover.immediate.flush=true
+
+logfeeder.hdfs.host=c7401.ambari.apache.org
+logfeeder.hdfs.port=8020
+logfeeder.hdfs.user=hdfs
+logfeeder.hdfs.output.base.dir=/user/hdfs/logfeeder
+
+logfeeder.s3.endpoint=http://localhost:4569
+logfeeder.s3.secret.key=MySecretKey
+logfeeder.s3.access.key=MyAccessKey
+logfeeder.s3.bucket=logfeeder
+logfeeder.s3.object.acl=public-read
+logfeeder.s3.path.style.access=true
+logfeeder.s3.multiobject.delete.enable=false
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
deleted file mode 100644
index 6674be1..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class OutputS3FileTest {
-
- private Map<String, Object> configMap;
-
- @Before
- public void setupConfiguration() {
- configMap = new HashMap<>();
- String[] configKeys = new String[] {
- S3OutputConfiguration.SPOOL_DIR_KEY,
- S3OutputConfiguration.S3_BUCKET_NAME_KEY,
- S3OutputConfiguration.S3_LOG_DIR_KEY,
- S3OutputConfiguration.S3_ACCESS_KEY,
- S3OutputConfiguration.S3_SECRET_KEY,
- S3OutputConfiguration.COMPRESSION_ALGO_KEY,
- S3OutputConfiguration.ADDITIONAL_FIELDS_KEY
- };
- Map<String, String> additionalKeys = new HashMap<>();
- additionalKeys.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
- Object[] configValues = new Object[] {
- "/var/ambari-logsearch/logfeeder",
- "s3_bucket_name",
- "logs",
- "ABCDEFGHIJ1234",
- "amdfbldkfdlf",
- "gz",
- additionalKeys
- };
- for (int i = 0; i < configKeys.length; i++) {
- configMap.put(configKeys[i], configValues[i]);
- }
- }
-
- @Test
- public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
-
- String thresholdSize = Long.toString(15 * 1024 * 1024L);
- LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
- File activeSpoolFile = mock(File.class);
- expect(activeSpoolFile.length()).andReturn(20*1024*1024L);
- expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
- replay(logSpoolerContext, activeSpoolFile);
-
- OutputS3File outputS3File = new OutputS3File();
- configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
- outputS3File.loadConfig(configMap);
- outputS3File.init(new LogFeederProps());
-
- assertTrue(outputS3File.shouldRollover(logSpoolerContext));
- }
-
- @Test
- public void shouldNotRolloverBeforeSufficientSizeIsReached() throws Exception {
- String thresholdSize = Long.toString(15 * 1024 * 1024L);
- LogSpoolerContext logSpoolerContext = mock(LogSpoolerContext.class);
- File activeSpoolFile = mock(File.class);
- expect(activeSpoolFile.length()).andReturn(10*1024*1024L);
- expect(logSpoolerContext.getActiveSpoolFile()).andReturn(activeSpoolFile);
- replay(logSpoolerContext, activeSpoolFile);
-
- OutputS3File outputS3File = new OutputS3File();
- configMap.put(S3OutputConfiguration.ROLLOVER_SIZE_THRESHOLD_BYTES_KEY, thresholdSize);
- outputS3File.loadConfig(configMap);
- outputS3File.init(new LogFeederProps());
-
- assertFalse(outputS3File.shouldRollover(logSpoolerContext));
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
deleted file mode 100644
index d1376c4..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-
-public class S3LogPathResolverTest {
-
- @Test
- public void shouldResolveHostName() {
- String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$host", "filename.log", "cl1");
- assertEquals("my_s3_path/" + LogFeederUtil.hostName + "/filename.log", resolvedPath);
- }
-
- @Test
- public void shouldResolveIpAddress() {
- String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$ip", "filename.log", "cl1");
- assertEquals("my_s3_path/" + LogFeederUtil.ipAddress + "/filename.log", resolvedPath);
- }
-
- @Test
- public void shouldResolveCluster() {
- String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster", "filename.log", "cl1");
- assertEquals("my_s3_path/cl1/filename.log", resolvedPath);
- }
-
- @Test
- public void shouldResolveCombinations() {
- String resolvedPath = new S3LogPathResolver().getResolvedPath("my_s3_path/$cluster/$host", "filename.log", "cl1");
- assertEquals("my_s3_path/cl1/"+ LogFeederUtil.hostName + "/filename.log", resolvedPath);
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
deleted file mode 100644
index e070545..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.easymock.EasyMock.*;
-import static org.junit.Assert.assertEquals;
-
-public class S3UploaderTest {
-
- public static final String TEST_BUCKET = "test_bucket";
- public static final String TEST_PATH = "test_path";
- public static final String GZ = "gz";
- public static final String LOG_TYPE = "hdfs_namenode";
- public static final String ACCESS_KEY_VALUE = "accessKeyValue";
- public static final String SECRET_KEY_VALUE = "secretKeyValue";
-
- @Test
- public void shouldUploadToS3ToRightBucket() {
- File fileToUpload = mock(File.class);
- String fileName = "hdfs_namenode.log.123343493473948";
- expect(fileToUpload.getName()).andReturn(fileName);
- final File compressedFile = mock(File.class);
- Map<String, Object> configs = setupS3Configs();
-
- S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
- expect(compressedFile.getAbsolutePath()).andReturn(TEST_BUCKET + "/" + LOG_TYPE + "/" +fileName);
- expect(compressedFile.delete()).andReturn(true);
- expect(fileToUpload.delete()).andReturn(true);
- replay(fileToUpload, compressedFile);
-
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
- @Override
- protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- return compressedFile;
- }
- @Override
- protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
- }
- };
- String resolvedPath = s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
- assertEquals("test_path/hdfs_namenode/hdfs_namenode.log.123343493473948.gz", resolvedPath);
- }
-
- @Test
- public void shouldCleanupLocalFilesOnSuccessfulUpload() {
- File fileToUpload = mock(File.class);
- String fileName = "hdfs_namenode.log.123343493473948";
- expect(fileToUpload.getName()).andReturn(fileName);
- final File compressedFile = mock(File.class);
- Map<String, Object> configs = setupS3Configs();
-
- S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
- expect(compressedFile.delete()).andReturn(true);
- expect(fileToUpload.delete()).andReturn(true);
- replay(fileToUpload, compressedFile);
-
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
- @Override
- protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- return compressedFile;
- }
-
- @Override
- protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
- }
- };
- s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
- verify(fileToUpload);
- verify(compressedFile);
- }
-
- @Test
- public void shouldNotCleanupUncompressedFileIfNotRequired() {
- File fileToUpload = mock(File.class);
- String fileName = "hdfs_namenode.log.123343493473948";
- expect(fileToUpload.getName()).andReturn(fileName);
- final File compressedFile = mock(File.class);
- Map<String, Object> configs = setupS3Configs();
-
- S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
- expect(compressedFile.delete()).andReturn(true);
- replay(fileToUpload, compressedFile);
-
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, LOG_TYPE) {
- @Override
- protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- return compressedFile;
- }
- @Override
- protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
- }
- };
- s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
-
- verify(fileToUpload);
- verify(compressedFile);
- }
-
- @Test
- public void shouldExpandVariablesInPath() {
- File fileToUpload = mock(File.class);
- String fileName = "hdfs_namenode.log.123343493473948";
- expect(fileToUpload.getName()).andReturn(fileName);
- final File compressedFile = mock(File.class);
- Map<String, Object> configs = setupS3Configs();
- configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, "$cluster/"+TEST_PATH);
-
-
- S3OutputConfiguration s3OutputConfiguration = new S3OutputConfiguration(configs);
- expect(compressedFile.delete()).andReturn(true);
- expect(fileToUpload.delete()).andReturn(true);
- expect(compressedFile.getAbsolutePath()).andReturn(TEST_BUCKET + "/" + LOG_TYPE + "/" +fileName);
- replay(fileToUpload, compressedFile);
-
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, true, LOG_TYPE) {
- @Override
- protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- return compressedFile;
- }
- @Override
- protected void writeFileIntoS3File(File sourceFile, String bucketName, String s3Path, String s3Endpoint, String s3AccessKey, String s3SecretKey) {
- }
- };
- s3Uploader.uploadFile(fileToUpload, LOG_TYPE);
- }
-
- private Map<String, Object> setupS3Configs() {
- Map<String, Object> configs = new HashMap<>();
- configs.put(S3OutputConfiguration.S3_BUCKET_NAME_KEY, TEST_BUCKET);
- configs.put(S3OutputConfiguration.S3_LOG_DIR_KEY, TEST_PATH);
- configs.put(S3OutputConfiguration.S3_ACCESS_KEY, ACCESS_KEY_VALUE);
- configs.put(S3OutputConfiguration.S3_SECRET_KEY, SECRET_KEY_VALUE);
- configs.put(S3OutputConfiguration.COMPRESSION_ALGO_KEY, GZ);
- Map<String, String> nameValueMap = new HashMap<>();
- nameValueMap.put(S3OutputConfiguration.CLUSTER_KEY, "cl1");
- configs.put(S3OutputConfiguration.ADDITIONAL_FIELDS_KEY, nameValueMap);
- return configs;
- }
-}
diff --git a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java b/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
deleted file mode 100644
index 2cfe9ff..0000000
--- a/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerTest.java
+++ /dev/null
@@ -1,374 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output.spool;
-
-import org.easymock.EasyMockRule;
-import org.easymock.LogicalOperator;
-import org.easymock.Mock;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Comparator;
-
-import static org.easymock.EasyMock.*;
-
-public class LogSpoolerTest {
-
- @Rule
- public TemporaryFolder testFolder = new TemporaryFolder();
-
- @Rule
- public EasyMockRule mocks = new EasyMockRule(this);
-
- private String spoolDirectory;
- private static final String SOURCE_FILENAME_PREFIX = "hdfs-namenode.log";
-
- @Mock
- private RolloverCondition rolloverCondition;
-
- @Mock
- private RolloverHandler rolloverHandler;
-
- @Before
- public void setup() {
- spoolDirectory = testFolder.getRoot().getAbsolutePath();
- }
-
- @Test
- public void shouldSpoolEventToFile() {
- final PrintWriter spoolWriter = mock(PrintWriter.class);
- spoolWriter.println("log event");
-
- final File mockFile = setupInputFileExpectations();
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
- andReturn(false);
-
- replay(spoolWriter, rolloverCondition, mockFile);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- return mockFile;
- }
- };
- logSpooler.add("log event");
-
- verify(spoolWriter);
- }
-
- private File setupInputFileExpectations() {
- final File mockFile = mock(File.class);
- expect(mockFile.length()).andReturn(10240L);
- return mockFile;
- }
-
- @Test
- public void shouldIncrementSpooledEventsCount() {
-
- final PrintWriter spoolWriter = mock(PrintWriter.class);
- spoolWriter.println("log event");
-
- final File mockFile = setupInputFileExpectations();
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
- logSpoolerContext.logEventSpooled();
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext, new LogSpoolerEventCountComparator(), LogicalOperator.EQUAL))).
- andReturn(false);
-
- replay(spoolWriter, rolloverCondition, mockFile);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- return mockFile;
- }
- };
- logSpooler.add("log event");
-
- verify(rolloverCondition);
- }
-
- @Test
- public void shouldCloseCurrentSpoolFileOnRollOver() {
- final PrintWriter spoolWriter = mock(PrintWriter.class);
- spoolWriter.println("log event");
- spoolWriter.flush();
- spoolWriter.close();
-
- final File mockFile = setupInputFileExpectations();
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
- andReturn(true);
- rolloverHandler.handleRollover(mockFile);
-
- replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
-
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- return mockFile;
- }
- };
- logSpooler.add("log event");
-
- verify(spoolWriter);
- }
-
- @Test
- public void shouldReinitializeFileOnRollover() {
- final PrintWriter spoolWriter1 = mock(PrintWriter.class);
- final PrintWriter spoolWriter2 = mock(PrintWriter.class);
- spoolWriter1.println("log event1");
- spoolWriter2.println("log event2");
- spoolWriter1.flush();
- spoolWriter1.close();
-
- final File mockFile1 = setupInputFileExpectations();
- final File mockFile2 = setupInputFileExpectations();
-
- LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
- ).andReturn(true);
-
- LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
- ).andReturn(false);
-
- rolloverHandler.handleRollover(mockFile1);
-
- replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
- private boolean wasRolledOver;
-
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- if (!wasRolledOver) {
- wasRolledOver = true;
- return spoolWriter1;
- } else {
- return spoolWriter2;
- }
- }
-
- @Override
- protected File initializeSpoolFile() {
- if (!wasRolledOver) {
- return mockFile1;
- } else {
- return mockFile2;
- }
- }
- };
- logSpooler.add("log event1");
- logSpooler.add("log event2");
-
- verify(spoolWriter1, spoolWriter2, rolloverCondition);
- }
-
- @Test
- public void shouldCallRolloverHandlerOnRollover() {
- final PrintWriter spoolWriter = mock(PrintWriter.class);
- spoolWriter.println("log event");
- spoolWriter.flush();
- spoolWriter.close();
-
- final File mockFile = setupInputFileExpectations();
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
- ).andReturn(true);
- rolloverHandler.handleRollover(mockFile);
-
- replay(spoolWriter, rolloverCondition, rolloverHandler, mockFile);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
-
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- return mockFile;
- }
- };
- logSpooler.add("log event");
-
- verify(rolloverHandler);
- }
-
- // Rollover twice - the second rollover should work if the "rolloverInProgress"
- // flag is being reset correctly. Third file expectations being setup due
- // to auto-initialization.
- @Test
- public void shouldResetRolloverInProgressFlag() {
- final PrintWriter spoolWriter1 = mock(PrintWriter.class);
- final PrintWriter spoolWriter2 = mock(PrintWriter.class);
- final PrintWriter spoolWriter3 = mock(PrintWriter.class);
- spoolWriter1.println("log event1");
- spoolWriter2.println("log event2");
- spoolWriter1.flush();
- spoolWriter1.close();
- spoolWriter2.flush();
- spoolWriter2.close();
-
- final File mockFile1 = setupInputFileExpectations();
- final File mockFile2 = setupInputFileExpectations();
- final File mockFile3 = setupInputFileExpectations();
-
- LogSpoolerContext logSpoolerContext1 = new LogSpoolerContext(mockFile1);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext1, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
- ).andReturn(true);
-
- LogSpoolerContext logSpoolerContext2 = new LogSpoolerContext(mockFile2);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext2, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))
- ).andReturn(true);
-
- rolloverHandler.handleRollover(mockFile1);
- rolloverHandler.handleRollover(mockFile2);
-
- replay(spoolWriter1, spoolWriter2, rolloverCondition, rolloverHandler, mockFile1, mockFile2, mockFile3);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
- private int currentFileNum;
-
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- PrintWriter spoolWriter = null;
- switch (currentFileNum) {
- case 0:
- spoolWriter = spoolWriter1;
- break;
- case 1:
- spoolWriter = spoolWriter2;
- break;
- case 2:
- spoolWriter = spoolWriter3;
- break;
- }
- currentFileNum++;
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- switch (currentFileNum) {
- case 0:
- return mockFile1;
- case 1:
- return mockFile2;
- case 2:
- return mockFile3;
- default:
- return null;
- }
- }
- };
- logSpooler.add("log event1");
- logSpooler.add("log event2");
-
- verify(spoolWriter1, spoolWriter2, rolloverCondition);
- }
-
- @Test
- public void shouldNotRolloverZeroLengthFiles() {
- final PrintWriter spoolWriter = mock(PrintWriter.class);
- spoolWriter.println("log event");
- spoolWriter.flush();
- spoolWriter.close();
-
- final File mockFile = mock(File.class);
- expect(mockFile.length()).andReturn(0L);
-
- LogSpoolerContext logSpoolerContext = new LogSpoolerContext(mockFile);
- expect(rolloverCondition.shouldRollover(
- cmp(logSpoolerContext, new LogSpoolerFileComparator(), LogicalOperator.EQUAL))).
- andReturn(true);
-
- replay(spoolWriter, rolloverCondition, mockFile);
-
- LogSpooler logSpooler = new LogSpooler(spoolDirectory, SOURCE_FILENAME_PREFIX,
- rolloverCondition, rolloverHandler) {
-
- @Override
- protected PrintWriter initializeSpoolWriter(File spoolFile) throws IOException {
- return spoolWriter;
- }
-
- @Override
- protected File initializeSpoolFile() {
- return mockFile;
- }
- };
- logSpooler.add("log event");
-
- verify(mockFile);
- }
-
- class LogSpoolerFileComparator implements Comparator<LogSpoolerContext> {
- @Override
- public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
- return o1.getActiveSpoolFile()==o2.getActiveSpoolFile() ? 0 : -1;
- }
- }
-
- class LogSpoolerEventCountComparator implements Comparator<LogSpoolerContext> {
- @Override
- public int compare(LogSpoolerContext o1, LogSpoolerContext o2) {
- return (int)(o1.getNumEventsSpooled()-o2.getNumEventsSpooled());
- }
- }
-
-}
diff --git a/ambari-logsearch-server/pom.xml b/ambari-logsearch-server/pom.xml
index 3349482..3857f35 100755
--- a/ambari-logsearch-server/pom.xml
+++ b/ambari-logsearch-server/pom.xml
@@ -442,7 +442,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
- <version>3.0.0</version>
+ <version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
diff --git a/docker/cloud-docker-compose.yml b/docker/cloud-docker-compose.yml
index 3a9ec05..ae0c8b3 100644
--- a/docker/cloud-docker-compose.yml
+++ b/docker/cloud-docker-compose.yml
@@ -120,6 +120,9 @@ services:
image: flokkr/hadoop-hdfs-datanode:${HADOOP_VERSION:-3.0.0}
links:
- namenode
+ ports:
+ - 61890:61890
+ - 5007:5007
env_file:
- Profile
networks:
diff --git a/docker/test-config/logfeeder/logfeeder.properties b/docker/test-config/logfeeder/logfeeder.properties
index ffdb061..aed8337 100644
--- a/docker/test-config/logfeeder/logfeeder.properties
+++ b/docker/test-config/logfeeder/logfeeder.properties
@@ -35,4 +35,15 @@ logfeeder.solr.core.config.name=history
#logfeeder.configs.local.enabled=true
#logfeeder.configs.filter.solr.enabled=true
#logfeeder.configs.filter.zk.enabled=true
-#logfeeder.cloud.storage.mode=hybrid
\ No newline at end of file
+logfeeder.cloud.storage.mode=default
+logfeeder.cloud.storage.destination=s3
+logfeeder.cloud.rollover.threshold.min=1000
+logfeeder.cloud.rollover.threshold.size=1K
+
+logfeeder.s3.endpoint=http://fakes3:4569
+logfeeder.s3.secret.key=MySecretKey
+logfeeder.s3.access.key=MyAccessKey
+logfeeder.s3.bucket=logfeeder
+logfeeder.s3.object.acl=public-read
+logfeeder.s3.path.style.access=true
+logfeeder.s3.multiobject.delete.enable=false
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index a5e4ef2..91e5b09 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,7 +84,7 @@
<deb.architecture>amd64</deb.architecture>
<deb.dependency.list>${deb.python.ver}</deb.dependency.list>
<solr.version>7.5.0</solr.version>
- <hadoop.version>3.0.0</hadoop.version>
+ <hadoop.version>3.1.1</hadoop.version>
<common.io.version>2.5</common.io.version>
<zookeeper.version>3.4.6.2.3.0.0-2557</zookeeper.version>
<forkCount>4</forkCount>
@@ -95,6 +95,7 @@
<ambari-metrics.version>2.7.0.0.0</ambari-metrics.version>
<logsearch.docker.tag>latest</logsearch.docker.tag>
<fasterxml-jackson.version>2.9.5</fasterxml-jackson.version>
+ <log4j2.version>2.11.1</log4j2.version>
</properties>
<licenses>
@@ -395,17 +396,17 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
- <version>2.10.0</version>
+ <version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
- <version>2.10.0</version>
+ <version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
- <version>2.10.0</version>
+ <version>${log4j2.version}</version>
</dependency>
<dependency>
<groupId>commons-fileupload</groupId>