You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/19 16:14:11 UTC

[ambari-logsearch] 27/28: AMBARI-24833. Support for cloud logs to using filters + JSON output (#26)

This is an automated email from the ASF dual-hosted git repository.

oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git

commit f8b80a5193b7f0f71c7e0ceaa7ebb311a435d072
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 19 10:32:15 2018 +0100

    AMBARI-24833. Support for cloud logs to using filters + JSON output (#26)
    
    * AMBARI-24833. Support for cloud logs to using filters + JSON output
    
    * AMBARI-24833. Do not filter anything if filters are not enabled
    
    * AMBARI-24833. Fix intermittent issues.
    
    * AMBARI-24833. Edit comment
---
 .../local/LogSearchConfigLogFeederLocal.java       |  42 ++++++--
 .../config/zookeeper/LogLevelFilterManagerZK.java  |   1 +
 .../logfeeder/common/LogFeederConstants.java       |   1 +
 .../ambari/logfeeder/conf/LogFeederProps.java      |  18 ++++
 ...andler.java => AbstractInputConfigHandler.java} |  84 ++--------------
 .../impl/CloudStorageInputConfigHandler.java       |  14 ++-
 .../operations/impl/DefaultInputConfigHandler.java |  62 +-----------
 .../logfeeder/output/OutputLineEnricher.java       | 109 +++++++++++++++++++++
 .../ambari/logfeeder/output/OutputManagerImpl.java |  76 ++------------
 .../output/cloud/CloudStorageLoggerFactory.java    |  14 ++-
 .../output/cloud/CloudStorageOutputManager.java    |  27 ++++-
 .../output/cloud/CloudStorageUploader.java         |   2 +-
 .../src/main/resources/logfeeder.properties        |   1 +
 13 files changed, 232 insertions(+), 219 deletions(-)

diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index f6cb519..12af637 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -84,12 +84,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
     File[] inputConfigFiles = new File(configDir).listFiles(inputConfigFileFilter);
     if (inputConfigFiles != null) {
       for (File inputConfigFile : inputConfigFiles) {
-        String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
-        Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
-        m.find();
-        String serviceName = m.group(1);
-        JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
-        inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+        tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, inputConfigFile);
       }
     }
     final FileSystem fs = FileSystems.getDefault();
@@ -100,6 +95,41 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
     executorService.submit(updater);
   }
 
+  private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws Exception {
+    // Note: this works around an intermittent issue where the input config JSON is read as a null string (while the file is still being generated); the file is re-read and re-parsed a few times before giving up.
+    int tries = 0;
+    while(true) {
+      tries++;
+      Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+      m.find();
+      String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
+      String serviceName = m.group(1);
+      JsonElement inputConfigJson = null;
+      logger.info("Trying to load '{}' service input config from input file '{}'", serviceName, inputConfigFile.getAbsolutePath());
+      try {
+        inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
+      } catch (Exception e) {
+        final String errorMessage;
+        if (tries < 3) {
+          errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          try {
+            Thread.sleep(2000);
+          } catch (Exception ex) {
+            // skip
+          }
+          continue;
+        } else {
+          errorMessage = String.format("Cannot parse input config: %s, after %s tries. Will skip to processing it", inputConfig, String.valueOf(tries));
+          logger.error(errorMessage, e);
+          break;
+        }
+      }
+      inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+      break;
+    }
+  }
+
   @Override
   public void close() throws IOException {
   }
diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index fd08e07..0975c39 100644
--- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -48,6 +48,7 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
 
   public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception {
     this.client = LogSearchConfigZKHelper.createZKClient(properties);
+    this.client.start();
     this.serverCache = new TreeCache(client, "/");
     this.aclList = LogSearchConfigZKHelper.getAcls(properties);
     this.gson = LogSearchConfigZKHelper.createGson();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index b5fffa8..f9ef32d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -112,6 +112,7 @@ public class LogFeederConstants {
   public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
   public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
   public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+  public static final String CLOUD_STORAGE_USE_FILTERS = "logfeeder.cloud.storage.use.filters";
   public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs";
   public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path";
 
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 83f10e4..f2eb6c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -289,6 +289,16 @@ public class LogFeederProps implements LogFeederProperties {
   @Value("${"+ LogFeederConstants.HDFS_USER + ":}")
   private String logfeederHdfsUser;
 
+  @LogSearchPropertyDescription(
+    name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
+    description = "Use filters for inputs (with filters the output format will be JSON)",
+    examples = {"true"},
+    defaultValue = "false",
+    sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+  )
+  @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}")
+  private boolean cloudStorageUseFilters;
+
   @Inject
   private LogEntryCacheConfig logEntryCacheConfig;
 
@@ -522,6 +532,14 @@ public class LogFeederProps implements LogFeederProperties {
     this.customFs = customFs;
   }
 
+  public boolean isCloudStorageUseFilters() {
+    return cloudStorageUseFilters;
+  }
+
+  public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) {
+    this.cloudStorageUseFilters = cloudStorageUseFilters;
+  }
+
   public String getCloudBasePath() {
     return cloudBasePath;
   }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
similarity index 53%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 44da631..31bfd0d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,93 +18,29 @@
  */
 package org.apache.ambari.logfeeder.manager.operations.impl;
 
-import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
-import org.apache.ambari.logfeeder.input.InputSimulate;
 import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
 import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
 import org.apache.ambari.logfeeder.plugin.filter.Filter;
 import org.apache.ambari.logfeeder.plugin.input.Input;
-import org.apache.ambari.logfeeder.plugin.output.Output;
 import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
 import org.apache.commons.lang.BooleanUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
 /**
- * Holds input/filter/output operations in default Log Feeder mode.
+ * Holds common operations for input config handlers
  */
-public class DefaultInputConfigHandler implements InputConfigHandler {
-
-  private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
-
-  @Override
-  public void init(InputConfigHolder inputConfigHolder) throws Exception {
-  }
-
-  @Override
-  public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
-    loadInputs(serviceName, inputConfigHolder);
-    loadFilters(serviceName, inputConfigHolder);
-  }
-
-  @Override
-  public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) {
-    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
-      for (Output output : inputConfigHolder.getOutputManager().getOutputs()) {
-        if (input.isOutputRequired(output)) {
-          input.addOutput(output);
-        }
-      }
-    }
+public abstract class AbstractInputConfigHandler implements InputConfigHandler {
 
-    // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
-    for (Output output : InputSimulate.getSimulateOutputs()) {
-      output.setLogSearchConfig(inputConfigHolder.getConfig());
-      inputConfigHolder.getOutputManager().add(output);
-    }
-  }
-
-  private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
-    for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
-      if (inputDescriptor == null) {
-        logger.warn("Input descriptor is smpty. Skipping...");
-        continue;
-      }
-
-      String source = inputDescriptor.getSource();
-      if (StringUtils.isEmpty(source)) {
-        logger.error("Input block doesn't have source element");
-        continue;
-      }
-      Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
-      if (input == null) {
-        logger.error("Input object could not be found");
-        continue;
-      }
-      input.setType(source);
-      input.setLogType(inputDescriptor.getType());
-      input.loadConfig(inputDescriptor);
-
-      if (input.isEnabled()) {
-        input.setOutputManager(inputConfigHolder.getOutputManager());
-        input.setInputManager(inputConfigHolder.getInputManager());
-        inputConfigHolder.getInputManager().add(serviceName, input);
-        logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType());
-        input.logConfigs();
-      } else {
-        logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
-      }
-    }
-  }
+  private static final Logger logger = LogManager.getLogger(AbstractInputConfigHandler.class);
 
-  private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
+  protected void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
     sortFilters(inputConfigHolder);
 
     List<Input> toRemoveInputList = new ArrayList<>();
@@ -152,7 +88,7 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
     }
   }
 
-  private void sortFilters(InputConfigHolder inputConfigHolder) {
+  protected void sortFilters(InputConfigHolder inputConfigHolder) {
     Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
       Integer o1Sort = o1.getSortOrder();
       Integer o2Sort = o2.getSortOrder();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index deb3a91..ac10b2d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -38,7 +38,7 @@ import java.util.List;
 /**
  * Holds input/filter/output operations in cloud Log Feeder mode.
  */
-public class CloudStorageInputConfigHandler implements InputConfigHandler {
+public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler {
 
   private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class);
 
@@ -49,6 +49,7 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
 
   @Override
   public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
+    final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
     for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
       if (inputDescriptor == null) {
         logger.warn("Input descriptor is smpty. Skipping...");
@@ -72,9 +73,11 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
       input.setType(source);
       input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType());
       input.loadConfig(inputDescriptor);
-      FilterDummy filter = new FilterDummy();
-      filter.setOutputManager(inputConfigHolder.getOutputManager());
-      input.setFirstFilter(filter);
+      if (!useFilters) {
+        FilterDummy filter = new FilterDummy();
+        filter.setOutputManager(inputConfigHolder.getOutputManager());
+        input.setFirstFilter(filter);
+      }
       input.setCloudInput(true);
 
       if (input.isEnabled()) {
@@ -87,6 +90,9 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
         logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
       }
     }
+    if (useFilters) {
+      loadFilters(serviceName, inputConfigHolder);
+    }
   }
 
   @Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index 44da631..dd0fe3e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -40,7 +40,7 @@ import java.util.List;
 /**
  * Holds input/filter/output operations in default Log Feeder mode.
  */
-public class DefaultInputConfigHandler implements InputConfigHandler {
+public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
 
   private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
 
@@ -103,64 +103,4 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
       }
     }
   }
-
-  private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
-    sortFilters(inputConfigHolder);
-
-    List<Input> toRemoveInputList = new ArrayList<>();
-    for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
-      for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
-        if (filterDescriptor == null) {
-          logger.warn("Filter descriptor is smpty. Skipping...");
-          continue;
-        }
-        if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
-          continue;
-        }
-        if (!input.isFilterRequired(filterDescriptor)) {
-          logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
-          continue;
-        }
-
-        String value = filterDescriptor.getFilter();
-        if (StringUtils.isEmpty(value)) {
-          logger.error("Filter block doesn't have filter element");
-          continue;
-        }
-        Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
-        if (filter == null) {
-          logger.error("Filter object could not be found");
-          continue;
-        }
-        filter.loadConfig(filterDescriptor);
-        filter.setInput(input);
-
-        filter.setOutputManager(inputConfigHolder.getOutputManager());
-        input.addFilter(filter);
-        filter.logConfigs();
-      }
-
-      if (input.getFirstFilter() == null) {
-        toRemoveInputList.add(input);
-      }
-    }
-
-    for (Input toRemoveInput : toRemoveInputList) {
-      logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
-      inputConfigHolder.getInputManager().removeInput(toRemoveInput);
-    }
-  }
-
-  private void sortFilters(InputConfigHolder inputConfigHolder) {
-    Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
-      Integer o1Sort = o1.getSortOrder();
-      Integer o2Sort = o2.getSortOrder();
-      if (o1Sort == null || o2Sort == null) {
-        return 0;
-      }
-
-      return o1Sort - o2Sort;
-    });
-  }
 }
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
new file mode 100644
index 0000000..bd9e3df
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for enriching output lines with additional fields.
+ */
+public class OutputLineEnricher {
+
+  private static final Logger logger = LogManager.getLogger(OutputLineEnricher.class);
+
+  private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+  public void enrichFields(final Map<String, Object> jsonObj, final InputMarker inputMarker, final MetricData messageTruncateMetric) {
+    Input input = inputMarker.getInput();
+    // Update the block with the context fields
+    for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
+      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+        jsonObj.put(entry.getKey(), entry.getValue());
+      }
+    }
+    // TODO: Ideally most of the overrides should be configurable
+    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
+    if (input.isUseEventMD5() || input.isGenEventMD5()) {
+      String prefix = "";
+      Object logtimeObj = jsonObj.get("logtime");
+      if (logtimeObj != null) {
+        if (logtimeObj instanceof Date) {
+          prefix = "" + ((Date) logtimeObj).getTime();
+        } else {
+          prefix = logtimeObj.toString();
+        }
+      }
+      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
+      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
+      if (input.isGenEventMD5()) {
+        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
+      }
+      if (input.isUseEventMD5()) {
+        jsonObj.put("id", prefix + Long.toString(eventMD5));
+      }
+    }
+    jsonObj.computeIfAbsent("event_count", k -> 1);
+    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
+      jsonObj.put("group", input.getInputDescriptor().getGroup());
+    }
+    if (inputMarker.getAllProperties().containsKey("line_number") &&
+      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+      jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
+    }
+    if (jsonObj.containsKey("log_message")) {
+      // TODO: Let's check size only for log_message for now
+      String logMessage = (String) jsonObj.get("log_message");
+      logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, input, logMessage);
+      jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private String truncateLongLogMessage(MetricData messageTruncateMetric, Map<String, Object> jsonObj, Input input, String logMessage) {
+    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+      messageTruncateMetric.value++;
+      String logMessageKey = input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
+        ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 200 characters=" +
+        StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN);
+      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+      jsonObj.put("log_message", logMessage);
+      List<String> tagsList = (List<String>) jsonObj.get("tags");
+      if (tagsList == null) {
+        tagsList = new ArrayList<>();
+        jsonObj.put("tags", tagsList);
+      }
+      tagsList.add("error_message_truncated");
+    }
+    return logMessage;
+  }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index afe1c0a..b4c862d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -59,7 +59,8 @@ public class OutputManagerImpl extends OutputManager {
   @Inject
   private LogFeederProps logFeederProps;
 
-  private OutputLineFilter outputLineFilter = new OutputLineFilter();
+  private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   public List<Output> getOutputs() {
     return outputs;
@@ -80,57 +81,12 @@ public class OutputManagerImpl extends OutputManager {
 
   @SuppressWarnings("unchecked")
   public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
-    Input input = inputMarker.getInput();
-
-    // Update the block with the context fields
-    for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
-      if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
-        jsonObj.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    // TODO: Ideally most of the overrides should be configurable
-
-    LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
-    jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-
-    if (input.isUseEventMD5() || input.isGenEventMD5()) {
-      String prefix = "";
-      Object logtimeObj = jsonObj.get("logtime");
-      if (logtimeObj != null) {
-        if (logtimeObj instanceof Date) {
-          prefix = "" + ((Date) logtimeObj).getTime();
-        } else {
-          prefix = logtimeObj.toString();
-        }
-      }
-
-
-      byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
-      long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
-      if (input.isGenEventMD5()) {
-        jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
-      }
-      if (input.isUseEventMD5()) {
-        jsonObj.put("id", prefix + Long.toString(eventMD5));
-      }
-    }
-
     jsonObj.put("seq_num", docCounter++);
-    jsonObj.computeIfAbsent("event_count", k -> 1);
-    if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
-      jsonObj.put("group", input.getInputDescriptor().getGroup());
-    }
-    if (inputMarker.getAllProperties().containsKey("line_number") &&
-      (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
-      jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
-    }
-    if (jsonObj.containsKey("log_message")) {
-      // TODO: Let's check size only for log_message for now
-      String logMessage = (String) jsonObj.get("log_message");
-      logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
-      jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+    if (docCounter == Long.MIN_VALUE) {
+      docCounter = 1;
     }
+    outputLineEnricher.enrichFields(jsonObj, inputMarker, messageTruncateMetric);
+    Input input = inputMarker.getInput();
     List<String> defaultLogLevels = getDefaultLogLevels(input);
     if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels)
       && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
@@ -159,26 +115,6 @@ public class OutputManagerImpl extends OutputManager {
   }
 
   @SuppressWarnings("unchecked")
-  private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) {
-    if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
-      messageTruncateMetric.value++;
-      String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
-      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
-        ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
-        StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN);
-      logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
-      jsonObj.put("log_message", logMessage);
-      List<String> tagsList = (List<String>) jsonObj.get("tags");
-      if (tagsList == null) {
-        tagsList = new ArrayList<String>();
-        jsonObj.put("tags", tagsList);
-      }
-      tagsList.add("error_message_truncated");
-    }
-    return logMessage;
-  }
-
-  @SuppressWarnings("unchecked")
   public void write(String jsonBlock, InputMarker inputMarker) {
     List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput());
     if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker, defaultLogLevels)) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index 8201051..0cfdbcc 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -48,8 +48,11 @@ public class CloudStorageLoggerFactory {
   private static final String ARCHIVED_FOLDER = "archived";
   private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
   private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
+  private static final String JSON_DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz";
+  private static final String JSON_DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json";
 
   public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
+    boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters();
     String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
     String uniqueThreadName = input.getThread().getName();
     Configuration config = loggerContext.getConfiguration();
@@ -59,8 +62,15 @@ public class CloudStorageLoggerFactory {
     String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
 
     boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
-    String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
-    String fileName = String.join(File.separator, activeLogDir, type + ".log");
+    final String archiveFilePattern;
+    if (useJsonFormat) {
+      archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : JSON_DATE_PATTERN_SUFFIX;
+    } else {
+      archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
+    }
+
+    String logSuffix = useJsonFormat ? ".json" : ".log";
+    String fileName = String.join(File.separator, activeLogDir, type + logSuffix);
     String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern);
     PatternLayout layout = PatternLayout.newBuilder()
       .withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 16b7e55..9be30a0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -18,7 +18,10 @@
  */
 package org.apache.ambari.logfeeder.output.cloud;
 
+import org.apache.ambari.logfeeder.common.IdGeneratorHelper;
 import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.OutputLineEnricher;
+import org.apache.ambari.logfeeder.output.OutputLineFilter;
 import org.apache.ambari.logfeeder.plugin.common.MetricData;
 import org.apache.ambari.logfeeder.plugin.input.Input;
 import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -33,6 +36,7 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Handle output operations for sending cloud inputs to a cloud storage destination
@@ -47,10 +51,25 @@ public class CloudStorageOutputManager extends OutputManager {
   private CloudStorageOutput storageOutput = null;
 
   private List<Output> outputList = new ArrayList<>();
+  private final AtomicBoolean useFilters = new AtomicBoolean(false);
+
+  private final MetricData messageTruncateMetric = new MetricData(null, false);
+  private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+  private final OutputLineFilter outputLineFilter = new OutputLineFilter();
 
   @Override
   public void write(Map<String, Object> jsonObj, InputMarker marker) {
-    write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    if (useFilters.get()) {
+      outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric);
+      if (!outputLineFilter.apply(jsonObj, marker.getInput())) {
+        if (jsonObj.get("id") == null) {
+          jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, storageOutput.getIdFields()));
+        }
+        write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+      }
+    } else {
+      write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+    }
   }
 
   @Override
@@ -82,6 +101,12 @@ public class CloudStorageOutputManager extends OutputManager {
     storageOutput = new CloudStorageOutput(logFeederProps);
     storageOutput.init(logFeederProps);
     add(storageOutput);
+    useFilters.set(logFeederProps.isCloudStorageUseFilters());
+    if (useFilters.get()) {
+      logger.info("Using filters are enabled for cloud log outputs");
+    } else {
+      logger.info("Using filters are disabled for cloud log outputs");
+    }
   }
 
   @Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index b76f441..af9326a 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -76,7 +76,7 @@ public class CloudStorageUploader extends Thread {
     try {
       final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived");
       if (new File(archiveLogDir).exists()) {
-        String[] extensions = {"log", "gz"};
+        String[] extensions = {"log", "json", "gz"};
         Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
         if (filesToUpload.isEmpty()) {
           logger.debug("Not found any files to upload.");
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index c7ea335..45c05f3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1
 logfeeder.cloud.storage.upload.on.shutdown=true
 logfeeder.cloud.storage.base.path=/apps/logfeeder
 logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.storage.use.filters=false
 
 logfeeder.cloud.storage.bucket=logfeeder
 logfeeder.cloud.storage.bucket.bootstrap=true