You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by GitBox <gi...@apache.org> on 2018/11/21 11:13:40 UTC

[GitHub] oleewere closed pull request #33: AMBARI-24833. Do not open FS on main thread.

oleewere closed pull request #33: AMBARI-24833. Do not open FS on main thread.
URL: https://github.com/apache/ambari-logsearch/pull/33
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index 12af637d8c..3f40e88a15 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -111,7 +111,7 @@ private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonPa
       } catch (Exception e) {
         final String errorMessage;
         if (tries < 3) {
-          errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+          errorMessage = String.format("Cannot parse input config: '%s', will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
           logger.error(errorMessage, e);
           try {
             Thread.sleep(2000);
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index a15ac7468a..e5a6e3800b 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -108,6 +108,7 @@
   public static final String CLOUD_STORAGE_MODE = "logfeeder.cloud.storage.mode";
   public static final String CLOUD_STORAGE_DESTINATION = "logfeeder.cloud.storage.destination";
   public static final String CLOUD_STORAGE_UPLOAD_ON_SHUTDOWN = "logfeeder.cloud.storage.upload.on.shutdown";
+  public static final String CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES = "logfeeder.cloud.storage.uploader.timeout.minutes";
   public static final String CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS = "logfeeder.cloud.storage.uploader.interval.seconds";
   public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
   public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index b6ab4c7342..9ed4c9b949 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -251,6 +251,16 @@
   @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_INTERVAL_SECONDS + ":60}")
   private Integer cloudStorageUploaderIntervalSeconds;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES,
+    description = "Timeout value for uploading task to cloud storage in minutes.",
+    examples = {"10"},
+    defaultValue = "60",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_UPLOADER_TIMEOUT_MINUTUES + ":60}")
+  private Integer cloudStorageUploaderTimeoutMinutes;
+
   @LogSearchPropertyDescription(
     name = LogFeederConstants.CLOUD_STORAGE_USE_HDFS_CLIENT,
     description = "Use hdfs client with cloud connectors instead of the core clients for shipping data to cloud storage",
@@ -499,6 +509,14 @@ public void setCloudStorageUploaderIntervalSeconds(Integer cloudStorageUploaderI
     this.cloudStorageUploaderIntervalSeconds = cloudStorageUploaderIntervalSeconds;
   }
 
+  public Integer getCloudStorageUploaderTimeoutMinutes() {
+    return cloudStorageUploaderTimeoutMinutes;
+  }
+
+  public void setCloudStorageUploaderTimeoutMinutes(Integer cloudStorageUploaderTimeoutMinutes) {
+    this.cloudStorageUploaderTimeoutMinutes = cloudStorageUploaderTimeoutMinutes;
+  }
+
   public boolean isUseCloudHdfsClient() {
     return useCloudHdfsClient;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
index fbbffe6ada..4240fb11e7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutput.java
@@ -129,7 +129,7 @@ void stopUploader() {
     uploader.interrupt();
     if (logFeederProps.isCloudStorageUploadOnShutdown()) {
       logger.info("Do last upload before shutdown.");
-      uploader.doUpload();
+      uploader.doUpload(2); // hard-coded 2 minutes timeout on shutdown
     }
   }
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index 22c7fc1c61..cb4cac3a63 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -28,6 +28,10 @@
 import java.io.File;
 import java.nio.file.Paths;
 import java.util.Collection;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Periodically checks a folder (contains archived logs) and if it finds any .log or .gz files, it will try to upload them to cloud storage by an upload client (cloud specific)
@@ -41,6 +45,7 @@
   private final String clusterName;
   private final String hostName;
   private final String uploaderType;
+  private final ExecutorService executorService;
 
   public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederProps logFeederProps) {
     super(name);
@@ -49,6 +54,7 @@ public CloudStorageUploader(String name, UploadClient uploadClient, LogFeederPro
     this.uploaderType = logFeederProps.getCloudStorageDestination().getText();
     this.clusterName = logFeederProps.getClusterName();
     this.hostName = LogFeederUtil.hostName;
+    this.executorService = Executors.newSingleThreadExecutor();
   }
 
   @Override
@@ -58,7 +64,7 @@ public void run() {
     do {
       try {
         try {
-          doUpload();
+          doUpload(logFeederProps.getCloudStorageUploaderTimeoutMinutes());
         } catch (Exception e) {
           logger.error("An error occurred during Uploader operation - " + uploaderType, e);
         }
@@ -73,7 +79,7 @@ public void run() {
   /**
    * Finds .log and .gz files and upload them to cloud storage by an uploader client
    */
-  void doUpload() {
+  void doUpload(int timeout) {
     try {
       final File archiveLogDir = Paths.get(logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(),
         uploaderType, clusterName, hostName, "archived").toFile();
@@ -88,7 +94,16 @@ void doUpload() {
             String outputPath = String.format("%s/%s/%s/%s/%s", basePath, clusterName, hostName, file.getParentFile().getName(), file.getName())
               .replaceAll("//", "/");
             logger.info("Upload will start: input: {}, output: {}", file.getAbsolutePath(), outputPath);
-            uploadClient.upload(file.getAbsolutePath(), outputPath);
+            Future<?> future = executorService.submit(() -> {
+              try {
+                uploadClient.upload(file.getAbsolutePath(), outputPath);
+              } catch (InterruptedException ie) {
+                logger.error("Cloud upload thread interrupted", ie);
+              } catch (Exception e) {
+                logger.error("Exception during cloud upload", e);
+              }
+            });
+            future.get(timeout, TimeUnit.MINUTES);
           }
         }
       } else {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
index 7e1b471de0..3d5ec8f606 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/upload/HDFSUploadClient.java
@@ -30,6 +30,8 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.concurrent.atomic.AtomicReference;
+
 /**
  * HDFS client that uses core-site.xml file from the classpath to load the configuration.
  * Can connect to S3 / GCS / WASB / ADLS if the core-site.xml is configured to use one of those cloud storages
@@ -44,7 +46,7 @@
   private final boolean externalHdfs;
   private final HdfsOutputConfig hdfsOutputConfig;
   private final FsPermission fsPermission;
-  private FileSystem fs;
+  private final AtomicReference<Configuration> configurationRef = new AtomicReference<>();
 
   public HDFSUploadClient(HdfsOutputConfig hdfsOutputConfig, boolean externalHdfs) {
     this.hdfsOutputConfig = hdfsOutputConfig;
@@ -84,18 +86,18 @@ public void init(LogFeederProps logFeederProps) {
       }
     }
     logger.info("HDFS client - will use '{}' permission for uploaded files", hdfsOutputConfig.getHdfsFilePermissions());
-    LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configuration);
-    this.fs = LogFeederHDFSUtil.buildFileSystem(configuration);
+    configurationRef.set(configuration);
+    LogFeederHDFSUtil.overrideFileSystemConfigs(logFeederProps, configurationRef.get());
   }
 
   @Override
   public void upload(String source, String target) throws Exception {
+    final FileSystem fs = LogFeederHDFSUtil.buildFileSystem(configurationRef.get());
     LogFeederHDFSUtil.copyFromLocal(source, target, fs, true, true, this.fsPermission);
   }
 
   @Override
   public void close() {
-    LogFeederHDFSUtil.closeFileSystem(fs);
   }
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services