You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/11/19 16:14:11 UTC
[ambari-logsearch] 27/28: AMBARI-24833. Support for cloud logs to
using filters + JSON output (#26)
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch cloudbreak
in repository https://gitbox.apache.org/repos/asf/ambari-logsearch.git
commit f8b80a5193b7f0f71c7e0ceaa7ebb311a435d072
Author: Olivér Szabó <ol...@gmail.com>
AuthorDate: Mon Nov 19 10:32:15 2018 +0100
AMBARI-24833. Support for cloud logs to using filters + JSON output (#26)
* AMBARI-24833. Support for cloud logs to using filters + JSON output
* AMBARI-24833. Do not filter anything if filters are not enabled
* AMBARI-24833. Fix intermittent issues.
* AMBARI-24833. Edit comment
---
.../local/LogSearchConfigLogFeederLocal.java | 42 ++++++--
.../config/zookeeper/LogLevelFilterManagerZK.java | 1 +
.../logfeeder/common/LogFeederConstants.java | 1 +
.../ambari/logfeeder/conf/LogFeederProps.java | 18 ++++
...andler.java => AbstractInputConfigHandler.java} | 84 ++--------------
.../impl/CloudStorageInputConfigHandler.java | 14 ++-
.../operations/impl/DefaultInputConfigHandler.java | 62 +-----------
.../logfeeder/output/OutputLineEnricher.java | 109 +++++++++++++++++++++
.../ambari/logfeeder/output/OutputManagerImpl.java | 76 ++------------
.../output/cloud/CloudStorageLoggerFactory.java | 14 ++-
.../output/cloud/CloudStorageOutputManager.java | 27 ++++-
.../output/cloud/CloudStorageUploader.java | 2 +-
.../src/main/resources/logfeeder.properties | 1 +
13 files changed, 232 insertions(+), 219 deletions(-)
diff --git a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
index f6cb519..12af637 100644
--- a/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
+++ b/ambari-logsearch-config-local/src/main/java/org/apache/ambari/logsearch/config/local/LogSearchConfigLogFeederLocal.java
@@ -84,12 +84,7 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
File[] inputConfigFiles = new File(configDir).listFiles(inputConfigFileFilter);
if (inputConfigFiles != null) {
for (File inputConfigFile : inputConfigFiles) {
- String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
- Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
- m.find();
- String serviceName = m.group(1);
- JsonElement inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
- inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+ tryLoadingInputConfig(inputConfigMonitor, parser, globalConfigNode, inputConfigFile);
}
}
final FileSystem fs = FileSystems.getDefault();
@@ -100,6 +95,41 @@ public class LogSearchConfigLogFeederLocal extends LogSearchConfigLocal implemen
executorService.submit(updater);
}
+ private void tryLoadingInputConfig(InputConfigMonitor inputConfigMonitor, JsonParser parser, JsonArray globalConfigNode, File inputConfigFile) throws Exception {
+ // Note: this works around an intermittent issue where the input config JSON is read as a null string (while the file is still being generated); parsing is retried a few times before the file is skipped
+ int tries = 0;
+ while(true) {
+ tries++;
+ Matcher m = serviceNamePattern.matcher(inputConfigFile.getName());
+ m.find();
+ String inputConfig = new String(Files.readAllBytes(inputConfigFile.toPath()));
+ String serviceName = m.group(1);
+ JsonElement inputConfigJson = null;
+ logger.info("Trying to load '{}' service input config from input file '{}'", serviceName, inputConfigFile.getAbsolutePath());
+ try {
+ inputConfigJson = JsonHelper.mergeGlobalConfigWithInputConfig(parser, inputConfig, globalConfigNode);
+ } catch (Exception e) {
+ final String errorMessage;
+ if (tries < 3) {
+ errorMessage = String.format("Cannot parse input config: %s, will retry in a few seconds again (tries: %s)", inputConfig, String.valueOf(tries));
+ logger.error(errorMessage, e);
+ try {
+ Thread.sleep(2000);
+ } catch (Exception ex) {
+ // skip
+ }
+ continue;
+ } else {
+ errorMessage = String.format("Cannot parse input config: %s, after %s tries. Will skip to processing it", inputConfig, String.valueOf(tries));
+ logger.error(errorMessage, e);
+ break;
+ }
+ }
+ inputConfigMonitor.loadInputConfigs(serviceName, InputConfigGson.gson.fromJson(inputConfigJson, InputConfigImpl.class));
+ break;
+ }
+ }
+
@Override
public void close() throws IOException {
}
diff --git a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
index fd08e07..0975c39 100644
--- a/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
+++ b/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/LogLevelFilterManagerZK.java
@@ -48,6 +48,7 @@ public class LogLevelFilterManagerZK implements LogLevelFilterManager {
public LogLevelFilterManagerZK(Map<String, String> properties) throws Exception {
this.client = LogSearchConfigZKHelper.createZKClient(properties);
+ this.client.start();
this.serverCache = new TreeCache(client, "/");
this.aclList = LogSearchConfigZKHelper.getAcls(properties);
this.gson = LogSearchConfigZKHelper.createGson();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index b5fffa8..f9ef32d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -112,6 +112,7 @@ public class LogFeederConstants {
public static final String CLOUD_STORAGE_BUCKET = "logfeeder.cloud.storage.bucket";
public static final String CLOUD_STORAGE_BUCKET_BOOTSTRAP = "logfeeder.cloud.storage.bucket.bootstrap";
public static final String CLOUD_STORAGE_USE_HDFS_CLIENT = "logfeeder.cloud.storage.use.hdfs.client";
+ public static final String CLOUD_STORAGE_USE_FILTERS = "logfeeder.cloud.storage.use.filters";
public static final String CLOUD_STORAGE_CUSTOM_FS = "logfeeder.cloud.storage.custom.fs";
public static final String CLOUD_STORAGE_BASE_PATH = "logfeeder.cloud.storage.base.path";
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 83f10e4..f2eb6c7 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -289,6 +289,16 @@ public class LogFeederProps implements LogFeederProperties {
@Value("${"+ LogFeederConstants.HDFS_USER + ":}")
private String logfeederHdfsUser;
+ @LogSearchPropertyDescription(
+ name = LogFeederConstants.CLOUD_STORAGE_USE_FILTERS,
+ description = "Use filters for inputs (with filters the output format will be JSON)",
+ examples = {"true"},
+ defaultValue = "false",
+ sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
+ )
+ @Value("${" + LogFeederConstants.CLOUD_STORAGE_USE_FILTERS + ":false}")
+ private boolean cloudStorageUseFilters;
+
@Inject
private LogEntryCacheConfig logEntryCacheConfig;
@@ -522,6 +532,14 @@ public class LogFeederProps implements LogFeederProperties {
this.customFs = customFs;
}
+ public boolean isCloudStorageUseFilters() {
+ return cloudStorageUseFilters;
+ }
+
+ public void setCloudStorageUseFilters(boolean cloudStorageUseFilters) {
+ this.cloudStorageUseFilters = cloudStorageUseFilters;
+ }
+
public String getCloudBasePath() {
return cloudBasePath;
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
similarity index 53%
copy from ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
copy to ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
index 44da631..31bfd0d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/AbstractInputConfigHandler.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,93 +18,29 @@
*/
package org.apache.ambari.logfeeder.manager.operations.impl;
-import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
-import org.apache.ambari.logfeeder.input.InputSimulate;
import org.apache.ambari.logfeeder.manager.InputConfigHolder;
+import org.apache.ambari.logfeeder.manager.operations.InputConfigHandler;
import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
import org.apache.ambari.logfeeder.plugin.filter.Filter;
import org.apache.ambari.logfeeder.plugin.input.Input;
-import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
- * Holds input/filter/output operations in default Log Feeder mode.
+ * Holds common operations for input config handlers
*/
-public class DefaultInputConfigHandler implements InputConfigHandler {
-
- private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
-
- @Override
- public void init(InputConfigHolder inputConfigHolder) throws Exception {
- }
-
- @Override
- public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
- loadInputs(serviceName, inputConfigHolder);
- loadFilters(serviceName, inputConfigHolder);
- }
-
- @Override
- public void assignInputsToOutputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig config) {
- for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
- for (Output output : inputConfigHolder.getOutputManager().getOutputs()) {
- if (input.isOutputRequired(output)) {
- input.addOutput(output);
- }
- }
- }
+public abstract class AbstractInputConfigHandler implements InputConfigHandler {
- // In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
- for (Output output : InputSimulate.getSimulateOutputs()) {
- output.setLogSearchConfig(inputConfigHolder.getConfig());
- inputConfigHolder.getOutputManager().add(output);
- }
- }
-
- private void loadInputs(String serviceName, InputConfigHolder inputConfigHolder) {
- for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
- if (inputDescriptor == null) {
- logger.warn("Input descriptor is smpty. Skipping...");
- continue;
- }
-
- String source = inputDescriptor.getSource();
- if (StringUtils.isEmpty(source)) {
- logger.error("Input block doesn't have source element");
- continue;
- }
- Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
- if (input == null) {
- logger.error("Input object could not be found");
- continue;
- }
- input.setType(source);
- input.setLogType(inputDescriptor.getType());
- input.loadConfig(inputDescriptor);
-
- if (input.isEnabled()) {
- input.setOutputManager(inputConfigHolder.getOutputManager());
- input.setInputManager(inputConfigHolder.getInputManager());
- inputConfigHolder.getInputManager().add(serviceName, input);
- logger.info("New input object registered for service '{}': '{}'", serviceName, input.getLogType());
- input.logConfigs();
- } else {
- logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
- }
- }
- }
+ private static final Logger logger = LogManager.getLogger(AbstractInputConfigHandler.class);
- private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
+ protected void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
sortFilters(inputConfigHolder);
List<Input> toRemoveInputList = new ArrayList<>();
@@ -152,7 +88,7 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
}
}
- private void sortFilters(InputConfigHolder inputConfigHolder) {
+ protected void sortFilters(InputConfigHolder inputConfigHolder) {
Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
Integer o1Sort = o1.getSortOrder();
Integer o2Sort = o2.getSortOrder();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
index deb3a91..ac10b2d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/CloudStorageInputConfigHandler.java
@@ -38,7 +38,7 @@ import java.util.List;
/**
* Holds input/filter/output operations in cloud Log Feeder mode.
*/
-public class CloudStorageInputConfigHandler implements InputConfigHandler {
+public class CloudStorageInputConfigHandler extends AbstractInputConfigHandler {
private static final Logger logger = LogManager.getLogger(CloudStorageInputConfigHandler.class);
@@ -49,6 +49,7 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
@Override
public void loadInputs(String serviceName, InputConfigHolder inputConfigHolder, InputConfig inputConfig) {
+ final boolean useFilters = inputConfigHolder.getLogFeederProps().isCloudStorageUseFilters();
for (InputDescriptor inputDescriptor : inputConfigHolder.getInputConfigList()) {
if (inputDescriptor == null) {
logger.warn("Input descriptor is smpty. Skipping...");
@@ -72,9 +73,11 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
input.setType(source);
input.setLogType(LogFeederConstants.CLOUD_PREFIX + inputDescriptor.getType());
input.loadConfig(inputDescriptor);
- FilterDummy filter = new FilterDummy();
- filter.setOutputManager(inputConfigHolder.getOutputManager());
- input.setFirstFilter(filter);
+ if (!useFilters) {
+ FilterDummy filter = new FilterDummy();
+ filter.setOutputManager(inputConfigHolder.getOutputManager());
+ input.setFirstFilter(filter);
+ }
input.setCloudInput(true);
if (input.isEnabled()) {
@@ -87,6 +90,9 @@ public class CloudStorageInputConfigHandler implements InputConfigHandler {
logger.info("Input is disabled. So ignoring it. " + input.getShortDescription());
}
}
+ if (useFilters) {
+ loadFilters(serviceName, inputConfigHolder);
+ }
}
@Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
index 44da631..dd0fe3e 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/manager/operations/impl/DefaultInputConfigHandler.java
@@ -40,7 +40,7 @@ import java.util.List;
/**
* Holds input/filter/output operations in default Log Feeder mode.
*/
-public class DefaultInputConfigHandler implements InputConfigHandler {
+public class DefaultInputConfigHandler extends AbstractInputConfigHandler {
private static final Logger logger = LogManager.getLogger(DefaultInputConfigHandler.class);
@@ -103,64 +103,4 @@ public class DefaultInputConfigHandler implements InputConfigHandler {
}
}
}
-
- private void loadFilters(String serviceName, InputConfigHolder inputConfigHolder) {
- sortFilters(inputConfigHolder);
-
- List<Input> toRemoveInputList = new ArrayList<>();
- for (Input input : inputConfigHolder.getInputManager().getInputList(serviceName)) {
- for (FilterDescriptor filterDescriptor : inputConfigHolder.getFilterConfigList()) {
- if (filterDescriptor == null) {
- logger.warn("Filter descriptor is smpty. Skipping...");
- continue;
- }
- if (BooleanUtils.isFalse(filterDescriptor.isEnabled())) {
- logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " because it is disabled");
- continue;
- }
- if (!input.isFilterRequired(filterDescriptor)) {
- logger.debug("Ignoring filter " + filterDescriptor.getFilter() + " for input " + input.getShortDescription());
- continue;
- }
-
- String value = filterDescriptor.getFilter();
- if (StringUtils.isEmpty(value)) {
- logger.error("Filter block doesn't have filter element");
- continue;
- }
- Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
- if (filter == null) {
- logger.error("Filter object could not be found");
- continue;
- }
- filter.loadConfig(filterDescriptor);
- filter.setInput(input);
-
- filter.setOutputManager(inputConfigHolder.getOutputManager());
- input.addFilter(filter);
- filter.logConfigs();
- }
-
- if (input.getFirstFilter() == null) {
- toRemoveInputList.add(input);
- }
- }
-
- for (Input toRemoveInput : toRemoveInputList) {
- logger.warn("There are no filters, we will ignore this input. " + toRemoveInput.getShortDescription());
- inputConfigHolder.getInputManager().removeInput(toRemoveInput);
- }
- }
-
- private void sortFilters(InputConfigHolder inputConfigHolder) {
- Collections.sort(inputConfigHolder.getFilterConfigList(), (o1, o2) -> {
- Integer o1Sort = o1.getSortOrder();
- Integer o2Sort = o2.getSortOrder();
- if (o1Sort == null || o2Sort == null) {
- return 0;
- }
-
- return o1Sort - o2Sort;
- });
- }
}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
new file mode 100644
index 0000000..bd9e3df
--- /dev/null
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineEnricher.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import com.google.common.hash.Hashing;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility class for enriching output lines with additional fields
+ */
+public class OutputLineEnricher {
+
+ private static final Logger logger = LogManager.getLogger(OutputLineEnricher.class);
+
+ private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+ public void enrichFields(final Map<String, Object> jsonObj, final InputMarker inputMarker, final MetricData messageTruncateMetric) {
+ Input input = inputMarker.getInput();
+ // Update the block with the context fields
+ for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
+ if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
+ jsonObj.put(entry.getKey(), entry.getValue());
+ }
+ }
+ // TODO: Ideally most of the overrides should be configurable
+ LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
+ if (input.isUseEventMD5() || input.isGenEventMD5()) {
+ String prefix = "";
+ Object logtimeObj = jsonObj.get("logtime");
+ if (logtimeObj != null) {
+ if (logtimeObj instanceof Date) {
+ prefix = "" + ((Date) logtimeObj).getTime();
+ } else {
+ prefix = logtimeObj.toString();
+ }
+ }
+ byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
+ long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
+ if (input.isGenEventMD5()) {
+ jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
+ }
+ if (input.isUseEventMD5()) {
+ jsonObj.put("id", prefix + Long.toString(eventMD5));
+ }
+ }
+ jsonObj.computeIfAbsent("event_count", k -> 1);
+ if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
+ jsonObj.put("group", input.getInputDescriptor().getGroup());
+ }
+ if (inputMarker.getAllProperties().containsKey("line_number") &&
+ (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+ jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
+ }
+ if (jsonObj.containsKey("log_message")) {
+ // TODO: Let's check size only for log_message for now
+ String logMessage = (String) jsonObj.get("log_message");
+ logMessage = truncateLongLogMessage(messageTruncateMetric, jsonObj, input, logMessage);
+ jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private String truncateLongLogMessage(MetricData messageTruncateMetric, Map<String, Object> jsonObj, Input input, String logMessage) {
+ if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+ messageTruncateMetric.value++;
+ String logMessageKey = input.getOutputManager().getClass().getSimpleName() + "_MESSAGESIZE";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
+ ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 200 characters=" +
+ StringUtils.abbreviate(logMessage, 200), null, logger, Level.WARN);
+ logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+ jsonObj.put("log_message", logMessage);
+ List<String> tagsList = (List<String>) jsonObj.get("tags");
+ if (tagsList == null) {
+ tagsList = new ArrayList<>();
+ jsonObj.put("tags", tagsList);
+ }
+ tagsList.add("error_message_truncated");
+ }
+ return logMessage;
+ }
+}
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index afe1c0a..b4c862d 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -59,7 +59,8 @@ public class OutputManagerImpl extends OutputManager {
@Inject
private LogFeederProps logFeederProps;
- private OutputLineFilter outputLineFilter = new OutputLineFilter();
+ private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+ private final OutputLineFilter outputLineFilter = new OutputLineFilter();
public List<Output> getOutputs() {
return outputs;
@@ -80,57 +81,12 @@ public class OutputManagerImpl extends OutputManager {
@SuppressWarnings("unchecked")
public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
- Input input = inputMarker.getInput();
-
- // Update the block with the context fields
- for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
- if (jsonObj.get(entry.getKey()) == null || entry.getKey().equals("cluster") && "null".equals(jsonObj.get(entry.getKey()))) {
- jsonObj.put(entry.getKey(), entry.getValue());
- }
- }
-
- // TODO: Ideally most of the overrides should be configurable
-
- LogFeederUtil.fillMapWithFieldDefaults(jsonObj, inputMarker, true);
- jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
-
- if (input.isUseEventMD5() || input.isGenEventMD5()) {
- String prefix = "";
- Object logtimeObj = jsonObj.get("logtime");
- if (logtimeObj != null) {
- if (logtimeObj instanceof Date) {
- prefix = "" + ((Date) logtimeObj).getTime();
- } else {
- prefix = logtimeObj.toString();
- }
- }
-
-
- byte[] bytes = LogFeederUtil.getGson().toJson(jsonObj).getBytes();
- long eventMD5 = Hashing.md5().hashBytes(bytes).asLong();
- if (input.isGenEventMD5()) {
- jsonObj.put("event_md5", prefix + Long.toString(eventMD5));
- }
- if (input.isUseEventMD5()) {
- jsonObj.put("id", prefix + Long.toString(eventMD5));
- }
- }
-
jsonObj.put("seq_num", docCounter++);
- jsonObj.computeIfAbsent("event_count", k -> 1);
- if (StringUtils.isNotBlank(input.getInputDescriptor().getGroup())) {
- jsonObj.put("group", input.getInputDescriptor().getGroup());
- }
- if (inputMarker.getAllProperties().containsKey("line_number") &&
- (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
- jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
- }
- if (jsonObj.containsKey("log_message")) {
- // TODO: Let's check size only for log_message for now
- String logMessage = (String) jsonObj.get("log_message");
- logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
- jsonObj.put("message_md5", "" + Hashing.md5().hashBytes(logMessage.getBytes()).asLong());
+ if (docCounter == Long.MIN_VALUE) {
+ docCounter = 1;
}
+ outputLineEnricher.enrichFields(jsonObj, inputMarker, messageTruncateMetric);
+ Input input = inputMarker.getInput();
List<String> defaultLogLevels = getDefaultLogLevels(input);
if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker, defaultLogLevels)
&& !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
@@ -159,26 +115,6 @@ public class OutputManagerImpl extends OutputManager {
}
@SuppressWarnings("unchecked")
- private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) {
- if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
- messageTruncateMetric.value++;
- String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
- ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
- StringUtils.abbreviate(logMessage, 100), null, logger, Level.WARN);
- logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
- jsonObj.put("log_message", logMessage);
- List<String> tagsList = (List<String>) jsonObj.get("tags");
- if (tagsList == null) {
- tagsList = new ArrayList<String>();
- jsonObj.put("tags", tagsList);
- }
- tagsList.add("error_message_truncated");
- }
- return logMessage;
- }
-
- @SuppressWarnings("unchecked")
public void write(String jsonBlock, InputMarker inputMarker) {
List<String> defaultLogLevels = getDefaultLogLevels(inputMarker.getInput());
if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker, defaultLogLevels)) {
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
index 8201051..0cfdbcc 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageLoggerFactory.java
@@ -48,8 +48,11 @@ public class CloudStorageLoggerFactory {
private static final String ARCHIVED_FOLDER = "archived";
private static final String DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log.gz";
private static final String DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.log";
+ private static final String JSON_DATE_PATTERN_SUFFIX_GZ = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json.gz";
+ private static final String JSON_DATE_PATTERN_SUFFIX = "-%d{yyyy-MM-dd-HH-mm-ss-SSS}.json";
public static Logger createLogger(Input input, LoggerContext loggerContext, LogFeederProps logFeederProps) {
+ boolean useJsonFormat = logFeederProps.isCloudStorageUseFilters();
String type = input.getLogType().replace(LogFeederConstants.CLOUD_PREFIX, "");
String uniqueThreadName = input.getThread().getName();
Configuration config = loggerContext.getConfiguration();
@@ -59,8 +62,15 @@ public class CloudStorageLoggerFactory {
String archiveLogDir = Paths.get(baseDir, destination, ARCHIVED_FOLDER, type).toFile().getAbsolutePath();
boolean useGzip = logFeederProps.getRolloverConfig().isUseGzip();
- String archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
- String fileName = String.join(File.separator, activeLogDir, type + ".log");
+ final String archiveFilePattern;
+ if (useJsonFormat) {
+ archiveFilePattern = useGzip ? JSON_DATE_PATTERN_SUFFIX_GZ : JSON_DATE_PATTERN_SUFFIX;
+ } else {
+ archiveFilePattern = useGzip ? DATE_PATTERN_SUFFIX_GZ : DATE_PATTERN_SUFFIX;
+ }
+
+ String logSuffix = useJsonFormat ? ".json" : ".log";
+ String fileName = String.join(File.separator, activeLogDir, type + logSuffix);
String filePattern = String.join(File.separator, archiveLogDir, type + archiveFilePattern);
PatternLayout layout = PatternLayout.newBuilder()
.withPattern(PatternLayout.DEFAULT_CONVERSION_PATTERN).build();
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
index 16b7e55..9be30a0 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageOutputManager.java
@@ -18,7 +18,10 @@
*/
package org.apache.ambari.logfeeder.output.cloud;
+import org.apache.ambari.logfeeder.common.IdGeneratorHelper;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.output.OutputLineEnricher;
+import org.apache.ambari.logfeeder.output.OutputLineFilter;
import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.plugin.input.InputMarker;
@@ -33,6 +36,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Handle output operations for sending cloud inputs to a cloud storage destination
@@ -47,10 +51,25 @@ public class CloudStorageOutputManager extends OutputManager {
private CloudStorageOutput storageOutput = null;
private List<Output> outputList = new ArrayList<>();
+ private final AtomicBoolean useFilters = new AtomicBoolean(false);
+
+ private final MetricData messageTruncateMetric = new MetricData(null, false);
+ private final OutputLineEnricher outputLineEnricher = new OutputLineEnricher();
+ private final OutputLineFilter outputLineFilter = new OutputLineFilter();
@Override
public void write(Map<String, Object> jsonObj, InputMarker marker) {
- write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+ if (useFilters.get()) {
+ outputLineEnricher.enrichFields(jsonObj, marker, messageTruncateMetric);
+ if (!outputLineFilter.apply(jsonObj, marker.getInput())) {
+ if (jsonObj.get("id") == null) {
+ jsonObj.put("id", IdGeneratorHelper.generateUUID(jsonObj, storageOutput.getIdFields()));
+ }
+ write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+ }
+ } else {
+ write(LogFeederUtil.getGson().toJson(jsonObj), marker);
+ }
}
@Override
@@ -82,6 +101,12 @@ public class CloudStorageOutputManager extends OutputManager {
storageOutput = new CloudStorageOutput(logFeederProps);
storageOutput.init(logFeederProps);
add(storageOutput);
+ useFilters.set(logFeederProps.isCloudStorageUseFilters());
+ if (useFilters.get()) {
+ logger.info("Using filters are enabled for cloud log outputs");
+ } else {
+ logger.info("Using filters are disabled for cloud log outputs");
+ }
}
@Override
diff --git a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
index b76f441..af9326a 100644
--- a/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
+++ b/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/cloud/CloudStorageUploader.java
@@ -76,7 +76,7 @@ public class CloudStorageUploader extends Thread {
try {
final String archiveLogDir = String.join(File.separator, logFeederProps.getRolloverConfig().getRolloverArchiveBaseDir(), uploaderType, "archived");
if (new File(archiveLogDir).exists()) {
- String[] extensions = {"log", "gz"};
+ String[] extensions = {"log", "json", "gz"};
Collection<File> filesToUpload = FileUtils.listFiles(new File(archiveLogDir), extensions, true);
if (filesToUpload.isEmpty()) {
logger.debug("Not found any files to upload.");
diff --git a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
index c7ea335..45c05f3 100644
--- a/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
+++ b/ambari-logsearch-logfeeder/src/main/resources/logfeeder.properties
@@ -47,6 +47,7 @@ logfeeder.cloud.storage.uploader.interval.seconds=1
logfeeder.cloud.storage.upload.on.shutdown=true
logfeeder.cloud.storage.base.path=/apps/logfeeder
logfeeder.cloud.storage.use.hdfs.client=true
+logfeeder.cloud.storage.use.filters=false
logfeeder.cloud.storage.bucket=logfeeder
logfeeder.cloud.storage.bucket.bootstrap=true