You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/21 11:13:42 UTC
[ambari-logsearch] branch master updated: AMBARI-24833. Do not open FS on main thread. (#33)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
The following commit(s) were added to refs/heads/master by this push:
new 4e3940a AMBARI-24833. Do not open FS on main thread. (#33)
4e3940a is described below
commit 4e3940a647f170dd6fd9f9c3027ce301dde5dee4
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Wed Nov 21 12:13:38 2018 +0100
AMBARI-24833. Do not open FS on main thread. (#33)
* AMBARI-24833. Do not open FS on main thread.
* AMBARI-24833. Use atomic reference for Configuration object
* AMBARI-24833. Use timeout for creating FS & upload files.
* AMBARI-24833. Fix typo
---
.../config/local/LogSearchConfigLogFeederLocal.java | 2 +-
.../ambari/logfeeder/common/LogFeederConstants.java | 1 +
.../ambari/logfeeder/conf/LogFeederProps.java | 18 ++++++++++++++++++
.../logfeeder/output/cloud/CloudStorageOutput.java | 2 +-
.../output/cloud/CloudStorageUploader.java | 21 ++++++++++++++++++---
.../output/cloud/upload/HDFSUploadClient.java | 10 ++++++----
6 files changed, 45 insertions(+), 9 deletions(-)
diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index 12af637..3f40e88 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -111,7 +111,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
} catch (Exception e) {
final String errorMessage;
if (tries < 3) {
- errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+ errorMessage = String.format("Cannot parse input config: '%s', will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
logger.error(errorMessage, e);
try {
Thread.sleep(2000);
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a15ac74..e5a6e38 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -108,6 +108,7 @@ public class LogFeederConstants {
public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
+ public static final String CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES = "logfeeder.cloud.storage.uploader.timeout.minutes";
public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index b6ab4c7..9ed4c9b 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -252,6 +252,16 @@ public class LogFeederProps implements LogFeederProperties {
private Integer cloudStorageUploaderIntervalSeconds;
@LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES,
+ description = "Timeout value for uploading task to cloud storage in minutes.",
+ examples = {"10"},
+ defaultValue = "60",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES + ":60}")
+ private Integer cloudStorageUploaderTimeoutMinutes;
+
+ @LogSearchPropertyDescription(
name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
description = "Use hdfs client with cloud connectors instead of the core clients for shipping data to cloud storage",
examples = {"true"},
@@ -499,6 +509,14 @@ public class LogFeederProps implements LogFeederProperties {
this.cloudStorageUploaderIntervalSeconds = cloudStorageUploaderIntervalSeconds;
}
+ public Integer getCloudStorageUploaderTimeoutMinutes() {
+ return cloudStorageUploaderTimeoutMinutes;
+ }
+
+ public void setCloudStorageUploaderTimeoutMinutes(Integer cloudStorageUploaderTimeoutMinutes) {
+ this.cloudStorageUploaderTimeoutMinutes = cloudStorageUploaderTimeoutMinutes;
+ }
+
public boolean isUseCloudHdfsClient() {
return useCloudHdfsClient;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index fbbffe6..4240fb1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -129,7 +129,7 @@ public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
uploader.interrupt();
if (logFeederProps.isCloudStorageUploadOnShutdown()) {
logger.info("Do last upload before shutdown.");
- uploader.doUpload();
+ uploader.doUpload(2); // hard-coded 2 minutes timeout on shutdown
}
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index 22c7fc1..cb4cac3 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -28,6 +28,10 @@ import org.apache.logging.log4j.Logger;
import java.io.File;
import java.nio.file.Paths;
import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
/**
* Periodically checks a folder (contains archived logs) and if it finds any .log or .gz files, it will try to upload them to cloud storage by an upload client (cloud specific)
@@ -41,6 +45,7 @@ public class CloudStorageUploader extends Thread {
private final String clusterName;
private final String hostName;
private final String uploaderType;
+ private final ExecutorService executorService;
public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
super(name);
@@ -49,6 +54,7 @@ public class CloudStorageUploader extends Thread {
this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
this.clusterName = logFeederProps.getClusterName();
this.hostName = LogFeederUtil.hostName;
+ this.executorService = Executors.newSingleThreadExecutor();
}
@Override
@@ -58,7 +64,7 @@ public class CloudStorageUploader extends Thread {
do {
try {
try {
- doUpload();
+ doUpload(logFeederProps.getCloudStorageUploaderTimeoutMinutes());
} catch (Exception e) {
logger.error("An error occurred during Uploader operation - " + uploaderType, e);
}
@@ -73,7 +79,7 @@ public class CloudStorageUploader extends Thread {
/**
* Finds .log and .gz files and upload them to cloud storage by an uploader client
*/
- void doUpload() {
+ void doUpload(int timeout) {
try {
final File archiveLogDir = Paths.get(logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(),
uploaderType, clusterName, hostName, "archived").toFile();
@@ -88,7 +94,16 @@ public class CloudStorageUploader extends Thread {
String outputPath = String.format("%s/%s/%s/%s/%s", basePath, clusterName, hostName, file.getParentFile().getName(), file.getName())
.replaceAll("//", "/");
logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
- uploadClient.upload(file.getAbsolutePath(), outputPath);
+ Future<?> future = executorService.submit(() -> {
+ try {
+ uploadClient.upload(file.getAbsolutePath(), outputPath);
+ } catch (InterruptedException ie) {
+ logger.error("Cloud upload thread interrupted", ie);
+ } catch (Exception e) {
+ logger.error("Exception during cloud upload", e);
+ }
+ });
+ future.get(timeout, TimeUnit.MINUTES);
}
}
} else {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 7e1b471..3d5ec8f 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* HDFS client that uses core-site.xml file from the classpath to load the configuration.
* Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -44,7 +46,7 @@ public class HDFSUploadClient implements UploadClient {
private final boolean externalHdfs;
private final HdfsOutputConfig hdfsOutputConfig;
private final FsPermission fsPermission;
- private FileSystem fs;
+ private final AtomicReference<Configuration> configurationRef = new AtomicReference<>();
public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
this.hdfsOutputConfig = hdfsOutputConfig;
@@ -84,18 +86,18 @@ public class HDFSUploadClient implements UploadClient {
}
}
logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
- LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configuration);
- this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
+ configurationRef.set(configuration);
+ LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configurationRef.get());
}
@Override
public void upload(String source, String target) throws Exception {
+ final FileSystem fs = LogFeederHDFSUtil.buildFileSystem(configurationRef.get());
LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
}
@Override
public void close() {
- LogFeederHDFSUtil.closeFileSystem(fs);
}
}