You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2017/04/12 12:32:06 UTC
[5/6] ambari git commit: AMBARI-20378. Logfeeder: add de-duplication
support (oleewere)
AMBARI-20378. Logfeeder: add de-duplication support (oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9bc97c4b
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9bc97c4b
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9bc97c4b
Branch: refs/heads/branch-2.5
Commit: 9bc97c4b998a51db4d2ed9d986fe318194f8a964
Parents: a65c5a0
Author: oleewere <ol...@gmail.com>
Authored: Fri Mar 10 14:45:13 2017 +0100
Committer: oleewere <ol...@gmail.com>
Committed: Wed Apr 12 14:31:27 2017 +0200
----------------------------------------------------------------------
.../logfeeder/common/LogFeederConstants.java | 2 +
.../ambari/logfeeder/filter/FilterJSON.java | 2 +
.../apache/ambari/logfeeder/input/Input.java | 63 ++++++-
.../ambari/logfeeder/input/cache/LRUCache.java | 99 +++++++++++
.../ambari/logfeeder/mapper/MapperDate.java | 4 +
.../logfeeder/output/OutputLineFilter.java | 65 ++++++++
.../ambari/logfeeder/output/OutputManager.java | 8 +-
.../ambari/logfeeder/filter/FilterJSONTest.java | 3 +
.../logfeeder/input/cache/LRUCacheTest.java | 123 ++++++++++++++
.../ambari/logfeeder/mapper/MapperDateTest.java | 3 +
.../logfeeder/output/OutputLineFilterTest.java | 167 +++++++++++++++++++
.../logfeeder/output/OutputManagerTest.java | 3 +-
.../test-config/logfeeder/logfeeder.properties | 5 +
.../shipper-conf/input.config-zookeeper.json | 5 +-
.../configuration/logfeeder-properties.xml | 56 +++++++
15 files changed, 603 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
index d1e7fba..a7559aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogFeederConstants.java
@@ -36,4 +36,6 @@ public class LogFeederConstants {
// S3 Constants
public static final String S3_PATH_START_WITH = "s3://";
public static final String S3_PATH_SEPARATOR = "/";
+
+ public static final String IN_MEMORY_TIMESTAMP = "in_memory_timestamp";
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index ba63c61..35f692e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -20,6 +20,7 @@ package org.apache.ambari.logfeeder.filter;
import java.util.Map;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
@@ -48,6 +49,7 @@ public class FilterJSON extends Filter {
if (timeStampStr != null && !timeStampStr.isEmpty()) {
String logtime = DateUtil.getDate(timeStampStr);
jsonMap.put("logtime", logtime);
+ jsonMap.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, Long.parseLong(timeStampStr));
}
super.apply(jsonMap, inputMarker);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index e13d9bd..9f54d8a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -25,12 +25,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public abstract class Input extends ConfigBlock implements Runnable {
@@ -39,7 +41,18 @@ public abstract class Input extends ConfigBlock implements Runnable {
private static final boolean DEFAULT_TAIL = true;
private static final boolean DEFAULT_USE_EVENT_MD5 = false;
private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
-
+ private static final boolean DEFAULT_CACHE_ENABLED = false;
+ private static final boolean DEFAULT_CACHE_DEDUP_LAST = false;
+ private static final int DEFAULT_CACHE_SIZE = 100;
+ private static final long DEFAULT_CACHE_DEDUP_INTERVAL = 1000;
+ private static final String DEFAULT_CACHE_KEY_FIELD = "log_message";
+
+ private static final String CACHE_ENABLED = "cache_enabled";
+ private static final String CACHE_KEY_FIELD = "cache_key_field";
+ private static final String CACHE_LAST_DEDUP_ENABLED = "cache_last_dedup_enabled";
+ private static final String CACHE_SIZE = "cache_size";
+ private static final String CACHE_DEDUP_INTERVAL = "cache_dedup_interval";
+
protected InputManager inputManager;
protected OutputManager outputManager;
private List<Output> outputList = new ArrayList<Output>();
@@ -54,6 +67,9 @@ public abstract class Input extends ConfigBlock implements Runnable {
private boolean useEventMD5;
private boolean genEventMD5;
+ private LRUCache cache;
+ private String cacheKeyField;
+
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
protected String getReadBytesMetricName() {
return null;
@@ -107,6 +123,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
@Override
public void init() throws Exception {
super.init();
+ initCache();
tail = getBooleanValue("tail", DEFAULT_TAIL);
useEventMD5 = getBooleanValue("use_event_md5_as_id", DEFAULT_USE_EVENT_MD5);
genEventMD5 = getBooleanValue("gen_event_md5", DEFAULT_GEN_EVENT_MD5);
@@ -114,6 +131,7 @@ public abstract class Input extends ConfigBlock implements Runnable {
if (firstFilter != null) {
firstFilter.init();
}
+
}
boolean monitor() {
@@ -217,6 +235,33 @@ public abstract class Input extends ConfigBlock implements Runnable {
isClosed = true;
}
+  /**
+   * Creates the de-duplication cache of this input when caching is enabled.
+   * Every setting is read from the input config when it is present there; otherwise the matching
+   * logfeeder.cache.* property is used, with the DEFAULT_CACHE_* constants as fallback values.
+   */
+  private void initCache() {
+    boolean cacheEnabled = getConfigValue(CACHE_ENABLED) != null
+      ? getBooleanValue(CACHE_ENABLED, DEFAULT_CACHE_ENABLED)
+      : LogFeederUtil.getBooleanProperty("logfeeder.cache.enabled", DEFAULT_CACHE_ENABLED);
+    if (cacheEnabled) {
+      String keyField = getConfigValue(CACHE_KEY_FIELD) != null
+        ? getStringValue(CACHE_KEY_FIELD)
+        : LogFeederUtil.getStringProperty("logfeeder.cache.key.field", DEFAULT_CACHE_KEY_FIELD);
+
+      // keyField already is the name of the log field to de-duplicate on (e.g. "log_message").
+      // It must be stored as-is; passing it through getStringValue() again would treat it as a
+      // config key instead of a field name.
+      setCacheKeyField(keyField);
+
+      boolean cacheLastDedupEnabled = getConfigValue(CACHE_LAST_DEDUP_ENABLED) != null
+        ? getBooleanValue(CACHE_LAST_DEDUP_ENABLED, DEFAULT_CACHE_DEDUP_LAST)
+        : LogFeederUtil.getBooleanProperty("logfeeder.cache.last.dedup.enabled", DEFAULT_CACHE_DEDUP_LAST);
+
+      int cacheSize = getConfigValue(CACHE_SIZE) != null
+        ? getIntValue(CACHE_SIZE, DEFAULT_CACHE_SIZE)
+        : LogFeederUtil.getIntProperty("logfeeder.cache.size", DEFAULT_CACHE_SIZE);
+
+      long cacheDedupInterval = getConfigValue(CACHE_DEDUP_INTERVAL) != null
+        ? getLongValue(CACHE_DEDUP_INTERVAL, DEFAULT_CACHE_DEDUP_INTERVAL)
+        : Long.parseLong(LogFeederUtil.getStringProperty("logfeeder.cache.dedup.interval", String.valueOf(DEFAULT_CACHE_DEDUP_INTERVAL)));
+
+      setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
+    }
+  }
+
public boolean isTail() {
return tail;
}
@@ -257,6 +302,22 @@ public abstract class Input extends ConfigBlock implements Runnable {
return thread;
}
+  /**
+   * @return the de-duplication cache of this input; stays null unless one was set
+   *         (initCache only sets it when caching is enabled)
+   */
+  public LRUCache getCache() {
+    return this.cache;
+  }
+
+  /**
+   * @param cache the de-duplication cache to use for this input
+   */
+  public void setCache(LRUCache cache) {
+    this.cache = cache;
+  }
+
+  /**
+   * @return name of the log field whose value is used as the de-duplication key
+   */
+  public String getCacheKeyField() {
+    return this.cacheKeyField;
+  }
+
+  /**
+   * @param cacheKeyField name of the log field whose value is used as the de-duplication key
+   */
+  public void setCacheKeyField(String cacheKeyField) {
+    this.cacheKeyField = cacheKeyField;
+  }
+
@Override
public String getNameForThread() {
if (filePath != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
new file mode 100644
index 0000000..d9cfef8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.cache;
+
+import com.google.common.collect.EvictingQueue;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * LRU cache for handling de-duplication per input file.
+ * A new value for an already cached key is rejected if it is closer to the cached value than the
+ * de-duplication interval, or if the key is the most recently seen one (when lastDedupEnabled is true).
+ * Once the size limit is exceeded, the least recently accessed key is evicted.
+ * <p>
+ * NOTE(review): not synchronized; assumes single-threaded use (one cache per input) - confirm.
+ */
+public class LRUCache {
+  private final LinkedHashMap<String, Long> keyValueMap;
+  private final String fileName;
+  private final long dedupInterval;
+  private final boolean lastDedupEnabled;
+  private final EvictingQueue<String> mostRecentLogs;
+
+  /**
+   * @param limit maximum number of cached keys
+   * @param fileName input file the cache belongs to (only stored and returned by {@link #getFileName()})
+   * @param dedupInterval minimum distance between the cached value and a new value of the same key
+   *                      for the new value to be accepted (same unit as the values)
+   * @param lastDedupEnabled whether a key that repeats the most recently seen key is always rejected
+   */
+  public LRUCache(final int limit, final String fileName, final long dedupInterval, boolean lastDedupEnabled) {
+    this.fileName = fileName;
+    this.dedupInterval = dedupInterval;
+    this.lastDedupEnabled = lastDedupEnabled;
+    this.mostRecentLogs = EvictingQueue.create(1); // for now, we will just store 1 mru entry
+    // accessOrder = true: the "eldest" entry is the least recently accessed one, which makes
+    // removeEldestEntry an LRU eviction policy.
+    keyValueMap = new LinkedHashMap<String, Long>(16, 0.75f, true) {
+      @Override
+      protected boolean removeEldestEntry(final Map.Entry<String, Long> eldest) {
+        return size() > limit;
+      }
+    };
+  }
+
+  /**
+   * Checks whether {@link #put(String, Long)} would store the given entry.
+   * <p>
+   * A rejected key is recorded as the most recently seen key right away. An accepted key is recorded
+   * when it is stored with {@code put}. If it were recorded here as well, the follow-up {@code put}
+   * would see the key as "most recent" and reject it when lastDedupEnabled is true, so the cached
+   * value would never advance.
+   *
+   * @return true if the key is not cached yet or the new value passes the de-duplication rules
+   */
+  public boolean isEntryReplaceable(String key, Long value) {
+    boolean replaceable = canReplace(key, value);
+    if (!replaceable) {
+      mostRecentLogs.add(key);
+    }
+    return replaceable;
+  }
+
+  /**
+   * Stores the entry if it passes the de-duplication rules, and records the key as the most
+   * recently seen one in either case.
+   */
+  public void put(String key, Long value) {
+    if (canReplace(key, value)) {
+      keyValueMap.put(key, value);
+    }
+    mostRecentLogs.add(key);
+  }
+
+  /**
+   * De-duplication rule shared by isEntryReplaceable and put. Does not touch the MRU queue;
+   * keyValueMap.get only refreshes the key's LRU position.
+   */
+  private boolean canReplace(String key, Long value) {
+    Long existingValue = keyValueMap.get(key);
+    if (existingValue == null) {
+      return true;
+    }
+    if (lastDedupEnabled && mostRecentLogs.contains(key)) { // TODO: get peek element if mostRecentLogs will contain more than 1 element
+      return false;
+    }
+    return Math.abs(value - existingValue) >= dedupInterval;
+  }
+
+  /**
+   * Returns the cached value for the key (or null), and records the key as the most recently seen one.
+   */
+  public Long get(String key) {
+    mostRecentLogs.add(key);
+    return keyValueMap.get(key);
+  }
+
+  /**
+   * @return the most recently seen key, or null if no key was seen yet
+   */
+  public String getMRUKey() {
+    return mostRecentLogs.peek();
+  }
+
+  public int size() {
+    return keyValueMap.size();
+  }
+
+  public long getDedupInterval() {
+    return dedupInterval;
+  }
+
+  public boolean containsKey(String key) {
+    return keyValueMap.containsKey(key);
+  }
+
+  public String getFileName() {
+    return this.fileName;
+  }
+
+  public boolean isLastDedupEnabled() {
+    return lastDedupEnabled;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index eb3ae01..6a7fad7 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -24,6 +24,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.Map;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
@@ -80,6 +81,7 @@ public class MapperDate extends Mapper {
if (isEpoch) {
long ms = Long.parseLong(value.toString()) * 1000;
value = new Date(ms);
+ jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, ((Date) value).getTime());
} else if (targetDateFormatter != null) {
if (srcDateFormatter != null) {
Date srcDate = srcDateFormatter.parse(value.toString());
@@ -97,8 +99,10 @@ public class MapperDate extends Mapper {
}
}
value = targetDateFormatter.format(srcDate);
+ jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, srcDate.getTime());
} else {
value = targetDateFormatter.parse(value.toString());
+ jsonObj.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, ((Date) value).getTime());
}
} else {
return value;
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
new file mode 100644
index 0000000..fcf2695
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+/**
+ * Filter for outputs based on input configs, which can drop lines if the filter applies.
+ */
+public class OutputLineFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(OutputLineFilter.class);
+
+  /**
+   * Applies the input's de-duplication cache to a log line (service logs only).
+   * <p>
+   * A line is evaluated only when its cache key field holds a String and its in-memory timestamp
+   * holds a Long; otherwise it passes through. The in-memory timestamp field is always removed from
+   * {@code lineMap}, so it never reaches an output.
+   *
+   * @param lineMap parsed log line; modified in place (the in-memory timestamp is removed)
+   * @param input input the line was read from
+   * @return true if the line must be dropped as a duplicate
+   */
+  public Boolean apply(Map<String, Object> lineMap, Input input) {
+    boolean isLogFilteredOut = false;
+    LRUCache inputLruCache = input.getCache();
+    if (inputLruCache != null && "service".equals(input.getConfigs().get(LogFeederConstants.ROW_TYPE))) {
+      // The key field is configurable and may name a non-String field; such lines are passed through
+      // instead of failing the output pipeline with a ClassCastException.
+      Object keyValue = lineMap.get(input.getCacheKeyField());
+      Object timestampValue = lineMap.get(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+      if (keyValue instanceof String && timestampValue instanceof Long) {
+        String logMessage = (String) keyValue;
+        Long timestamp = (Long) timestampValue;
+        isLogFilteredOut = !inputLruCache.isEntryReplaceable(logMessage, timestamp);
+        if (!isLogFilteredOut) {
+          inputLruCache.put(logMessage, timestamp);
+        } else {
+          LOG.debug("Log line filtered out: {} (file: {}, dedupInterval: {}, lastDedupEnabled: {})",
+            logMessage, inputLruCache.getFileName(), inputLruCache.getDedupInterval(), inputLruCache.isLastDedupEnabled());
+        }
+      }
+    }
+    // Internal helper field; strip it before the line is written to any output.
+    lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+    return isLogFilteredOut;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
index 86b5c57..3c80e50 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -30,8 +30,10 @@ import java.util.UUID;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
import org.apache.ambari.logfeeder.logconfig.FilterLogData;
import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.MurmurHash;
import org.apache.commons.lang3.StringUtils;
@@ -51,6 +53,8 @@ public class OutputManager {
private static long docCounter = 0;
private MetricData messageTruncateMetric = new MetricData(null, false);
+ private OutputLineFilter outputLineFilter = new OutputLineFilter();
+
public List<Output> getOutputs() {
return outputs;
}
@@ -138,8 +142,8 @@ public class OutputManager {
jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077));
}
}
-
- if (FilterLogData.INSTANCE.isAllowed(jsonObj, inputMarker)) {
+ if (FilterLogData.INSTANCE.isAllowed(jsonObj, inputMarker)
+ && !outputLineFilter.apply(jsonObj, inputMarker.input)) {
for (Output output : input.getOutputList()) {
try {
output.write(jsonObj, inputMarker);
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index 06d8db2..643dafc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.output.OutputManager;
@@ -76,6 +77,7 @@ public class FilterJSONTest {
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+ assertEquals("Incorrect decoding: in memory timestamp", d.getTime(), jsonParams.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
}
@@ -100,6 +102,7 @@ public class FilterJSONTest {
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+ assertEquals("Incorrect decoding: in memory timestamp", d.getTime(), jsonParams.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
new file mode 100644
index 0000000..dd97d27
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.cache;
+
+import org.joda.time.DateTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+public class LRUCacheTest {
+
+  private static final String FILE_PATH = "/mypath";
+  private static final long DEDUP_INTERVAL = 1000L;
+  private static final int CACHE_LIMIT = 4;
+
+  private LRUCache underTest;
+
+  @Before
+  public void setUp() {
+    underTest = new LRUCache(CACHE_LIMIT, FILE_PATH, DEDUP_INTERVAL, true);
+  }
+
+  @Test
+  public void testLruCachePut() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage2", 1000L);
+    underTest.put("mymessage3", 1000L);
+    underTest.put("mymessage4", 1000L);
+    underTest.put("mymessage5", 1000L);
+    underTest.put("mymessage1", 1500L);
+    underTest.put("mymessage1", 3500L);
+    underTest.put("mymessage5", 1700L);
+    // THEN
+    assertEquals(Long.valueOf(1500L), underTest.get("mymessage1"));
+    assertEquals(Long.valueOf(1000L), underTest.get("mymessage5"));
+    // expected value first, so a failure reports the values the right way round
+    assertEquals("mymessage5", underTest.getMRUKey());
+    assertEquals(CACHE_LIMIT, underTest.size());
+    assertFalse(underTest.containsKey("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheFilterMruKeys() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage1", 3000L);
+    underTest.put("mymessage1", 5000L);
+    underTest.put("mymessage1", 7000L);
+    // THEN
+    assertEquals(Long.valueOf(1000L), underTest.get("mymessage1"));
+  }
+
+  @Test
+  public void testLruCacheDoNotFilterMruKeysIfLastDedupDisabled() {
+    // GIVEN
+    underTest = new LRUCache(CACHE_LIMIT, FILE_PATH, DEDUP_INTERVAL, false);
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage1", 3000L);
+    // THEN
+    assertEquals(Long.valueOf(3000L), underTest.get("mymessage1"));
+  }
+
+  @Test
+  public void testLruCacheFilterByDedupInterval() {
+    // GIVEN
+    // WHEN
+    underTest.put("mymessage1", 1000L);
+    underTest.put("mymessage2", 1000L);
+    underTest.put("mymessage1", 1250L);
+    underTest.put("mymessage2", 1500L);
+    underTest.put("mymessage1", 1500L);
+    underTest.put("mymessage2", 2100L);
+    // THEN
+    assertEquals(Long.valueOf(1000L), underTest.get("mymessage1"));
+    assertEquals(Long.valueOf(2100L), underTest.get("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheWithDates() {
+    // GIVEN
+    DateTime firstDate = DateTime.now();
+    DateTime secondDate = firstDate.plusMillis(500);
+    // WHEN
+    underTest.put("mymessage1", firstDate.toDate().getTime());
+    underTest.put("mymessage2", firstDate.toDate().getTime());
+    underTest.put("mymessage1", secondDate.toDate().getTime());
+    // THEN
+    assertEquals(Long.valueOf(firstDate.toDate().getTime()), underTest.get("mymessage1"));
+    assertEquals(Long.valueOf(firstDate.toDate().getTime()), underTest.get("mymessage2"));
+  }
+
+  @Test
+  public void testLruCacheWithDatesReachDedupInterval() {
+    // GIVEN
+    DateTime firstDate = DateTime.now();
+    DateTime secondDate = firstDate.plusMillis(1500);
+    // WHEN
+    underTest.put("mymessage1", firstDate.toDate().getTime());
+    underTest.put("mymessage2", firstDate.toDate().getTime());
+    underTest.put("mymessage1", secondDate.toDate().getTime());
+    // THEN
+    assertEquals(Long.valueOf(secondDate.toDate().getTime()), underTest.get("mymessage1"));
+    assertEquals(Long.valueOf(firstDate.toDate().getTime()), underTest.get("mymessage2"));
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
index 08680f6..8beecda 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -24,6 +24,7 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.log4j.Logger;
import org.junit.Test;
@@ -52,6 +53,7 @@ public class MapperDateTest {
assertEquals("Value wasn't matched properly", d, mappedValue);
assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+ assertEquals("Value wasn't put into jsonObj", d.getTime(), jsonObj.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
assertTrue("jsonObj is not empty", jsonObj.isEmpty());
}
@@ -73,6 +75,7 @@ public class MapperDateTest {
assertEquals("Value wasn't matched properly", d, mappedValue);
assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+ assertEquals("Value wasn't put into jsonObj", d.getTime(), jsonObj.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP));
assertTrue("jsonObj is not empty", jsonObj.isEmpty());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
new file mode 100644
index 0000000..1ccc319
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.output;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link OutputLineFilter}: {@code apply} is expected to return {@code true} when a line
+ * is a duplicate that should be dropped, and {@code false} when the line should be kept.
+ */
+public class OutputLineFilterTest {
+
+  private static final String CACHE_KEY_FIELD = "log_message";
+  private static final String DEFAULT_DUMMY_MESSAGE = "myMessage";
+
+  private OutputLineFilter underTest;
+  private Input inputMock;
+
+  @Before
+  public void setUp() {
+    underTest = new OutputLineFilter();
+    inputMock = EasyMock.mock(Input.class);
+  }
+
+  @Test
+  public void testApplyWithFilterOutByDedupInterval() {
+    // GIVEN: same message cached at t=100, line at t=150, last dedup disabled
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, false));
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertTrue(result);
+  }
+
+  @Test
+  public void testApplyDoNotFilterOutDataByDedupInterval() {
+    // GIVEN: same message cached at t=10, line at t=150, last dedup disabled
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, false));
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithFilterOutByDedupLast() {
+    // GIVEN: same message cached at t=10, line at t=150, last dedup enabled
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE, 10L, true));
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertTrue(result);
+  }
+
+  @Test
+  public void testApplyDoNotFilterOutDataByDedupLast() {
+    // GIVEN: a different message cached at t=10, last dedup enabled
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE + "2", 10L, true));
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutLruCache() {
+    // GIVEN: the input has no cache, so only getCache() is stubbed
+    EasyMock.expect(inputMock.getCache()).andReturn(null);
+    EasyMock.replay(inputMock);
+    // WHEN
+    boolean result = underTest.apply(generateLineMap(), inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutInMemoryTimestamp() {
+    // GIVEN: a matching cache entry, but the line has no in-memory timestamp
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
+    Map<String, Object> lineMap = generateLineMap();
+    lineMap.remove(LogFeederConstants.IN_MEMORY_TIMESTAMP);
+    // WHEN
+    boolean result = underTest.apply(lineMap, inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  @Test
+  public void testApplyWithoutLogMessage() {
+    // GIVEN: a matching cache entry, but the line has no cache key field
+    expectCacheLookup(createLruCache(DEFAULT_DUMMY_MESSAGE, 100L, true));
+    Map<String, Object> lineMap = generateLineMap();
+    lineMap.remove(CACHE_KEY_FIELD);
+    // WHEN
+    boolean result = underTest.apply(lineMap, inputMock);
+    // THEN
+    EasyMock.verify(inputMock);
+    assertFalse(result);
+  }
+
+  /** Stubs the cache, configs and cache key field of {@code inputMock}, then replays it. */
+  private void expectCacheLookup(LRUCache cache) {
+    EasyMock.expect(inputMock.getCache()).andReturn(cache);
+    EasyMock.expect(inputMock.getConfigs()).andReturn(generateInputConfigs());
+    EasyMock.expect(inputMock.getCacheKeyField()).andReturn(CACHE_KEY_FIELD);
+    EasyMock.replay(inputMock);
+  }
+
+  /** Builds a line with {@link #DEFAULT_DUMMY_MESSAGE} as key field and in-memory timestamp 150. */
+  private Map<String, Object> generateLineMap() {
+    Map<String, Object> lineMap = new HashMap<>();
+    lineMap.put(CACHE_KEY_FIELD, DEFAULT_DUMMY_MESSAGE);
+    lineMap.put(LogFeederConstants.IN_MEMORY_TIMESTAMP, 150L);
+    return lineMap;
+  }
+
+  /** Builds input configs declaring the "service" row type. */
+  private Map<String, Object> generateInputConfigs() {
+    Map<String, Object> inputConfigs = new HashMap<>();
+    inputConfigs.put(LogFeederConstants.ROW_TYPE, "service");
+    return inputConfigs;
+  }
+
+  /**
+   * Builds an LRUCache seeded with a single {@code key -> timestamp} entry. Its constructor arguments
+   * are presumably (size, file path, dedup interval, last-dedup flag); confirm against LRUCache.
+   */
+  private LRUCache createLruCache(String key, long timestamp, boolean lastDedupEnabled) {
+    LRUCache lruCache = new LRUCache(4, "myfilepath", 100, lastDedupEnabled);
+    lruCache.put(key, timestamp);
+    return lruCache;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
index a080fa8..0a0a195 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -103,8 +103,9 @@ public class OutputManagerTest {
expect(mockInput.isUseEventMD5()).andReturn(false);
expect(mockInput.isGenEventMD5()).andReturn(false);
expect(mockInput.getConfigs()).andReturn(Collections.<String, Object> emptyMap());
+ expect(mockInput.getCache()).andReturn(null);
expect(mockInput.getOutputList()).andReturn(Arrays.asList(output1, output2, output3));
-
+
output1.write(jsonObj, inputMarker); expectLastCall();
output2.write(jsonObj, inputMarker); expectLastCall();
output3.write(jsonObj, inputMarker); expectLastCall();
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
index 879b786..068bc3a 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
+++ b/ambari-logsearch/docker/test-config/logfeeder/logfeeder.properties
@@ -28,3 +28,8 @@ logfeeder.log.filter.enable=true
logfeeder.solr.config.interval=5
logfeeder.solr.core.config.name=history
logfeeder.solr.zk_connect_string=localhost:9983
+logfeeder.cache.enabled=true
+logfeeder.cache.size=100
+logfeeder.cache.key.field=log_message
+logfeeder.cache.dedup.interval=1000
+logfeeder.cache.last.dedup.enabled=true
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
----------------------------------------------------------------------
diff --git a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
index 122a9e1..d3685a4 100644
--- a/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
+++ b/ambari-logsearch/docker/test-config/logfeeder/shipper-conf/input.config-zookeeper.json
@@ -3,7 +3,10 @@
{
"type": "zookeeper",
"rowtype": "service",
- "path": "/root/test-logs/zookeeper/zookeeper-test-log.txt"
+ "path": "/root/test-logs/zookeeper/zookeeper-test-log.txt",
+ "cache_enabled" : "true",
+ "cache_size" : "10",
+ "cache_dedup_interval" : "1000"
}
],
"filter": [
http://git-wip-us.apache.org/repos/asf/ambari/blob/9bc97c4b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
index 2acb57e..ca14c1f 100644
--- a/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
+++ b/ambari-server/src/main/resources/common-services/LOGSEARCH/0.5.0/configuration/logfeeder-properties.xml
@@ -64,4 +64,60 @@
</value-attributes>
<on-ambari-upgrade add="true"/>
</property>
+ <property>
+ <name>logfeeder.cache.enabled</name>
+ <value>false</value>
+ <description>
+ Enable the input cache for every monitored input file. The cache keeps track of processed log lines (keyed by the cache key field), so duplicated lines can be dropped.
+ </description>
+ <display-name>Input cache enabled</display-name>
+ <value-attributes>
+ <type>boolean</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>logfeeder.cache.size</name>
+ <value>100</value>
+ <description>Size of the input caches</description>
+ <display-name>Input cache size</display-name>
+ <value-attributes>
+ <type>int</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>logfeeder.cache.dedup.interval</name>
+ <value>1000</value>
+ <description>
+ If the input cache is enabled, Log Feeder can drop duplicated lines during log processing,
+ but only if a duplicate arrives within this interval (in milliseconds) of the original line/message.
+ </description>
+ <display-name>Input cache dedup interval</display-name>
+ <value-attributes>
+ <type>int</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>logfeeder.cache.last.dedup.enabled</name>
+ <value>false</value>
+ <description>
+ If last dedup is enabled for the input cache, Log Feeder drops every new line (message) that is identical to the previous line.
+ </description>
+ <display-name>Input cache last dedup</display-name>
+ <value-attributes>
+ <type>boolean</type>
+ </value-attributes>
+ <on-ambari-upgrade add="true"/>
+ </property>
+ <property>
+ <name>logfeeder.cache.key.field</name>
+ <value>log_message</value>
+ <description>
+ Field whose value is used as the key in the input cache (by default, log_message, which represents the message part of the processed data).
+ </description>
+ <display-name>Input cache key field</display-name>
+ <on-ambari-upgrade add="true"/>
+ </property>
</configuration>