You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/21 11:13:42 UTC

[ambari-logsearch] branch master updated: AMBARI-24833. Do not open FS on main thread. (#33)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e3940a  AMBARI-24833. Do not open FS on main thread. (#33)
4e3940a is described below

commit 4e3940a647f170dd6fd9f9c3027ce301dde5dee4
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Wed Nov 21 12:13:38 2018 +0100

    AMBARI-24833. Do not open FS on main thread. (#33)
    
    * AMBARI-24833. Do not open FS on main thread.
    
    * AMBARI-24833. Use atomic reference for Configuration object
    
    * AMBARI-24833. Use timeout for creating FS & upload files.
    
    * AMBARI-24833. Fix typo
---
 .../config/local/LogSearchConfigLogFeederLocal.java |  2 +-
 .../ambari/logfeeder/common/LogFeederConstants.java |  1 +
 .../ambari/logfeeder/conf/LogFeederProps.java       | 18 ++++++++++++++++++
 .../logfeeder/output/cloud/CloudStorageOutput.java  |  2 +-
 .../output/cloud/CloudStorageUploader.java          | 21 ++++++++++++++++++---
 .../output/cloud/upload/HDFSUploadClient.java       | 10 ++++++----
 6 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index 12af637..3f40e88 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -111,7 +111,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
       } catch (Exception e) {
         final String errorMessage;
         if (tries < 3) {
-          errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+          errorMessage = String.format("Cannot parse input config: '%s', will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
           logger.error(errorMessage, e);
           try {
             Thread.sleep(2000);
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a15ac74..e5a6e38 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -108,6 +108,7 @@ public class LogFeederConstants {
   public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
   public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
   public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
+  public static final String CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES = "logfeeder.cloud.storage.uploader.timeout.minutes";
   public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
   public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
   public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index b6ab4c7..9ed4c9b 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -252,6 +252,16 @@ public class LogFeederProps implements LogFeederProperties {
   private Integer cloudStorageUploaderIntervalSeconds;
 
   @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES,
+    description = "Timeout value for uploading task to cloud storage in minutes.",
+    examples = {"10"},
+    defaultValue = "60",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES + ":60}")
+  private Integer cloudStorageUploaderTimeoutMinutes;
+
+  @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
     description = "Use hdfs client with cloud connectors instead of the core clients for shipping data to cloud storage",
     examples = {"true"},
@@ -499,6 +509,14 @@ public class LogFeederProps implements LogFeederProperties {
     this.cloudStorageUploaderIntervalSeconds = cloudStorageUploaderIntervalSeconds;
   }
 
+  public Integer getCloudStorageUploaderTimeoutMinutes() {
+    return cloudStorageUploaderTimeoutMinutes;
+  }
+
+  public void setCloudStorageUploaderTimeoutMinutes(Integer cloudStorageUploaderTimeoutMinutes) {
+    this.cloudStorageUploaderTimeoutMinutes = cloudStorageUploaderTimeoutMinutes;
+  }
+
   public boolean isUseCloudHdfsClient() {
     return useCloudHdfsClient;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index fbbffe6..4240fb1 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -129,7 +129,7 @@ public class CloudStorageOutput extends Output<LogFeederProps, InputMarker> {
     uploader.interrupt();
     if (logFeederProps.isCloudStorageUploadOnShutdown()) {
       logger.info("Do last upload before shutdown.");
-      uploader.doUpload();
+      uploader.doUpload(2); // hard-coded 2 minutes timeout on shutdown
     }
   }
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index 22c7fc1..cb4cac3 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -28,6 +28,10 @@ import org.apache.logging.log4j.Logger;
 import java.io.File;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Periodically checks a folder (contains archived logs) and if it finds any .log or .gz files, it will try to upload them to cloud storage by an upload client (cloud specific)
@@ -41,6 +45,7 @@ public class CloudStorageUploader extends Thread {
   private final String clusterName;
   private final String hostName;
   private final String uploaderType;
+  private final ExecutorService executorService;
 
   public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
     super(name);
@@ -49,6 +54,7 @@ public class CloudStorageUploader extends Thread {
     this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
     this.clusterName = logFeederProps.getClusterName();
     this.hostName = LogFeederUtil.hostName;
+    this.executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
@@ -58,7 +64,7 @@ public class CloudStorageUploader extends Thread {
     do {
       try {
         try {
-          doUpload();
+          doUpload(logFeederProps.getCloudStorageUploaderTimeoutMinutes());
         } catch (Exception e) {
           logger.error("An error occurred during Uploader operation - " + uploaderType, e);
         }
@@ -73,7 +79,7 @@ public class CloudStorageUploader extends Thread {
   /**
    * Finds .log and .gz files and upload them to cloud storage by an uploader client
    */
-  void doUpload() {
+  void doUpload(int timeout) {
     try {
       final File archiveLogDir = Paths.get(logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(),
         uploaderType, clusterName, hostName, "archived").toFile();
@@ -88,7 +94,16 @@ public class CloudStorageUploader extends Thread {
             String outputPath = String.format("%s/%s/%s/%s/%s", basePath, clusterName, hostName, file.getParentFile().getName(), file.getName())
               .replaceAll("//", "/");
             logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
-            uploadClient.upload(file.getAbsolutePath(), outputPath);
+            Future<?> future = executorService.submit(() -> {
+              try {
+                uploadClient.upload(file.getAbsolutePath(), outputPath);
+              } catch (InterruptedException ie) {
+                logger.error("Cloud upload thread interrupted", ie);
+              } catch (Exception e) {
+                logger.error("Exception during cloud upload", e);
+              }
+            });
+            future.get(timeout, TimeUnit.MINUTES);
           }
         }
       } else {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 7e1b471..3d5ec8f 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * HDFS client that uses core-site.xml file from the classpath to load the configuration.
  * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -44,7 +46,7 @@ public class HDFSUploadClient implements UploadClient {
   private final boolean externalHdfs;
   private final HdfsOutputConfig hdfsOutputConfig;
   private final FsPermission fsPermission;
-  private FileSystem fs;
+  private final AtomicReference<Configuration> configurationRef = new AtomicReference<>();
 
   public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
     this.hdfsOutputConfig = hdfsOutputConfig;
@@ -84,18 +86,18 @@ public class HDFSUploadClient implements UploadClient {
       }
     }
     logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
-    LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configuration);
-    this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
+    configurationRef.set(configuration);
+    LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configurationRef.get());
   }
 
   @Override
   public void upload(String source, String target) throws Exception {
+    final FileSystem fs = LogFeederHDFSUtil.buildFileSystem(configurationRef.get());
     LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
   }
 
   @Override
   public void close() {
-    LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
 }