You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2018/01/20 09:03:42 UTC
[ambari] branch trunk updated: AMBARI-22818. Log Feeder: refactor - create plugin api
This is an automated email from the ASF dual-hosted git repository.
oleewere pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push:
new a3f26e5 AMBARI-22818. Log Feeder: refactor - create plugin api
a3f26e5 is described below
commit a3f26e5e35c10c834312430cd0626a38406e7459
Author: Oliver Szabo <ol...@gmail.com>
AuthorDate: Wed Jan 10 20:46:21 2018 +0100
AMBARI-22818. Log Feeder: refactor - create plugin api
---
.../api/model/inputconfig/InputDescriptor.java | 2 -
.../inputconfig/impl/InputDescriptorImpl.java | 9 -
.../ambari-logsearch-logfeeder-plugin-api/pom.xml | 76 +++++
.../ambari/logfeeder/plugin/common}/AliasUtil.java | 51 +--
.../ambari/logfeeder/plugin/common/ConfigItem.java | 191 +++++++++++
.../plugin/common/LogFeederProperties.java} | 26 +-
.../logfeeder/plugin/common}/MetricData.java | 17 +-
.../ambari/logfeeder/plugin}/filter/Filter.java | 152 +++++----
.../logfeeder/plugin/filter}/mapper/Mapper.java | 41 ++-
.../ambari/logfeeder/plugin}/input/Input.java | 353 ++++++++++-----------
.../logfeeder/plugin/input/InputMarker.java} | 33 +-
.../logfeeder/plugin}/input/cache/LRUCache.java | 2 +-
.../logfeeder/plugin/manager/BlockManager.java} | 35 +-
.../logfeeder/plugin/manager/InputManager.java} | 36 ++-
.../logfeeder/plugin/manager/OutputManager.java} | 37 +--
.../ambari/logfeeder/plugin}/output/Output.java | 123 ++++---
.../ambari-logsearch-logfeeder/pom.xml | 10 +
.../ambari/logfeeder/common/ConfigBlock.java | 153 ---------
.../ambari/logfeeder/common/ConfigHandler.java | 111 ++++---
.../apache/ambari/logfeeder/common/ConfigItem.java | 103 ------
.../logfeeder/common/LogEntryParseTester.java | 51 ++-
.../ambari/logfeeder/conf/ApplicationConfig.java | 13 +-
.../ambari/logfeeder/conf/LogEntryCacheConfig.java | 4 +-
.../ambari/logfeeder/conf/LogFeederProps.java | 3 +-
.../logfeeder/conf/LogFeederSecurityConfig.java | 2 +
.../apache/ambari/logfeeder/filter/FilterGrok.java | 50 ++-
.../apache/ambari/logfeeder/filter/FilterJSON.java | 24 +-
.../ambari/logfeeder/filter/FilterKeyValue.java | 38 ++-
.../ambari/logfeeder/input/AbstractInputFile.java | 329 -------------------
.../logfeeder/input/InputConfigUploader.java | 26 +-
.../apache/ambari/logfeeder/input/InputFile.java | 277 ++++++++++++++--
.../{InputMarker.java => InputFileMarker.java} | 46 ++-
.../{InputManager.java => InputManagerImpl.java} | 76 +++--
.../apache/ambari/logfeeder/input/InputS3File.java | 61 ++--
.../ambari/logfeeder/input/InputSimulate.java | 81 ++---
.../logfeeder/input/file/FileCheckInHelper.java | 93 ++++++
.../logfeeder/input/file/ProcessFileHelper.java | 143 +++++++++
.../input/file/ResumeLineNumberHelper.java | 91 ++++++
.../ambari/logfeeder/input/reader/GZIPReader.java | 4 +-
.../input/reader/LogsearchReaderFactory.java | 4 +-
.../loglevelfilter/LogLevelFilterHandler.java | 30 +-
.../ambari/logfeeder/mapper/MapperAnonymize.java | 11 +-
.../apache/ambari/logfeeder/mapper/MapperDate.java | 32 +-
.../ambari/logfeeder/mapper/MapperFieldCopy.java | 8 +-
.../ambari/logfeeder/mapper/MapperFieldName.java | 10 +-
.../ambari/logfeeder/mapper/MapperFieldValue.java | 10 +-
.../ambari/logfeeder/metrics/MetricsManager.java | 1 +
.../ambari/logfeeder/metrics/StatsLogger.java | 1 +
.../apache/ambari/logfeeder/output/OutputData.java | 4 +-
.../ambari/logfeeder/output/OutputDevNull.java | 48 ++-
.../apache/ambari/logfeeder/output/OutputFile.java | 55 +++-
.../ambari/logfeeder/output/OutputHDFSFile.java | 41 ++-
.../ambari/logfeeder/output/OutputKafka.java | 52 ++-
.../ambari/logfeeder/output/OutputLineFilter.java | 4 +-
.../{OutputManager.java => OutputManagerImpl.java} | 69 ++--
.../ambari/logfeeder/output/OutputS3File.java | 34 +-
.../apache/ambari/logfeeder/output/OutputSolr.java | 58 ++--
.../logfeeder/output/S3OutputConfiguration.java | 12 +-
.../apache/ambari/logfeeder/output/S3Uploader.java | 1 -
.../ambari/logfeeder/output/spool/LogSpooler.java | 13 +-
.../ambari/logfeeder/util/LogFeederUtil.java | 37 +--
.../ambari/logfeeder/filter/FilterGrokTest.java | 26 +-
.../ambari/logfeeder/filter/FilterJSONTest.java | 18 +-
.../logfeeder/filter/FilterKeyValueTest.java | 20 +-
.../ambari/logfeeder/input/InputFileTest.java | 46 +--
.../ambari/logfeeder/input/InputManagerTest.java | 25 +-
.../ambari/logfeeder/input/cache/LRUCacheTest.java | 1 +
.../logfeeder/metrics/MetricsManagerTest.java | 2 +
.../ambari/logfeeder/output/OutputKafkaTest.java | 6 +-
.../logfeeder/output/OutputLineFilterTest.java | 4 +-
.../ambari/logfeeder/output/OutputManagerTest.java | 35 +-
.../ambari/logfeeder/output/OutputS3FileTest.java | 105 ------
.../ambari/logfeeder/output/OutputSolrTest.java | 183 -----------
ambari-logsearch/pom.xml | 1 +
74 files changed, 2044 insertions(+), 1936 deletions(-)
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
index 2ad1fac..82e9504 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
@@ -49,6 +49,4 @@ public interface InputDescriptor {
Long getCacheDedupInterval();
Boolean isEnabled();
-
- Map<String, Object> getAllProperties();
}
diff --git a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
index 765bf83..2e3405d 100644
--- a/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
+++ b/ambari-logsearch/ambari-logsearch-config-zookeeper/src/main/java/org/apache/ambari/logsearch/config/zookeeper/model/inputconfig/impl/InputDescriptorImpl.java
@@ -292,13 +292,4 @@ public abstract class InputDescriptorImpl implements InputDescriptor {
public void setIsEnabled(Boolean isEnabled) {
this.isEnabled = isEnabled;
}
-
- @Override
- public Map<String, Object> getAllProperties() {
- return allProperties;
- }
-
- public void setAllProperties(Map<String, Object> allProperties) {
- this.allProperties = allProperties;
- }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml
new file mode 100644
index 0000000..7510002
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-logsearch</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-logsearch-logfeeder-plugin-api</artifactId>
+ <packaging>jar</packaging>
+ <name>Ambari Logsearch Log Feeder Plugin Api</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-logsearch-config-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.20</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.13</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>18.0</version>
+ </dependency>
+ </dependencies>
+
+
+</project>
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
similarity index 70%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
index 3c48aa2..521e0bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/AliasUtil.java
@@ -16,26 +16,30 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.util;
+package org.apache.ambari.logfeeder.plugin.common;
-import java.util.HashMap;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
public class AliasUtil {
- private static final Logger LOG = Logger.getLogger(AliasUtil.class);
+ private static final Logger LOG = LoggerFactory.getLogger(AliasUtil.class);
private static final String ALIAS_CONFIG_JSON = "alias_config.json";
private static HashMap<String, Object> aliasMap = null;
static {
- aliasMap = FileUtil.getJsonFileContentFromClassPath(ALIAS_CONFIG_JSON);
+ aliasMap = getJsonFileContentFromClassPath(ALIAS_CONFIG_JSON);
}
public static enum AliasType {
@@ -48,10 +52,10 @@ public class AliasUtil {
public static Object getClassInstance(String key, AliasType aliasType) {
String classFullName = getClassFullName(key, aliasType);
-
+
Object instance = null;
try {
- instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+ instance = Class.forName(classFullName).getConstructor().newInstance();
} catch (Exception exception) {
LOG.error("Unsupported class = " + classFullName, exception.getCause());
}
@@ -84,23 +88,23 @@ public class AliasUtil {
private static String getClassFullName(String key, AliasType aliastype) {
String className = null;// key as a default value;
-
+
HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
String value = aliasInfo.get("klass");
- if (!StringUtils.isEmpty(value)) {
+ if (value != null && !value.isEmpty()) {
className = value;
LOG.debug("Class name found for key :" + key + ", class name :" + className + " aliastype:" + aliastype.name());
} else {
LOG.debug("Class name not found for key :" + key + " aliastype:" + aliastype.name());
}
-
+
return className;
}
@SuppressWarnings("unchecked")
private static HashMap<String, String> getAliasInfo(String key, AliasType aliastype) {
- HashMap<String, String> aliasInfo = new HashMap<String, String>();
-
+ HashMap<String, String> aliasInfo = new HashMap<>();
+
if (aliasMap != null) {
String typeKey = aliastype.name().toLowerCase();
HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
@@ -108,7 +112,18 @@ public class AliasUtil {
aliasInfo = (HashMap<String, String>) typeJson.get(key);
}
}
-
+
return aliasInfo;
}
+
+ public static HashMap<String, Object> getJsonFileContentFromClassPath(String fileName) {
+ ObjectMapper mapper = new ObjectMapper();
+ try (InputStream inputStream = AliasUtil.class.getClassLoader().getResourceAsStream(fileName)) {
+ return mapper.readValue(inputStream, new TypeReference<HashMap<String, Object>>() {});
+ } catch (IOException e) {
+ LOG.error("Error occurred during loading alias json file: {}", e);
+ }
+ return new HashMap<String, Object>();
+ }
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
new file mode 100644
index 0000000..f9d4a7a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/ConfigItem.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.plugin.common;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public abstract class ConfigItem<PROP_TYPE extends LogFeederProperties> implements Cloneable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConfigItem.class);
+
+ private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
+
+ public static Gson getGson() {
+ return gson;
+ }
+
+ private Map<String, Object> configs;
+ private Map<String, String> contextFields = new HashMap<>();
+ private boolean drain = false;
+ public MetricData statMetric = new MetricData(getStatMetricName(), false);
+
+ public abstract void init(PROP_TYPE logFeederProperties) throws Exception;
+
+ /**
+ * Used while logging. Keep it short and meaningful
+ */
+ public abstract String getShortDescription();
+
+ public abstract String getStatMetricName();
+
+ public abstract boolean logConfigs();
+
+ public void loadConfig(Map<String, Object> map) {
+ configs = cloneObject(map);
+
+ Map<String, String> nvList = getNVList("add_fields");
+ if (nvList != null) {
+ contextFields.putAll(nvList);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public Map<String, String> getNVList(String key) {
+ return (Map<String, String>) configs.get(key);
+ }
+
+ public Map<String, Object> getConfigs() {
+ return configs;
+ }
+
+ public boolean isEnabled() {
+ return getBooleanValue("is_enabled", true);
+ }
+
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ metricsList.add(statMetric);
+ }
+
+ public void incrementStat(int count) {
+ statMetric.value += count;
+ }
+
+ public synchronized void logStat() {
+ logStatForMetric(statMetric, "Stat");
+ }
+
+ public void logStatForMetric(MetricData metric, String prefixStr) {
+ long currStat = metric.value;
+ long currMS = System.currentTimeMillis();
+ String postFix = ", key=" + getShortDescription();
+ if (currStat > metric.prevLogValue) {
+ LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
+ " secs, count=" + (currStat - metric.prevLogValue) + postFix);
+ }
+ metric.prevLogValue = currStat;
+ metric.prevLogTime = currMS;
+ }
+
+ public boolean isDrain() {
+ return drain;
+ }
+
+ public void setDrain(boolean drain) {
+ this.drain = drain;
+ }
+
+ public String getStringValue(String property) {
+ return getStringValue(property, null);
+ }
+
+ public String getStringValue(String property, String defaultValue) {
+ Object strValue = configs.getOrDefault(property, defaultValue);
+ if (strValue != null) {
+ return strValue.toString();
+ }
+ return null;
+ }
+
+ public Boolean getBooleanValue(String property) {
+ return getBooleanValue(property, false);
+ }
+
+ public Boolean getBooleanValue(String property, Boolean defaultValue) {
+ Object booleanValue = configs.getOrDefault(property, defaultValue);
+ if (booleanValue != null) {
+ if (booleanValue.getClass().isAssignableFrom(Boolean.class)) {
+ return (Boolean) booleanValue;
+ } else {
+ String strValue = booleanValue.toString();
+ return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes");
+ }
+ }
+ return false;
+ }
+
+ public Long getLongValue(String property) {
+ return getLongValue(property, null);
+ }
+
+ public Long getLongValue(String property, Long defaultValue) {
+ Object longValue = configs.getOrDefault(property, defaultValue);
+ if (longValue != null) {
+ if (longValue.getClass().isAssignableFrom(Long.class)) {
+ return (Long) longValue;
+ } else {
+ return Long.parseLong(longValue.toString());
+ }
+ }
+ return null;
+ }
+
+ public Integer getIntValue(String property) {
+ return getIntValue(property, null);
+ }
+
+ public Integer getIntValue(String property, Integer defaultValue) {
+ Object intValue = configs.getOrDefault(property, defaultValue);
+ if (intValue != null) {
+ if (intValue.getClass().isAssignableFrom(Integer.class)) {
+ return (Integer) intValue;
+ } else {
+ return Integer.parseInt(intValue.toString());
+ }
+ }
+ return null;
+ }
+
+ private Map<String, Object> cloneObject(Map<String, Object> map) {
+ if (map == null) {
+ return null;
+ }
+ String jsonStr = gson.toJson(map);
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
+ return gson.fromJson(jsonStr, type);
+ }
+
+ private Object getValue(String property) {
+ return configs.get(property);
+ }
+
+ private Object getValue(String property, Object defaultValue) {
+ return configs.getOrDefault(property, defaultValue);
+ }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
similarity index 58%
copy from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
index 6767687..a1e5c12 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/LogFeederProperties.java
@@ -16,25 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.plugin.common;
-package org.apache.ambari.logfeeder.input;
+import java.util.Properties;
/**
- * This file contains the file inode, line number of the log currently been read
+ * Static application level configuration interface for Log Feeder
*/
-public class InputMarker {
- public final Input input;
- public final String base64FileKey;
- public final int lineNumber;
-
- public InputMarker(Input input, String base64FileKey, int lineNumber) {
- this.input = input;
- this.base64FileKey = base64FileKey;
- this.lineNumber = lineNumber;
- }
-
- @Override
- public String toString() {
- return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
- }
+public interface LogFeederProperties {
+
+ /**
+ * Get all key-value pairs from static application level Log Feeder configuration
+ */
+ Properties getProperties();
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
similarity index 81%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
index e7f5d37..933f131 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/common/MetricData.java
@@ -16,11 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.ambari.logfeeder.metrics;
-
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+package org.apache.ambari.logfeeder.plugin.common;
public class MetricData {
public final String metricsName;
@@ -30,17 +26,12 @@ public class MetricData {
this.metricsName = metricsName;
this.isPointInTime = isPointInTime;
}
-
+
public long value = 0;
public long prevPublishValue = 0;
-
+
public long prevLogValue = 0;
public long prevLogTime = System.currentTimeMillis();
-
+
public int publishCount = 0; // Number of times the metric was published so far
-
- @Override
- public String toString() {
- return ToStringBuilder.reflectionToString(this, ToStringStyle.SHORT_PREFIX_STYLE);
- }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
similarity index 68%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
index a06b348..9bf69b9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/Filter.java
@@ -16,53 +16,57 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.ambari.logfeeder.filter;
+package org.apache.ambari.logfeeder.plugin.filter;
+
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.PostMapValues;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.log4j.Priority;
+public abstract class Filter<PROP_TYPE extends LogFeederProperties> extends ConfigItem<PROP_TYPE> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Filter.class);
-public abstract class Filter extends ConfigItem {
- protected FilterDescriptor filterDescriptor;
- protected Input input;
+ private final Map<String, List<Mapper>> postFieldValueMappers = new HashMap<>();
+ private FilterDescriptor filterDescriptor;
+ private PROP_TYPE logFeederProperties;
private Filter nextFilter = null;
+ private Input input;
private OutputManager outputManager;
- private Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
-
- public void loadConfig(FilterDescriptor filterDescriptor) {
+ public void loadConfigs(FilterDescriptor filterDescriptor, PROP_TYPE logFeederProperties, OutputManager outputManager) {
this.filterDescriptor = filterDescriptor;
+ this.logFeederProperties = logFeederProperties;
+ this.outputManager = outputManager;
}
public FilterDescriptor getFilterDescriptor() {
return filterDescriptor;
}
- @Override
- public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
+ public PROP_TYPE getLogFeederProperties() {
+ return logFeederProperties;
+ }
+ @Override
+ public void init(PROP_TYPE logFeederProperties) throws Exception {
initializePostMapValues();
if (nextFilter != null) {
- nextFilter.init(logFeederProps);
+ nextFilter.init(logFeederProperties);
}
}
@@ -76,17 +80,13 @@ public abstract class Filter extends ConfigItem {
for (PostMapValues pmv : values) {
for (MapFieldDescriptor mapFieldDescriptor : pmv.getMappers()) {
String mapClassCode = mapFieldDescriptor.getJsonName();
- Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasType.MAPPER);
+ Mapper mapper = (Mapper) AliasUtil.getClassInstance(mapClassCode, AliasUtil.AliasType.MAPPER);
if (mapper == null) {
LOG.warn("Unknown mapper type: " + mapClassCode);
continue;
}
if (mapper.init(getInput().getShortDescription(), fieldName, mapClassCode, mapFieldDescriptor)) {
- List<Mapper> fieldMapList = postFieldValueMappers.get(fieldName);
- if (fieldMapList == null) {
- fieldMapList = new ArrayList<Mapper>();
- postFieldValueMappers.put(fieldName, fieldMapList);
- }
+ List<Mapper> fieldMapList = postFieldValueMappers.computeIfAbsent(fieldName, k -> new ArrayList<>());
fieldMapList.add(mapper);
}
}
@@ -94,30 +94,10 @@ public abstract class Filter extends ConfigItem {
}
}
- public void setOutputManager(OutputManager outputManager) {
- this.outputManager = outputManager;
- }
-
- public Filter getNextFilter() {
- return nextFilter;
- }
-
- public void setNextFilter(Filter nextFilter) {
- this.nextFilter = nextFilter;
- }
-
- public Input getInput() {
- return input;
- }
-
- public void setInput(Input input) {
- this.input = input;
- }
-
/**
* Deriving classes should implement this at the minimum
*/
- public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws Exception {
// TODO: There is no transformation for string types.
if (nextFilter != null) {
nextFilter.apply(inputStr, inputMarker);
@@ -126,7 +106,7 @@ public abstract class Filter extends ConfigItem {
}
}
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
for (String fieldName : postFieldValueMappers.keySet()) {
Object value = jsonObj.get(fieldName);
if (value != null) {
@@ -142,48 +122,66 @@ public abstract class Filter extends ConfigItem {
}
}
- public void close() {
- if (nextFilter != null) {
- nextFilter.close();
- }
+ public void loadConfig(FilterDescriptor filterDescriptor) {
+ this.filterDescriptor = filterDescriptor;
}
- public void flush() {
+ public Filter getNextFilter() {
+ return nextFilter;
+ }
+ public void setNextFilter(Filter nextFilter) {
+ this.nextFilter = nextFilter;
}
- @Override
- public void logStat() {
- super.logStat();
+ public Input getInput() {
+ return input;
+ }
+
+ public void setInput(Input input) {
+ this.input = input;
+ }
+
+ public OutputManager getOutputManager() {
+ return outputManager;
+ }
+
+ public void setOutputManager(OutputManager outputManager) {
+ this.outputManager = outputManager;
+ }
+
+ public void flush() {
+ // empty
+ }
+
+ public void close() {
if (nextFilter != null) {
- nextFilter.logStat();
+ nextFilter.close();
}
}
@Override
public boolean isEnabled() {
- return BooleanUtils.isNotFalse(filterDescriptor.isEnabled());
+ return filterDescriptor.isEnabled();
}
@Override
- public String getShortDescription() {
- return null;
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ super.addMetricsContainers(metricsList);
+ if (nextFilter != null) {
+ nextFilter.addMetricsContainers(metricsList);
+ }
}
@Override
- public boolean logConfigs(Priority level) {
- if (!super.logConfigs(level)) {
- return false;
- }
- LOG.log(level, "input=" + input.getShortDescription());
+ public boolean logConfigs() {
+ LOG.info("filter=" + getShortDescription());
return true;
}
@Override
- public void addMetricsContainers(List<MetricData> metricsList) {
- super.addMetricsContainers(metricsList);
- if (nextFilter != null) {
- nextFilter.addMetricsContainers(metricsList);
- }
+ public String getStatMetricName() {
+ // no metrics yet
+ return null;
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
similarity index 63%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
index 5facf76..d52bc01 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/filter/mapper/Mapper.java
@@ -16,28 +16,57 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.plugin.filter.mapper;
-package org.apache.ambari.logfeeder.mapper;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import java.util.Map;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
+public abstract class Mapper<PROP_TYPE extends LogFeederProperties> {
+
+ private MapFieldDescriptor mapFieldDescriptor;
+ private PROP_TYPE logFeederProperties;
-public abstract class Mapper {
private String inputDesc;
- protected String fieldName;
+ private String fieldName;
private String mapClassCode;
- public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
-
protected void init(String inputDesc, String fieldName, String mapClassCode) {
this.inputDesc = inputDesc;
this.fieldName = fieldName;
this.mapClassCode = mapClassCode;
}
+ public void loadConfigs(MapFieldDescriptor mapFieldDescriptor, PROP_TYPE logFeederProperties) {
+ this.mapFieldDescriptor = mapFieldDescriptor;
+ this.logFeederProperties = logFeederProperties;
+ }
+
+ public MapFieldDescriptor getMapFieldDescriptor() {
+ return mapFieldDescriptor;
+ }
+
+ public PROP_TYPE getLogFeederProperties() {
+ return logFeederProperties;
+ }
+
+ public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor);
+
public abstract Object apply(Map<String, Object> jsonObj, Object value);
+ public String getInputDesc() {
+ return inputDesc;
+ }
+
+ public String getFieldName() {
+ return fieldName;
+ }
+
+ public String getMapClassCode() {
+ return mapClassCode;
+ }
+
@Override
public String toString() {
return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
similarity index 53%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
index 7b9dcd4..e2158bc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/Input.java
@@ -16,84 +16,106 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.ambari.logfeeder.input;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
-import org.apache.ambari.logfeeder.common.ConfigItem;
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
+package org.apache.ambari.logfeeder.plugin.input;
+
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Conditions;
import org.apache.ambari.logsearch.config.api.model.inputconfig.Fields;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.log4j.Priority;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public abstract class Input extends ConfigItem implements Runnable {
-
- private static final boolean DEFAULT_TAIL = true;
- private static final boolean DEFAULT_USE_EVENT_MD5 = false;
- private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
- protected InputDescriptor inputDescriptor;
-
- protected InputManager inputManager;
- protected OutputManager outputManager;
- private List<Output> outputList = new ArrayList<>();
+public abstract class Input<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements Runnable {
- private Thread thread;
- private String type;
- protected String filePath;
- private Filter firstFilter;
- protected boolean isClosed;
-
- protected boolean tail;
- private boolean useEventMD5;
- private boolean genEventMD5;
+ private static final Logger LOG = LoggerFactory.getLogger(Input.class);
+ private InputDescriptor inputDescriptor;
+ private PROP_TYPE logFeederProperties;
+ private LogSearchConfigLogFeeder logSearchConfig;
+ private InputManager inputManager;
+ private OutputManager outputManager;
+ private final List<Output> outputList = new ArrayList<>();
+ private Filter<PROP_TYPE> firstFilter;
+ private boolean isClosed;
+ private String type;
+ private boolean useEventMD5 = false;
+ private boolean genEventMD5 = true;
+ private Thread thread;
private LRUCache cache;
private String cacheKeyField;
-
protected MetricData readBytesMetric = new MetricData(getReadBytesMetricName(), false);
- protected String getReadBytesMetricName() {
- return null;
- }
-
- public void loadConfig(InputDescriptor inputDescriptor) {
+
+ public void loadConfigs(InputDescriptor inputDescriptor, PROP_TYPE logFeederProperties,
+ InputManager inputManager, OutputManager outputManager) {
this.inputDescriptor = inputDescriptor;
+ this.logFeederProperties = logFeederProperties;
+ this.inputManager = inputManager;
+ this.outputManager = outputManager;
+ }
+
+ public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
+ this.logSearchConfig = logSearchConfig;
+ }
+
+ public LogSearchConfigLogFeeder getLogSearchConfig() {
+ return logSearchConfig;
+ }
+
+ public abstract boolean monitor();
+
+ public abstract List<? extends Input> getChildInputs();
+
+ public abstract INPUT_MARKER getInputMarker();
+
+ public abstract boolean isReady();
+
+ public abstract void setReady(boolean isReady);
+
+ public abstract void checkIn(INPUT_MARKER inputMarker);
+
+ public abstract void lastCheckIn();
+
+ public abstract String getReadBytesMetricName();
+
+ public PROP_TYPE getLogFeederProperties() {
+ return logFeederProperties;
}
public InputDescriptor getInputDescriptor() {
return inputDescriptor;
}
- public void setType(String type) {
- this.type = type;
+ public InputManager getInputManager() {
+ return inputManager;
}
- public void setInputManager(InputManager inputManager) {
- this.inputManager = inputManager;
+ public OutputManager getOutputManager() {
+ return outputManager;
}
public void setOutputManager(OutputManager outputManager) {
this.outputManager = outputManager;
}
- public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
- Conditions conditions = filterDescriptor.getConditions();
- Fields fields = conditions.getFields();
- return fields.getType().contains(inputDescriptor.getType());
+ public void setInputManager(InputManager inputManager) {
+ this.inputManager = inputManager;
+ }
+
+ public void addOutput(Output output) {
+ outputList.add(output);
}
public void addFilter(Filter filter) {
@@ -108,53 +130,40 @@ public abstract class Input extends ConfigItem implements Runnable {
}
}
+ public boolean isFilterRequired(FilterDescriptor filterDescriptor) {
+ Conditions conditions = filterDescriptor.getConditions();
+ Fields fields = conditions.getFields();
+ return fields.getType().contains(inputDescriptor.getType());
+ }
+
@SuppressWarnings("unchecked")
public boolean isOutputRequired(Output output) {
Map<String, Object> conditions = (Map<String, Object>) output.getConfigs().get("conditions");
if (conditions == null) {
return false;
}
-
+
Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
if (fields == null) {
return false;
}
-
+
List<String> types = (List<String>) fields.get("rowtype");
return types.contains(inputDescriptor.getRowtype());
}
- public void addOutput(Output output) {
- outputList.add(output);
+ @Override
+ public boolean isEnabled() {
+ return !Boolean.FALSE.equals(inputDescriptor.isEnabled()); // unset (null) flag still means enabled; avoids unboxing NPE
}
@Override
- public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
- initCache(logFeederProps.getLogEntryCacheConfig());
- tail = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isTail(), DEFAULT_TAIL);
- useEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5);
- genEventMD5 = BooleanUtils.toBooleanDefaultIfNull(inputDescriptor.isGenEventMd5(), DEFAULT_GEN_EVENT_MD5);
-
+ public void init(PROP_TYPE logFeederProperties) throws Exception {
if (firstFilter != null) {
- firstFilter.init(logFeederProps);
- }
-
- }
-
- boolean monitor() {
- if (isReady()) {
- LOG.info("Starting thread. " + getShortDescription());
- thread = new Thread(this, getNameForThread());
- thread.start();
- return true;
- } else {
- return false;
+ firstFilter.init(logFeederProperties);
}
}
- public abstract boolean isReady();
-
@Override
public void run() {
try {
@@ -171,17 +180,17 @@ public abstract class Input extends ConfigItem implements Runnable {
* method should only exit after all data are read from the source or the
* process is exiting
*/
- abstract void start() throws Exception;
+ public abstract void start() throws Exception;
- public void outputLine(String line, InputMarker marker) {
+ public void outputLine(String line, INPUT_MARKER marker) {
statMetric.value++;
readBytesMetric.value += (line.length());
if (firstFilter != null) {
try {
firstFilter.apply(line, marker);
- } catch (LogFeederException e) {
- LOG.error(e.getLocalizedMessage(), e);
+ } catch (Exception e) {
+ LOG.error("Error during filter apply: {}", e);
}
} else {
// TODO: For now, let's make filter mandatory, so that no one accidently forgets to write filter
@@ -189,48 +198,8 @@ public abstract class Input extends ConfigItem implements Runnable {
}
}
- protected void flush() {
- if (firstFilter != null) {
- firstFilter.flush();
- }
- }
-
- @Override
- public void setDrain(boolean drain) {
- LOG.info("Request to drain. " + getShortDescription());
- super.setDrain(drain);
- try {
- thread.interrupt();
- } catch (Throwable t) {
- // ignore
- }
- }
-
- public void addMetricsContainers(List<MetricData> metricsList) {
- super.addMetricsContainers(metricsList);
- if (firstFilter != null) {
- firstFilter.addMetricsContainers(metricsList);
- }
- metricsList.add(readBytesMetric);
- }
-
- @Override
- public void logStat() {
- super.logStat();
- logStatForMetric(readBytesMetric, "Stat: Bytes Read");
-
- if (firstFilter != null) {
- firstFilter.logStat();
- }
- }
-
- public abstract void checkIn(InputMarker inputMarker);
-
- public abstract void lastCheckIn();
-
public void close() {
LOG.info("Close called. " + getShortDescription());
-
try {
if (firstFilter != null) {
firstFilter.close();
@@ -240,31 +209,34 @@ public abstract class Input extends ConfigItem implements Runnable {
}
}
- private void initCache(LogEntryCacheConfig cacheConfig) {
- boolean cacheEnabled = inputDescriptor.isCacheEnabled() != null
- ? inputDescriptor.isCacheEnabled()
- : cacheConfig.isCacheEnabled();
- if (cacheEnabled) {
- String cacheKeyField = inputDescriptor.getCacheKeyField() != null
- ? inputDescriptor.getCacheKeyField()
- : cacheConfig.getCacheKeyField();
+ public void flush() {
+ if (firstFilter != null) {
+ firstFilter.flush();
+ }
+ }
+
+ public void loadConfig(InputDescriptor inputDescriptor) {
+ this.inputDescriptor = inputDescriptor;
+ }
- setCacheKeyField(cacheKeyField);
+ public void setClosed(boolean isClosed) {
+ this.isClosed = isClosed;
+ }
- int cacheSize = inputDescriptor.getCacheSize() != null
- ? inputDescriptor.getCacheSize()
- : cacheConfig.getCacheSize();
+ public boolean isClosed() {
+ return isClosed;
+ }
- boolean cacheLastDedupEnabled = inputDescriptor.getCacheLastDedupEnabled() != null
- ? inputDescriptor.getCacheLastDedupEnabled()
- : cacheConfig.isCacheLastDedupEnabled();
+ public String getNameForThread() {
+ return this.getClass().getSimpleName();
+ }
- long cacheDedupInterval = inputDescriptor.getCacheDedupInterval() != null
- ? inputDescriptor.getCacheDedupInterval()
- : Long.parseLong(cacheConfig.getCacheDedupInterval());
+ public String getType() {
+ return type;
+ }
- setCache(new LRUCache(cacheSize, filePath, cacheDedupInterval, cacheLastDedupEnabled));
- }
+ public void setType(String type) {
+ this.type = type;
}
public boolean isUseEventMD5() {
@@ -276,78 +248,93 @@ public abstract class Input extends ConfigItem implements Runnable {
}
public Filter getFirstFilter() {
- return firstFilter;
+ return this.firstFilter;
}
- public String getFilePath() {
- return filePath;
+ public Thread getThread() {
+ return thread;
}
- public void setFilePath(String filePath) {
- this.filePath = filePath;
+ public void setThread(Thread thread) {
+ this.thread = thread;
}
- public void setClosed(boolean isClosed) {
- this.isClosed = isClosed;
+ public void setUseEventMD5(boolean useEventMD5) {
+ this.useEventMD5 = useEventMD5;
}
- public boolean isClosed() {
- return isClosed;
+ public void setGenEventMD5(boolean genEventMD5) {
+ this.genEventMD5 = genEventMD5;
}
- public List<Output> getOutputList() {
- return outputList;
- }
-
- public Thread getThread(){
- return thread;
+ public LRUCache getCache() {
+ return this.cache;
}
- public LRUCache getCache() {
- return cache;
+ public String getCacheKeyField() {
+ return this.cacheKeyField;
}
public void setCache(LRUCache cache) {
this.cache = cache;
}
- public String getCacheKeyField() {
- return cacheKeyField;
- }
-
public void setCacheKeyField(String cacheKeyField) {
this.cacheKeyField = cacheKeyField;
}
- @Override
- public boolean isEnabled() {
- return BooleanUtils.isNotFalse(inputDescriptor.isEnabled());
+ public List<? extends Output> getOutputList() {
+ return outputList;
}
- @Override
- public String getNameForThread() {
- if (filePath != null) {
- try {
- return (type + "=" + (new File(filePath)).getName());
- } catch (Throwable ex) {
- LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
- }
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ // Register (not log) this input's metrics: base metrics, read-bytes metric, then the filter chain.
+ super.addMetricsContainers(metricsList);
+ metricsList.add(readBytesMetric);
+ if (firstFilter != null) {
+ firstFilter.addMetricsContainers(metricsList);
}
- return super.getNameForThread() + ":" + type;
}
- @Override
- public boolean logConfigs(Priority level) {
- if (!super.logConfigs(level)) {
- return false;
+ public void logStat() {
+ super.logStat();
+ logStatForMetric(readBytesMetric, "Stat: Bytes Read");
+
+ if (firstFilter != null) {
+ firstFilter.logStat();
+ }
+ }
+
+ public void initCache(boolean cacheEnabled, String cacheKeyField, int cacheSize,
+ boolean cacheLastDedupEnabled, String cacheDedupInterval, String fileName) {
+ boolean enabled = getInputDescriptor().isCacheEnabled() != null
+ ? getInputDescriptor().isCacheEnabled()
+ : cacheEnabled;
+ if (enabled) {
+ String keyField = getInputDescriptor().getCacheKeyField() != null
+ ? getInputDescriptor().getCacheKeyField()
+ : cacheKeyField;
+
+ setCacheKeyField(keyField);
+
+ int size = getInputDescriptor().getCacheSize() != null
+ ? getInputDescriptor().getCacheSize()
+ : cacheSize;
+
+ boolean lastDedupEnabled = getInputDescriptor().getCacheLastDedupEnabled() != null
+ ? getInputDescriptor().getCacheLastDedupEnabled()
+ : cacheLastDedupEnabled;
+
+ long dedupInterval = getInputDescriptor().getCacheDedupInterval() != null
+ ? getInputDescriptor().getCacheDedupInterval()
+ : Long.parseLong(cacheDedupInterval);
+
+ setCache(new LRUCache(size, fileName, dedupInterval, lastDedupEnabled));
}
- LOG.log(level, "Printing Input=" + getShortDescription());
- LOG.log(level, "description=" + inputDescriptor.getPath());
- return true;
}
@Override
public String toString() {
return getShortDescription();
}
-}
\ No newline at end of file
+}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
similarity index 64%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
index 2ad1fac..aa54019 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/InputMarker.java
@@ -16,39 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+package org.apache.ambari.logfeeder.plugin.input;
import java.util.Map;
-public interface InputDescriptor {
- String getType();
-
- String getRowtype();
-
- String getPath();
-
- Map<String, String> getAddFields();
-
- String getSource();
-
- Boolean isTail();
-
- Boolean isGenEventMd5();
+public interface InputMarker <INPUT_TYPE extends Input> {
- Boolean isUseEventMd5AsId();
-
- Boolean isCacheEnabled();
-
- String getCacheKeyField();
-
- Boolean getCacheLastDedupEnabled();
-
- Integer getCacheSize();
-
- Long getCacheDedupInterval();
-
- Boolean isEnabled();
+ INPUT_TYPE getInput();
Map<String, Object> getAllProperties();
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
similarity index 98%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
index d9cfef8..e0509fe 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/cache/LRUCache.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/input/cache/LRUCache.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.input.cache;
+package org.apache.ambari.logfeeder.plugin.input.cache;
import com.google.common.collect.EvictingQueue;
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
similarity index 59%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
index 2ad1fac..674f51f 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/BlockManager.java
@@ -16,39 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.plugin.manager;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
-import java.util.Map;
+import java.util.List;
-public interface InputDescriptor {
- String getType();
+public interface BlockManager {
- String getRowtype();
+ void init() throws Exception;
- String getPath();
+ void close();
- Map<String, String> getAddFields();
+ void logStats();
- String getSource();
+ void addMetricsContainers(List<MetricData> metricsList);
- Boolean isTail();
-
- Boolean isGenEventMd5();
-
- Boolean isUseEventMd5AsId();
-
- Boolean isCacheEnabled();
-
- String getCacheKeyField();
-
- Boolean getCacheLastDedupEnabled();
-
- Integer getCacheSize();
-
- Long getCacheDedupInterval();
-
- Boolean isEnabled();
-
- Map<String, Object> getAllProperties();
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
similarity index 55%
copy from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
index fa4e17b..0734158 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/InputManager.java
@@ -16,27 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.ambari.logfeeder.output;
+package org.apache.ambari.logfeeder.plugin.manager;
+
+import org.apache.ambari.logfeeder.plugin.input.Input;
import java.io.File;
+import java.util.List;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.log4j.Logger;
-/**
- * Output that just ignore the logs
- */
-public class OutputDevNull extends Output {
+public abstract class InputManager implements BlockManager {
+
+ public abstract void addToNotReady(Input input);
+
+ public abstract void checkInAll();
+
+ public abstract List<Input> getInputList(String serviceName);
+
+ public abstract void add(String serviceName, Input input);
+
+ public abstract void removeInput(Input input);
+
+ public abstract File getCheckPointFolderFile();
- private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
+ public abstract void cleanCheckPointFiles();
- @Override
- public void write(String block, InputMarker inputMarker){
- LOG.trace("Ignore log block: " + block);
- }
+ public abstract void removeInputsForService(String serviceName);
- @Override
- public void copyFile(File inputFile, InputMarker inputMarker) {
- throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
- }
+ public abstract void startInputs(String serviceName);
}
diff --git a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
similarity index 53%
copy from ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
copy to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
index 2ad1fac..3a3c601 100644
--- a/ambari-logsearch/ambari-logsearch-config-api/src/main/java/org/apache/ambari/logsearch/config/api/model/inputconfig/InputDescriptor.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/manager/OutputManager.java
@@ -16,39 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.ambari.logfeeder.plugin.manager;
-package org.apache.ambari.logsearch.config.api.model.inputconfig;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import java.io.File;
+import java.util.List;
import java.util.Map;
-public interface InputDescriptor {
- String getType();
+public abstract class OutputManager implements BlockManager {
- String getRowtype();
+ public abstract void write(Map<String, Object> jsonObj, InputMarker inputMarker);
- String getPath();
+ public abstract void write(String jsonBlock, InputMarker inputMarker);
- Map<String, String> getAddFields();
+ public abstract void copyFile(File file, InputMarker marker);
- String getSource();
+ public abstract void add(Output output);
- Boolean isTail();
+ public abstract List<Output> getOutputs();
- Boolean isGenEventMd5();
+ public abstract List<? extends OutputConfigMonitor> getOutputsToMonitor();
- Boolean isUseEventMd5AsId();
-
- Boolean isCacheEnabled();
-
- String getCacheKeyField();
-
- Boolean getCacheLastDedupEnabled();
-
- Integer getCacheSize();
-
- Long getCacheDedupInterval();
-
- Boolean isEnabled();
-
- Map<String, Object> getAllProperties();
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
similarity index 59%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
rename to ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
index c0075b5..a2f13b8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder-plugin-api/src/main/java/org/apache/ambari/logfeeder/plugin/output/Output.java
@@ -16,84 +16,62 @@
* specific language governing permissions and limitations
* under the License.
*/
-
-package org.apache.ambari.logfeeder.output;
+package org.apache.ambari.logfeeder.plugin.output;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
+import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
-import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
+public abstract class Output<PROP_TYPE extends LogFeederProperties, INPUT_MARKER extends InputMarker> extends ConfigItem<PROP_TYPE> implements OutputConfigMonitor {
-public abstract class Output extends ConfigBlock implements OutputConfigMonitor {
- private String destination = null;
+ private static final Logger LOG = LoggerFactory.getLogger(Output.class);
+
+ private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
+ private LogSearchConfigLogFeeder logSearchConfig;
+ private String destination = null;
+ private boolean isClosed;
protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
- protected String getWriteBytesMetricName() {
- return null;
- }
- public boolean monitorConfigChanges() {
- return false;
- };
-
- @Override
- public String getOutputType() {
- throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
- }
-
- @Override
- public void outputConfigChanged(OutputProperties outputProperties) {
- throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
- };
+ public abstract String getOutputType();
- @Override
- public String getShortDescription() {
- return null;
- }
+ public abstract void outputConfigChanged(OutputProperties outputProperties);
- @Override
- public String getNameForThread() {
- if (destination != null) {
- return destination;
- }
- return super.getNameForThread();
- }
+ public abstract void copyFile(File inputFile, InputMarker inputMarker) throws Exception;
- public abstract void write(String block, InputMarker inputMarker) throws Exception;
-
- public abstract void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException;
+ public abstract void write(String jsonStr, INPUT_MARKER inputMarker) throws Exception;
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
- write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
- }
+ public abstract Long getPendingCount();
- boolean isClosed = false;
+ public abstract String getWriteBytesMetricName();
- /**
- * Extend this method to clean up
- */
- public void close() {
- LOG.info("Calling base close()." + getShortDescription());
- isClosed = true;
+ public String getNameForThread() {
+ return this.getClass().getSimpleName();
}
- /**
- * This is called on shutdown. All output should extend it.
- */
- public boolean isClosed() {
- return isClosed;
+ public boolean monitorConfigChanges() {
+ return false; // base implementation always reports false
+ }
+
+ public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
+ this.logSearchConfig = logSearchConfig;
}
- public long getPendingCount() {
- return 0;
+ public LogSearchConfigLogFeeder getLogSearchConfig() {
+ return logSearchConfig;
}
public String getDestination() {
@@ -104,10 +82,16 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
this.destination = destination;
}
- protected LogSearchConfigLogFeeder logSearchConfig;
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ public void setClosed(boolean closed) {
+ isClosed = closed;
+ }
- public void setLogSearchConfig(LogSearchConfigLogFeeder logSearchConfig) {
- this.logSearchConfig = logSearchConfig;
+ public void write(Map<String, Object> jsonObj, INPUT_MARKER inputMarker) throws Exception {
+ write(gson.toJson(jsonObj), inputMarker);
}
@Override
@@ -121,10 +105,16 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
super.logStat();
logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
}
-
+
+ @Override
+ public boolean logConfigs() {
+ // TODO: log something about the configs
+ return true;
+ }
+
public void trimStrValue(Map<String, Object> jsonObj) {
if (jsonObj != null) {
- for (Entry<String, Object> entry : jsonObj.entrySet()) {
+ for (Map.Entry<String, Object> entry : jsonObj.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value != null && value instanceof String) {
@@ -134,4 +124,9 @@ public abstract class Output extends ConfigBlock implements OutputConfigMonitor
}
}
}
+
+ public void close() {
+ LOG.info("Calling base close()." + getShortDescription());
+ isClosed = true;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index 6a3524d..c116378 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -49,6 +49,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-logsearch-logfeeder-plugin-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.3.1</version>
@@ -176,6 +181,11 @@
<version>1</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.6</version>
+ </dependency>
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>${spring-boot.version}</version>
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
deleted file mode 100644
index cfcc199..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigBlock.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.common;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Priority;
-
-
-public abstract class ConfigBlock extends ConfigItem {
- protected Map<String, Object> configs;
- protected Map<String, String> contextFields = new HashMap<String, String>();
- public ConfigBlock() {
- }
-
- public void loadConfig(Map<String, Object> map) {
- configs = LogFeederUtil.cloneObject(map);
-
- Map<String, String> nvList = getNVList("add_fields");
- if (nvList != null) {
- contextFields.putAll(nvList);
- }
- }
-
- public Map<String, Object> getConfigs() {
- return configs;
- }
-
- public boolean isFieldConditionMatch(String fieldName, String stringValue) {
- boolean allow = false;
- String fieldValue = (String) configs.get(fieldName);
- if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
- allow = true;
- } else {
- @SuppressWarnings("unchecked")
- Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
- if (addFields != null && addFields.get(fieldName) != null) {
- String addFieldValue = (String) addFields.get(fieldName);
- if (stringValue.equalsIgnoreCase(addFieldValue)) {
- allow = true;
- }
- }
-
- }
- return allow;
- }
-
- @SuppressWarnings("unchecked")
- public Map<String, String> getNVList(String key) {
- return (Map<String, String>) configs.get(key);
- }
-
- public String getStringValue(String key) {
- Object value = configs.get(key);
- if (value != null && value.toString().equalsIgnoreCase("none")) {
- value = null;
- }
- if (value != null) {
- return value.toString();
- }
- return null;
- }
-
- public String getStringValue(String key, String defaultValue) {
- Object value = configs.get(key);
- if (value != null && value.toString().equalsIgnoreCase("none")) {
- value = null;
- }
-
- if (value != null) {
- return value.toString();
- }
- return defaultValue;
- }
-
- public Object getConfigValue(String key) {
- return configs.get(key);
- }
-
- public boolean getBooleanValue(String key, boolean defaultValue) {
- String strValue = getStringValue(key);
- boolean retValue = defaultValue;
- if (!StringUtils.isEmpty(strValue)) {
- retValue = (strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes"));
- }
- return retValue;
- }
-
- public int getIntValue(String key, int defaultValue) {
- String strValue = getStringValue(key);
- int retValue = defaultValue;
- if (!StringUtils.isEmpty(strValue)) {
- try {
- retValue = Integer.parseInt(strValue);
- } catch (Throwable t) {
- LOG.error("Error parsing integer value. key=" + key + ", value=" + strValue);
- }
- }
- return retValue;
- }
-
- public long getLongValue(String key, long defaultValue) {
- String strValue = getStringValue(key);
- Long retValue = defaultValue;
- if (!StringUtils.isEmpty(strValue)) {
- try {
- retValue = Long.parseLong(strValue);
- } catch (Throwable t) {
- LOG.error("Error parsing long value. key=" + key + ", value=" + strValue);
- }
- }
- return retValue;
- }
-
- @Override
- public boolean isEnabled() {
- return getBooleanValue("is_enabled", true);
- }
-
- public Map<String, String> getContextFields() {
- return contextFields;
- }
-
- public boolean logConfigs(Priority level) {
- if (!super.logConfigs(level)) {
- return false;
- }
- LOG.log(level, "Printing configuration Block=" + getShortDescription());
- LOG.log(level, "configs=" + configs);
- LOG.log(level, "contextFields=" + contextFields);
- return true;
- }
-}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
index 037f4a1..f138c13 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigHandler.java
@@ -16,39 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.common;
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.lang.reflect.Type;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import com.google.common.collect.Maps;
+import com.google.gson.reflect.TypeToken;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputManager;
import org.apache.ambari.logfeeder.input.InputSimulate;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.output.OutputManager;
-import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.AliasUtil;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.ambari.logfeeder.util.AliasUtil.AliasType;
import org.apache.ambari.logsearch.config.api.InputConfigMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -58,18 +39,32 @@ import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterDescriptorImpl;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.log4j.Level;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
-
-import com.google.gson.reflect.TypeToken;
import org.springframework.core.io.ClassPathResource;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.inject.Inject;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.lang.reflect.Type;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
public class ConfigHandler implements InputConfigMonitor {
- private static final Logger LOG = Logger.getLogger(ConfigHandler.class);
+ private static final Logger LOG = Logger.getLogger(org.apache.ambari.logfeeder.common.ConfigHandler.class);
private final LogSearchConfigLogFeeder logSearchConfig;
@@ -86,9 +81,9 @@ public class ConfigHandler implements InputConfigMonitor {
private final List<InputDescriptor> inputConfigList = new ArrayList<>();
private final List<FilterDescriptor> filterConfigList = new ArrayList<>();
private final List<Map<String, Object>> outputConfigList = new ArrayList<>();
-
+
private boolean simulateMode = false;
-
+
public ConfigHandler(LogSearchConfigLogFeeder logSearchConfig) {
this.logSearchConfig = logSearchConfig;
}
@@ -99,13 +94,13 @@ public class ConfigHandler implements InputConfigMonitor {
logSearchConfig.init(Maps.fromProperties(logFeederProps.getProperties()), logFeederProps.getClusterName());
loadOutputs();
simulateIfNeeded();
-
+
inputManager.init();
outputManager.init();
-
+
logSearchConfig.monitorOutputProperties(outputManager.getOutputsToMonitor());
}
-
+
private void loadConfigFiles() throws Exception {
List<String> configFiles = getConfigFiles();
for (String configFileName : configFiles) {
@@ -125,13 +120,13 @@ public class ConfigHandler implements InputConfigMonitor {
private List<String> getConfigFiles() {
List<String> configFiles = new ArrayList<>();
-
+
String logFeederConfigFilesProperty = logFeederProps.getConfigFiles();
LOG.info("logfeeder.config.files=" + logFeederConfigFilesProperty);
if (logFeederConfigFilesProperty != null) {
configFiles.addAll(Arrays.asList(logFeederConfigFilesProperty.split(",")));
}
-
+
return configFiles;
}
@@ -155,22 +150,22 @@ public class ConfigHandler implements InputConfigMonitor {
loadConfigs(configData);
}
}
-
+
@Override
public void loadInputConfigs(String serviceName, InputConfig inputConfig) throws Exception {
inputConfigList.clear();
filterConfigList.clear();
-
+
inputConfigList.addAll(inputConfig.getInput());
filterConfigList.addAll(inputConfig.getFilter());
-
+
if (simulateMode) {
InputSimulate.loadTypeToFilePath(inputConfigList);
} else {
loadInputs(serviceName);
loadFilters(serviceName);
assignOutputsToInputs(serviceName);
-
+
inputManager.startInputs(serviceName);
}
}
@@ -190,7 +185,7 @@ public class ConfigHandler implements InputConfigMonitor {
if (inputConfigList.isEmpty()) {
throw new IllegalArgumentException("Log Id " + logId + " was not found in shipper configuriaton");
}
-
+
for (FilterDescriptor filterDescriptor : inputConfig.getFilter()) {
if ("grok".equals(filterDescriptor.getFilter())) {
// Thus ensure that the log entry passed will be parsed immediately
@@ -201,7 +196,7 @@ public class ConfigHandler implements InputConfigMonitor {
loadInputs("test");
loadFilters("test");
List<Input> inputList = inputManager.getInputList("test");
-
+
return inputList != null && inputList.size() == 1 ? inputList.get(0) : null;
}
@@ -226,17 +221,17 @@ public class ConfigHandler implements InputConfigMonitor {
}
}
}
-
+
@Override
public List<String> getGlobalConfigJsons() {
return globalConfigJsons;
}
-
+
private void simulateIfNeeded() throws Exception {
int simulatedInputNumber = logFeederProps.getInputSimulateConfig().getSimulateInputNumber();
if (simulatedInputNumber == 0)
return;
-
+
InputConfigImpl simulateInputConfig = new InputConfigImpl();
List<InputDescriptorImpl> inputConfigDescriptors = new ArrayList<>();
simulateInputConfig.setInput(inputConfigDescriptors);
@@ -248,9 +243,9 @@ public class ConfigHandler implements InputConfigMonitor {
inputDescriptor.setAddFields(new HashMap<String, String>());
inputConfigDescriptors.add(inputDescriptor);
}
-
+
loadInputConfigs("Simulation", simulateInputConfig);
-
+
simulateMode = true;
}
@@ -266,7 +261,7 @@ public class ConfigHandler implements InputConfigMonitor {
LOG.error("Output block doesn't have destination element");
continue;
}
- Output output = (Output) AliasUtil.getClassInstance(value, AliasType.OUTPUT);
+ Output output = (Output) AliasUtil.getClassInstance(value, AliasUtil.AliasType.OUTPUT);
if (output == null) {
LOG.error("Output object could not be found");
continue;
@@ -277,7 +272,7 @@ public class ConfigHandler implements InputConfigMonitor {
// We will only check for is_enabled out here. Down below we will check whether this output is enabled for the input
if (output.isEnabled()) {
- output.logConfigs(Level.INFO);
+ output.logConfigs();
outputManager.add(output);
} else {
LOG.info("Output is disabled. So ignoring it. " + output.getShortDescription());
@@ -296,7 +291,7 @@ public class ConfigHandler implements InputConfigMonitor {
LOG.error("Input block doesn't have source element");
continue;
}
- Input input = (Input) AliasUtil.getClassInstance(source, AliasType.INPUT);
+ Input input = (Input) AliasUtil.getClassInstance(source, AliasUtil.AliasType.INPUT);
if (input == null) {
LOG.error("Input object could not be found");
continue;
@@ -308,7 +303,7 @@ public class ConfigHandler implements InputConfigMonitor {
input.setOutputManager(outputManager);
input.setInputManager(inputManager);
inputManager.add(serviceName, input);
- input.logConfigs(Level.INFO);
+ input.logConfigs();
} else {
LOG.info("Input is disabled. So ignoring it. " + input.getShortDescription());
}
@@ -338,7 +333,7 @@ public class ConfigHandler implements InputConfigMonitor {
LOG.error("Filter block doesn't have filter element");
continue;
}
- Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasType.FILTER);
+ Filter filter = (Filter) AliasUtil.getClassInstance(value, AliasUtil.AliasType.FILTER);
if (filter == null) {
LOG.error("Filter object could not be found");
continue;
@@ -348,9 +343,9 @@ public class ConfigHandler implements InputConfigMonitor {
filter.setOutputManager(outputManager);
input.addFilter(filter);
- filter.logConfigs(Level.INFO);
+ filter.logConfigs();
}
-
+
if (input.getFirstFilter() == null) {
toRemoveInputList.add(input);
}
@@ -384,7 +379,7 @@ public class ConfigHandler implements InputConfigMonitor {
}
}
}
-
+
// In case of simulation copies of the output are added for each simulation instance, these must be added to the manager
for (Output output : InputSimulate.getSimulateOutputs()) {
output.setLogSearchConfig(logSearchConfig);
@@ -435,7 +430,7 @@ public class ConfigHandler implements InputConfigMonitor {
inputManager.logStats();
outputManager.logStats();
}
-
+
public void addMetrics(List<MetricData> metricsList) {
inputManager.addMetricsContainers(metricsList);
outputManager.addMetricsContainers(metricsList);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
deleted file mode 100644
index 30bd9fd..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/ConfigItem.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.common;
-
-import java.util.List;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Priority;
-
-public abstract class ConfigItem {
-
- protected static final Logger LOG = Logger.getLogger(ConfigBlock.class);
- private boolean drain = false;
- private LogFeederProps logFeederProps;
- public MetricData statMetric = new MetricData(getStatMetricName(), false);
-
- public ConfigItem() {
- super();
- }
-
- protected String getStatMetricName() {
- return null;
- }
-
- /**
- * Used while logging. Keep it short and meaningful
- */
- public abstract String getShortDescription();
-
- /**
- * Every implementor need to give name to the thread they create
- */
- public String getNameForThread() {
- return this.getClass().getSimpleName();
- }
-
- public void addMetricsContainers(List<MetricData> metricsList) {
- metricsList.add(statMetric);
- }
-
- /**
- * This method needs to be overwritten by deriving classes.
- */
- public void init(LogFeederProps logFeederProps) throws Exception {
- this.logFeederProps = logFeederProps;
- }
-
- public abstract boolean isEnabled();
-
- public void incrementStat(int count) {
- statMetric.value += count;
- }
-
- public void logStatForMetric(MetricData metric, String prefixStr) {
- LogFeederUtil.logStatForMetric(metric, prefixStr, ", key=" + getShortDescription());
- }
-
- public synchronized void logStat() {
- logStatForMetric(statMetric, "Stat");
- }
-
- public boolean logConfigs(Priority level) {
- if (level.toInt() == Priority.INFO_INT && !LOG.isInfoEnabled()) {
- return false;
- }
- if (level.toInt() == Priority.DEBUG_INT && !LOG.isDebugEnabled()) {
- return false;
- }
- return true;
- }
-
- public boolean isDrain() {
- return drain;
- }
-
- public void setDrain(boolean drain) {
- this.drain = drain;
- }
-
- public LogFeederProps getLogFeederProps() {
- return logFeederProps;
- }
-}
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
index 1a701e1..81f7317 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/common/LogEntryParseTester.java
@@ -26,10 +26,12 @@ import java.util.List;
import java.util.Map;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputConfig;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigGson;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputConfigImpl;
@@ -78,21 +80,54 @@ public class LogEntryParseTester {
Input input = configHandler.getTestInput(inputConfig, logId);
final Map<String, Object> result = new HashMap<>();
input.getFirstFilter().init(new LogFeederProps());
- input.addOutput(new Output() {
+ input.addOutput(new Output<LogFeederProps, InputFileMarker>() {
@Override
- public void write(String block, InputMarker inputMarker) throws Exception {
+ public void init(LogFeederProps logFeederProperties) throws Exception {
}
-
+
+ @Override
+ public String getShortDescription() {
+ return null;
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return null;
+ }
+
+ @Override
+ public void write(String block, InputFileMarker inputMarker) throws Exception {
+ }
+
+ @Override
+ public Long getPendingCount() {
+ return null;
+ }
+
+ @Override
+ public String getWriteBytesMetricName() {
+ return null;
+ }
+
+ @Override
+ public String getOutputType() {
+ return null;
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ }
+
@Override
public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
}
@Override
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+ public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) {
result.putAll(jsonObj);
}
});
- input.outputLine(logEntry, new InputMarker(input, null, 0));
+ input.outputLine(logEntry, new InputFileMarker(input, null, 0));
return result.isEmpty() ?
ImmutableMap.of("errorMessage", (Object)"Could not parse test log entry") :
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
index cfb6c78..cfb199c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/ApplicationConfig.java
@@ -19,14 +19,16 @@
package org.apache.ambari.logfeeder.conf;
import com.google.common.collect.Maps;
-import org.apache.ambari.logfeeder.common.ConfigHandler;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.input.InputConfigUploader;
-import org.apache.ambari.logfeeder.input.InputManager;
+import org.apache.ambari.logfeeder.input.InputManagerImpl;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
+import org.apache.ambari.logfeeder.common.ConfigHandler;
import org.apache.ambari.logfeeder.metrics.MetricsManager;
import org.apache.ambari.logfeeder.metrics.StatsLogger;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.output.OutputManagerImpl;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
import org.apache.ambari.logsearch.config.api.LogSearchConfigFactory;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
import org.apache.ambari.logsearch.config.zookeeper.LogSearchConfigLogFeederZK;
@@ -95,13 +97,14 @@ public class ApplicationConfig {
return new StatsLogger();
}
+
@Bean
public InputManager inputManager() {
- return new InputManager();
+ return new InputManagerImpl();
}
@Bean
public OutputManager outputManager() {
- return new OutputManager();
+ return new OutputManagerImpl();
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
index 353bdc1..92d2ebe 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogEntryCacheConfig.java
@@ -64,7 +64,7 @@ public class LogEntryCacheConfig {
sources = {LogFeederConstants.LOGFEEDER_PROPERTIES_FILE}
)
@Value("${" + LogFeederConstants.CACHE_LAST_DEDUP_ENABLED_PROPERTY + ":" + LogFeederConstants.DEFAULT_CACHE_LAST_DEDUP_ENABLED + "}")
- private Boolean cacheLastDedupEnabled;
+ private boolean cacheLastDedupEnabled;
@LogSearchPropertyDescription(
name = LogFeederConstants.CACHE_DEDUP_INTERVAL_PROPERTY,
@@ -101,7 +101,7 @@ public class LogEntryCacheConfig {
}
public boolean isCacheLastDedupEnabled() {
- return cacheLastDedupEnabled;
+ return this.cacheLastDedupEnabled;
}
public void setCacheLastDedupEnabled(boolean cacheLastDedupEnabled) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
index 367d1cd..d451496 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederProps.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.conf;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.plugin.common.LogFeederProperties;
import org.apache.ambari.logsearch.config.api.LogSearchPropertyDescription;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
@@ -36,7 +37,7 @@ import java.util.Properties;
import java.util.stream.Stream;
@Configuration
-public class LogFeederProps {
+public class LogFeederProps implements LogFeederProperties {
@Inject
private Environment env;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
index 8a45753..c1c7755 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/conf/LogFeederSecurityConfig.java
@@ -26,11 +26,13 @@ import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.io.File;
import java.nio.charset.Charset;
+@Configuration
public class LogFeederSecurityConfig {
private static final Logger LOG = LoggerFactory.getLogger(LogFeederSecurityConfig.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index f0ef31b..21b1ea4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -19,6 +19,20 @@
package org.apache.ambari.logfeeder.filter;
+import com.google.gson.reflect.TypeToken;
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
+import org.apache.commons.lang3.BooleanUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.reflect.Type;
@@ -31,23 +45,7 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
-import oi.thekraken.grok.api.Grok;
-import oi.thekraken.grok.api.exception.GrokException;
-
-import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
-import org.apache.commons.lang3.BooleanUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.google.gson.reflect.TypeToken;
-
-public class FilterGrok extends Filter {
+public class FilterGrok extends Filter<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(FilterGrok.class);
private static final String GROK_PATTERN_FILE = "grok-patterns";
@@ -78,10 +76,10 @@ public class FilterGrok extends Filter {
super.init(logFeederProps);
try {
- messagePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMessagePattern());
- multilinePattern = escapePattern(((FilterGrokDescriptor)filterDescriptor).getMultilinePattern());
- sourceField = filterDescriptor.getSourceField();
- removeSourceField = BooleanUtils.toBooleanDefaultIfNull(filterDescriptor.isRemoveSourceField(), removeSourceField);
+ messagePattern = escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMessagePattern());
+ multilinePattern = escapePattern(((FilterGrokDescriptor)getFilterDescriptor()).getMultilinePattern());
+ sourceField = getFilterDescriptor().getSourceField();
+ removeSourceField = BooleanUtils.toBooleanDefaultIfNull(getFilterDescriptor().isRemoveSourceField(), removeSourceField);
LOG.info("init() done. grokPattern=" + messagePattern + ", multilinePattern=" + multilinePattern + ", " +
getShortDescription());
@@ -161,7 +159,7 @@ public class FilterGrok extends Filter {
}
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws Exception {
if (grokMessage == null) {
return;
}
@@ -196,7 +194,7 @@ public class FilterGrok extends Filter {
}
@Override
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
if (sourceField != null) {
savedInputMarker = inputMarker;
applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
@@ -206,7 +204,7 @@ public class FilterGrok extends Filter {
}
}
- private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws LogFeederException {
+ private void applyMessage(String inputStr, Map<String, Object> jsonObj, String multilineJsonStr) throws Exception {
String jsonStr = grokMessage.capture(inputStr);
boolean parseError = false;
@@ -251,7 +249,7 @@ public class FilterGrok extends Filter {
String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
int inputStrLength = inputStr != null ? inputStr.length() : 0;
LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStrLength + ", input=" +
- input.getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
+ getInput().getShortDescription() + ". First upto 100 characters=" + StringUtils.abbreviate(inputStr, 100), null, LOG,
Level.WARN);
}
@@ -261,7 +259,7 @@ public class FilterGrok extends Filter {
Map<String, Object> jsonObj = Collections.synchronizedMap(new HashMap<String, Object>());
try {
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
- } catch (LogFeederException e) {
+ } catch (Exception e) {
LOG.error(e.getLocalizedMessage(), e.getCause());
}
strBuff = null;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index 1a2da0c..207d6f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -18,18 +18,24 @@
*/
package org.apache.ambari.logfeeder.filter;
-import java.util.Map;
-
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.common.LogFeederException;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class FilterJSON extends Filter<LogFeederProps> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FilterJSON.class);
-public class FilterJSON extends Filter {
-
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws Exception {
Map<String, Object> jsonMap = null;
try {
jsonMap = LogFeederUtil.toJSONObject(inputStr);
@@ -50,4 +56,10 @@ public class FilterJSON extends Filter {
}
super.apply(jsonMap, inputMarker);
}
+
+ @Override
+ public String getShortDescription() {
+ return "filter:filter=json,input=" + getInput().getShortDescription();
+ }
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index adcf0a4..695c7e3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -19,21 +19,25 @@
package org.apache.ambari.logfeeder.filter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class FilterKeyValue extends Filter<LogFeederProps> {
+
+ private static final Logger LOG = Logger.getLogger(FilterKeyValue.class);
-public class FilterKeyValue extends Filter {
private String sourceField = null;
private String valueSplit = "=";
private String fieldSplit = "\t";
@@ -45,26 +49,26 @@ public class FilterKeyValue extends Filter {
public void init(LogFeederProps logFeederProps) throws Exception {
super.init(logFeederProps);
- sourceField = filterDescriptor.getSourceField();
- valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getValueSplit(), valueSplit);
- fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)filterDescriptor).getFieldSplit(), fieldSplit);
- valueBorders = ((FilterKeyValueDescriptor)filterDescriptor).getValueBorders();
+ sourceField = getFilterDescriptor().getSourceField();
+ valueSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)getFilterDescriptor()).getValueSplit(), valueSplit);
+ fieldSplit = StringUtils.defaultString(((FilterKeyValueDescriptor)getFilterDescriptor()).getFieldSplit(), fieldSplit);
+ valueBorders = ((FilterKeyValueDescriptor)getFilterDescriptor()).getValueBorders();
LOG.info("init() done. source_field=" + sourceField + ", value_split=" + valueSplit + ", " + ", field_split=" +
fieldSplit + ", " + getShortDescription());
if (StringUtils.isEmpty(sourceField)) {
- LOG.fatal("source_field is not set for filter. This filter will not be applied");
+ LOG.fatal("source_field is not set for filter. This filter will not be applied");
return;
}
}
@Override
- public void apply(String inputStr, InputMarker inputMarker) throws LogFeederException {
+ public void apply(String inputStr, InputMarker inputMarker) throws Exception {
apply(LogFeederUtil.toJSONObject(inputStr), inputMarker);
}
@Override
- public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws LogFeederException {
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
if (sourceField == null) {
return;
}
@@ -136,7 +140,7 @@ public class FilterKeyValue extends Filter {
errorMetric.value++;
String logMessageKey = this.getClass().getSimpleName() + "_PARSEERROR";
LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error parsing string. length=" + inputStr.length() + ", input=" +
- input.getShortDescription() + ". First upto 200 characters=" + StringUtils.abbreviate(inputStr, 200), null, LOG,
+ getInput().getShortDescription() + ". First upto 200 characters=" + StringUtils.abbreviate(inputStr, 200), null, LOG,
Level.ERROR);
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
deleted file mode 100644
index cf295c5..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/AbstractInputFile.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.input;
-
-import java.io.BufferedReader;
-import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
-import org.apache.commons.lang.ObjectUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-
-public abstract class AbstractInputFile extends Input {
- private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
-
- protected File[] logFiles;
- protected String logPath;
- protected Object fileKey;
- protected String base64FileKey;
-
- protected boolean isReady;
-
- private String checkPointExtension;
- private int checkPointIntervalMS;
-
- private Map<String, File> checkPointFiles = new HashMap<>();
- private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
- private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
- private Map<String, InputMarker> lastCheckPointInputMarkers = new HashMap<>();
-
- private LogFeederProps logFeederProps;
-
- @Override
- protected String getStatMetricName() {
- return "input.files.read_lines";
- }
-
- @Override
- protected String getReadBytesMetricName() {
- return "input.files.read_bytes";
- }
-
- @Override
- public void init(LogFeederProps logFeederProps) throws Exception {
- this.logFeederProps = logFeederProps;
- LOG.info("init() called");
-
- checkPointExtension = logFeederProps.getCheckPointExtension();
-
- // Let's close the file and set it to true after we start monitoring it
- setClosed(true);
- logPath = inputDescriptor.getPath();
- checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)inputDescriptor).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
-
- if (StringUtils.isEmpty(logPath)) {
- LOG.error("path is empty for file input. " + getShortDescription());
- return;
- }
-
- setFilePath(logPath);
- boolean isFileReady = isReady();
-
- LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
-
- super.init(logFeederProps);
- }
-
- protected void processFile(File logPathFile, boolean follow) throws FileNotFoundException, IOException {
- LOG.info("Monitoring logPath=" + logPath + ", logPathFile=" + logPathFile);
- BufferedReader br = null;
-
- int lineCount = 0;
- try {
- setFilePath(logPathFile.getAbsolutePath());
-
- br = openLogFile(logPathFile);
-
- boolean resume = true;
- int resumeFromLineNumber = getResumeFromLineNumber();
- if (resumeFromLineNumber > 0) {
- LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line number " + resumeFromLineNumber);
- resume = false;
- }
-
- setClosed(false);
- int sleepStep = 2;
- int sleepIteration = 0;
- while (true) {
- try {
- if (isDrain()) {
- break;
- }
-
- String line = br.readLine();
- if (line == null) {
- if (!resume) {
- resume = true;
- }
- sleepIteration++;
- if (sleepIteration == 2) {
- flush();
- if (!follow) {
- LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
- break;
- }
- } else if (sleepIteration > 4) {
- Object newFileKey = getFileKey(logPathFile);
- if (newFileKey != null && (fileKey == null || !newFileKey.equals(fileKey))) {
- LOG.info("File key is different. Marking this input file for rollover. oldKey=" + fileKey + ", newKey=" +
- newFileKey + ". " + getShortDescription());
-
- try {
- LOG.info("File is rolled over. Closing current open file." + getShortDescription() + ", lineCount=" +
- lineCount);
- br.close();
- } catch (Exception ex) {
- LOG.error("Error closing file" + getShortDescription(), ex);
- break;
- }
-
- try {
- LOG.info("Opening new rolled over file." + getShortDescription());
- br = openLogFile(logPathFile);
- lineCount = 0;
- } catch (Exception ex) {
- LOG.error("Error opening rolled over file. " + getShortDescription(), ex);
- LOG.info("Added input to not ready list." + getShortDescription());
- isReady = false;
- inputManager.addToNotReady(this);
- break;
- }
- LOG.info("File is successfully rolled over. " + getShortDescription());
- continue;
- }
- }
- try {
- Thread.sleep(sleepStep * 1000);
- sleepStep = Math.min(sleepStep * 2, 10);
- } catch (InterruptedException e) {
- LOG.info("Thread interrupted." + getShortDescription());
- }
- } else {
- lineCount++;
- sleepStep = 1;
- sleepIteration = 0;
-
- if (!resume && lineCount > resumeFromLineNumber) {
- LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + getShortDescription());
- resume = true;
- }
- if (resume) {
- InputMarker marker = new InputMarker(this, base64FileKey, lineCount);
- outputLine(line, marker);
- }
- }
- } catch (Throwable t) {
- String logMessageKey = this.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
- ", input=" + getShortDescription(), t, LOG, Level.ERROR);
- }
- }
- } finally {
- if (br != null) {
- LOG.info("Closing reader." + getShortDescription() + ", lineCount=" + lineCount);
- try {
- br.close();
- } catch (Throwable t) {
- // ignore
- }
- }
- }
- }
-
- protected abstract BufferedReader openLogFile(File logFile) throws IOException;
-
- protected abstract Object getFileKey(File logFile);
-
- private int getResumeFromLineNumber() {
- int resumeFromLineNumber = 0;
-
- File checkPointFile = null;
- try {
- LOG.info("Checking existing checkpoint file. " + getShortDescription());
-
- String checkPointFileName = base64FileKey + checkPointExtension;
- File checkPointFolder = inputManager.getCheckPointFolderFile();
- checkPointFile = new File(checkPointFolder, checkPointFileName);
- checkPointFiles.put(base64FileKey, checkPointFile);
- Map<String, Object> jsonCheckPoint = null;
- if (!checkPointFile.exists()) {
- LOG.info("Checkpoint file for log file " + filePath + " doesn't exist, starting to read it from the beginning");
- } else {
- try (RandomAccessFile checkPointWriter = new RandomAccessFile(checkPointFile, "rw")) {
- int contentSize = checkPointWriter.readInt();
- byte b[] = new byte[contentSize];
- int readSize = checkPointWriter.read(b, 0, contentSize);
- if (readSize != contentSize) {
- LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", read=" +
- readSize + ", checkPointFile=" + checkPointFile + ", input=" + getShortDescription());
- } else {
- String jsonCheckPointStr = new String(b, 0, readSize);
- jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
-
- resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
-
- LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
- ", resumeFromLineNumber=" + resumeFromLineNumber);
- }
- } catch (EOFException eofEx) {
- LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
- getShortDescription(), eofEx);
- }
- }
- if (jsonCheckPoint == null) {
- // This seems to be first time, so creating the initial checkPoint object
- jsonCheckPoint = new HashMap<String, Object>();
- jsonCheckPoint.put("file_path", filePath);
- jsonCheckPoint.put("file_key", base64FileKey);
- }
-
- jsonCheckPoints.put(base64FileKey, jsonCheckPoint);
-
- } catch (Throwable t) {
- LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
- }
-
- return resumeFromLineNumber;
- }
-
- @Override
- public synchronized void checkIn(InputMarker inputMarker) {
- try {
- Map<String, Object> jsonCheckPoint = jsonCheckPoints.get(inputMarker.base64FileKey);
- File checkPointFile = checkPointFiles.get(inputMarker.base64FileKey);
-
- int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
- if (lineNumber > inputMarker.lineNumber) {
- // Already wrote higher line number for this input
- return;
- }
- // If interval is greater than last checkPoint time, then write
- long currMS = System.currentTimeMillis();
- long lastCheckPointTimeMs = lastCheckPointTimeMSs.containsKey(inputMarker.base64FileKey) ?
- lastCheckPointTimeMSs.get(inputMarker.base64FileKey) : 0;
- if (!isClosed() && (currMS - lastCheckPointTimeMs < checkPointIntervalMS)) {
- // Let's save this one so we can update the check point file on flush
- lastCheckPointInputMarkers.put(inputMarker.base64FileKey, inputMarker);
- return;
- }
- lastCheckPointTimeMSs.put(inputMarker.base64FileKey, currMS);
-
- jsonCheckPoint.put("line_number", "" + new Integer(inputMarker.lineNumber));
- jsonCheckPoint.put("last_write_time_ms", "" + new Long(currMS));
- jsonCheckPoint.put("last_write_time_date", new Date());
-
- String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
-
- File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp");
- if (tmpCheckPointFile.exists()) {
- tmpCheckPointFile.delete();
- }
- RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws");
- tmpRaf.writeInt(jsonStr.length());
- tmpRaf.write(jsonStr.getBytes());
- tmpRaf.getFD().sync();
- tmpRaf.close();
-
- FileUtil.move(tmpCheckPointFile, checkPointFile);
-
- if (isClosed()) {
- String logMessageKey = this.getClass().getSimpleName() + "_FINAL_CHECKIN";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + getShortDescription() +
- ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
- }
- } catch (Throwable t) {
- String logMessageKey = this.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + getShortDescription(), t,
- LOG, Level.ERROR);
- }
- }
-
- @Override
- public void lastCheckIn() {
- for (InputMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) {
- checkIn(lastCheckPointInputMarker);
- }
- }
-
- @Override
- public void close() {
- super.close();
- LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
- lastCheckIn();
- isClosed = true;
- }
-
- @Override
- public String getShortDescription() {
- return "input:source=" + inputDescriptor.getSource() + ", path=" +
- (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
- }
-}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
index e8066be..0c551cd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputConfigUploader.java
@@ -16,31 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.input;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.nio.charset.Charset;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.io.Files;
+import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logfeeder.common.ConfigHandler;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-
-import com.google.common.io.Files;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.nio.charset.Charset;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class InputConfigUploader extends Thread {
-
protected static final Logger LOG = LoggerFactory.getLogger(InputConfigUploader.class);
private static final long SLEEP_BETWEEN_CHECK = 2000;
@@ -73,7 +69,7 @@ public class InputConfigUploader extends Thread {
this.start();
config.monitorInputConfigChanges(configHandler, logLevelFilterHandler, logFeederProps.getClusterName());
}
-
+
@Override
public void run() {
while (true) {
@@ -98,7 +94,7 @@ public class InputConfigUploader extends Thread {
} else {
LOG.warn("Cannot find input config files in config dir ({})", logFeederProps.getConfDir());
}
-
+
try {
Thread.sleep(SLEEP_BETWEEN_CHECK);
} catch (InterruptedException e) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index c35c831..8b5310f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -18,22 +18,61 @@
*/
package org.apache.ambari.logfeeder.input;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.util.Arrays;
-import java.util.Comparator;
-
+import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.ambari.logfeeder.input.file.FileCheckInHelper;
+import org.apache.ambari.logfeeder.input.file.ProcessFileHelper;
+import org.apache.ambari.logfeeder.input.file.ResumeLineNumberHelper;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileBaseDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputFileDescriptor;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.BooleanUtils;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileFilter;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class InputFile extends Input<LogFeederProps, InputFileMarker> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InputFile.class);
-public class InputFile extends AbstractInputFile {
+ private static final boolean DEFAULT_TAIL = true;
+ private static final boolean DEFAULT_USE_EVENT_MD5 = false;
+ private static final boolean DEFAULT_GEN_EVENT_MD5 = true;
+ private static final int DEFAULT_CHECKPOINT_INTERVAL_MS = 5 * 1000;
+
+ private boolean isReady;
+
+ private boolean tail;
+
+ private String filePath;
+ private File[] logFiles;
+ private String logPath;
+ private Object fileKey;
+ private String base64FileKey;
+ private String checkPointExtension;
+ private int checkPointIntervalMS;
+
+ private Map<String, File> checkPointFiles = new HashMap<>();
+ private Map<String, Long> lastCheckPointTimeMSs = new HashMap<>();
+ private Map<String, Map<String, Object>> jsonCheckPoints = new HashMap<>();
+ private Map<String, InputFileMarker> lastCheckPointInputMarkers = new HashMap<>();
+
+ private Thread thread;
@Override
public boolean isReady() {
@@ -43,7 +82,7 @@ public class InputFile extends AbstractInputFile {
if (!ArrayUtils.isEmpty(logFiles) && logFiles[0].isFile()) {
if (tail && logFiles.length > 1) {
LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
- ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
+ ". Will follow only the first one. Using " + logFiles[0].getAbsolutePath());
}
LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
isReady = true;
@@ -54,6 +93,45 @@ public class InputFile extends AbstractInputFile {
return isReady;
}
+ @Override
+ public void setReady(boolean isReady) {
+ this.isReady = isReady;
+ }
+
+ @Override
+ public String getNameForThread() {
+ if (filePath != null) {
+ try {
+ return (getType() + "=" + (new File(filePath)).getName());
+ } catch (Throwable ex) {
+ LOG.warn("Couldn't get basename for filePath=" + filePath, ex);
+ }
+ }
+ return super.getNameForThread() + ":" + getType();
+ }
+
+ @Override
+ public synchronized void checkIn(InputFileMarker inputMarker) {
+ FileCheckInHelper.checkIn(this, inputMarker);
+ }
+
+ @Override
+ public void lastCheckIn() {
+ for (InputFileMarker lastCheckPointInputMarker : lastCheckPointInputMarkers.values()) {
+ checkIn(lastCheckPointInputMarker);
+ }
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return "input.files.read_lines";
+ }
+
+ @Override
+ public String getReadBytesMetricName() {
+ return "input.files.read_bytes";
+ }
+
private File[] getActualFiles(String searchPath) {
File searchFile = new File(searchPath);
if (!searchFile.getParentFile().exists()) {
@@ -63,20 +141,71 @@ public class InputFile extends AbstractInputFile {
} else {
FileFilter fileFilter = new WildcardFileFilter(searchFile.getName());
File[] logFiles = searchFile.getParentFile().listFiles(fileFilter);
- Arrays.sort(logFiles,
- new Comparator<File>() {
- @Override
- public int compare(File o1, File o2) {
- return o1.getName().compareTo(o2.getName());
- }
- });
+ Arrays.sort(logFiles, Comparator.comparing(File::getName));
return logFiles;
}
}
@Override
- void start() throws Exception {
- boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getProcessFile(), true);
+ public boolean monitor() {
+ if (isReady()) {
+ LOG.info("Starting thread. " + getShortDescription());
+ thread = new Thread(this, getNameForThread());
+ thread.start();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public List<InputFile> getChildInputs() {
+ return null;
+ }
+
+ @Override
+ public InputFileMarker getInputMarker() {
+ return null;
+ }
+
+ @Override
+ public void init(LogFeederProps logFeederProps) throws Exception {
+ super.init(logFeederProps);
+ LOG.info("init() called");
+
+ checkPointExtension = logFeederProps.getCheckPointExtension();
+
+ // Let's close the file and set it to true after we start monitoring it
+ setClosed(true);
+ logPath = getInputDescriptor().getPath();
+ checkPointIntervalMS = (int) ObjectUtils.defaultIfNull(((InputFileBaseDescriptor)getInputDescriptor()).getCheckpointIntervalMs(), DEFAULT_CHECKPOINT_INTERVAL_MS);
+
+ if (StringUtils.isEmpty(logPath)) {
+ LOG.error("path is empty for file input. " + getShortDescription());
+ return;
+ }
+
+ setFilePath(logPath);
+ boolean isFileReady = isReady();
+ LOG.info("File to monitor " + logPath + ", tail=" + tail + ", isReady=" + isFileReady);
+
+ LogEntryCacheConfig cacheConfig = logFeederProps.getLogEntryCacheConfig();
+ initCache(
+ cacheConfig.isCacheEnabled(),
+ cacheConfig.getCacheKeyField(),
+ cacheConfig.getCacheSize(),
+ cacheConfig.isCacheLastDedupEnabled(),
+ cacheConfig.getCacheDedupInterval(),
+ getFilePath());
+
+ tail = BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isTail(), DEFAULT_TAIL);
+ setUseEventMD5(BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isUseEventMd5AsId(), DEFAULT_USE_EVENT_MD5));
+ setGenEventMD5(BooleanUtils.toBooleanDefaultIfNull(getInputDescriptor().isGenEventMd5(), DEFAULT_GEN_EVENT_MD5));
+ }
+
+ @Override
+ public void start() throws Exception {
+ boolean isProcessFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getProcessFile(), true);
if (isProcessFile) {
for (int i = logFiles.length - 1; i >= 0; i--) {
File file = logFiles[i];
@@ -98,27 +227,33 @@ public class InputFile extends AbstractInputFile {
}
}
- @Override
- protected BufferedReader openLogFile(File logFile) throws FileNotFoundException {
+ public int getResumeFromLineNumber() {
+ return ResumeLineNumberHelper.getResumeFromLineNumber(this);
+ }
+
+ public void processFile(File logPathFile, boolean follow) throws Exception {
+ ProcessFileHelper.processFile(this, logPathFile, follow);
+ }
+
+ public BufferedReader openLogFile(File logFile) throws Exception {
BufferedReader br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logFile));
- fileKey = getFileKey(logFile);
+ fileKey = getFileKeyFromLogFile(logFile);
base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
return br;
}
- @Override
- protected Object getFileKey(File logFile) {
+ public Object getFileKeyFromLogFile(File logFile) {
return FileUtil.getFileKey(logFile);
}
private void copyFiles(File[] files) {
- boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)inputDescriptor).getCopyFile(), false);
+ boolean isCopyFile = BooleanUtils.toBooleanDefaultIfNull(((InputFileDescriptor)getInputDescriptor()).getCopyFile(), false);
if (isCopyFile && files != null) {
for (File file : files) {
try {
- InputMarker marker = new InputMarker(this, null, 0);
- outputManager.copyFile(file, marker);
+ InputFileMarker marker = new InputFileMarker(this, null, 0);
+ getOutputManager().copyFile(file, marker);
if (isClosed() || isDrain()) {
LOG.info("isClosed or isDrain. Now breaking loop.");
break;
@@ -129,4 +264,94 @@ public class InputFile extends AbstractInputFile {
}
}
}
+
+ @Override
+ public boolean isEnabled() {
+ return BooleanUtils.isNotFalse(getInputDescriptor().isEnabled());
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "input:source=" + getInputDescriptor().getSource() + ", path=" +
+ (!ArrayUtils.isEmpty(logFiles) ? logFiles[0].getAbsolutePath() : logPath);
+ }
+
+ @Override
+ public boolean logConfigs() {
+ LOG.info("Printing Input=" + getShortDescription());
+ LOG.info("description=" + getInputDescriptor().getPath());
+ return true;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ LOG.info("close() calling checkPoint checkIn(). " + getShortDescription());
+ lastCheckIn();
+ setClosed(true);
+ }
+
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public String getLogPath() {
+ return logPath;
+ }
+
+ public Object getFileKey() {
+ return fileKey;
+ }
+
+ public String getBase64FileKey() throws Exception {
+ return base64FileKey;
+ }
+
+ public void setFileKey(Object fileKey) {
+ this.fileKey = fileKey;
+ }
+
+ public boolean isTail() {
+ return tail;
+ }
+
+ public File[] getLogFiles() {
+ return logFiles;
+ }
+
+ public void setBase64FileKey(String base64FileKey) {
+ this.base64FileKey = base64FileKey;
+ }
+
+ public void setLogFiles(File[] logFiles) {
+ this.logFiles = logFiles;
+ }
+
+ public String getCheckPointExtension() {
+ return checkPointExtension;
+ }
+
+ public int getCheckPointIntervalMS() {
+ return checkPointIntervalMS;
+ }
+
+ public Map<String, File> getCheckPointFiles() {
+ return checkPointFiles;
+ }
+
+ public Map<String, Long> getLastCheckPointTimeMSs() {
+ return lastCheckPointTimeMSs;
+ }
+
+ public Map<String, Map<String, Object>> getJsonCheckPoints() {
+ return jsonCheckPoints;
+ }
+
+ public Map<String, InputFileMarker> getLastCheckPointInputMarkers() {
+ return lastCheckPointInputMarkers;
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
similarity index 53%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
index 6767687..d3d12bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFileMarker.java
@@ -16,25 +16,45 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.input;
-/**
- * This file contains the file inode, line number of the log currently been read
- */
-public class InputMarker {
- public final Input input;
- public final String base64FileKey;
- public final int lineNumber;
-
- public InputMarker(Input input, String base64FileKey, int lineNumber) {
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Marker that ties a line number and a base64 file key to the input it belongs to.
+ * The same two values are also published through {@link #getAllProperties()} under the
+ * keys {@code line_number} and {@code base64_file_key}.
+ */
+public class InputFileMarker implements InputMarker {
+
+ private final Input input;
+ private final String base64FileKey;
+ private final Integer lineNumber;
+
+ // Filled once in the constructor; handed out as-is by getAllProperties().
+ private final Map<String, Object> properties = new HashMap<>();
+
+ /**
+ * Stores the three values and mirrors the line number and file key into the property map.
+ */
+ public InputFileMarker(Input input, String base64FileKey, Integer lineNumber) {
this.input = input;
this.base64FileKey = base64FileKey;
this.lineNumber = lineNumber;
+ properties.put("line_number", lineNumber);
+ properties.put("base64_file_key", base64FileKey);
}
-
+
@Override
- public String toString() {
- return "InputMarker [lineNumber=" + lineNumber + ", input=" + input.getShortDescription() + "]";
+ public Input getInput() {
+ return this.input;
+ }
+
+ /** Returns the internal property map itself (not a copy). */
+ @Override
+ public Map<String, Object> getAllProperties() {
+ return properties;
+ }
+
+ public String getBase64FileKey() {
+ return base64FileKey;
+ }
+
+ /**
+ * Returns the line number as a primitive. The field is a boxed Integer, so a marker
+ * constructed with a null line number fails here with a NullPointerException.
+ */
+ public int getLineNumber() {
+ return lineNumber;
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
similarity index 94%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
index d1f38ed..70aa681 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputManagerImpl.java
@@ -16,9 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.input;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+import javax.inject.Inject;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
@@ -33,23 +45,12 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.ambari.logfeeder.util.FileUtil;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.commons.io.filefilter.WildcardFileFilter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Logger;
-import org.apache.solr.common.util.Base64;
-
-import javax.inject.Inject;
+public class InputManagerImpl extends InputManager {
-public class InputManager {
- private static final Logger LOG = Logger.getLogger(InputManager.class);
+ private static final Logger LOG = Logger.getLogger(InputManagerImpl.class);
private static final String CHECKPOINT_SUBFOLDER_NAME = "logfeeder_checkpoints";
-
+
private Map<String, List<Input>> inputs = new HashMap<>();
private Set<Input> notReadyList = new HashSet<>();
@@ -59,7 +60,7 @@ public class InputManager {
private File checkPointFolderFile;
private MetricData filesCountMetric = new MetricData("input.files.count", true);
-
+
private Thread inputIsReadyMonitor;
@Inject
@@ -69,6 +70,7 @@ public class InputManager {
return inputs.get(serviceName);
}
+ @Override
public void add(String serviceName, Input input) {
List<Input> inputList = inputs.get(serviceName);
if (inputList == null) {
@@ -78,6 +80,7 @@ public class InputManager {
inputList.add(input);
}
+ @Override
public void removeInputsForService(String serviceName) {
List<Input> inputList = inputs.get(serviceName);
for (Input input : inputList) {
@@ -92,6 +95,7 @@ public class InputManager {
inputs.remove(serviceName);
}
+ @Override
public void removeInput(Input input) {
LOG.info("Trying to remove from inputList. " + input.getShortDescription());
for (List<Input> inputList : inputs.values()) {
@@ -118,11 +122,12 @@ public class InputManager {
return count;
}
- public void init() {
+ /** Runs initCheckPointSettings() and then startMonitorThread(), in that order. */
+ @Override
+ public void init() throws Exception {
initCheckPointSettings();
startMonitorThread();
}
-
+
private void initCheckPointSettings() {
checkPointExtension = logFeederProps.getCheckPointExtension();
LOG.info("Determining valid checkpoint folder");
@@ -133,7 +138,7 @@ public class InputManager {
checkPointFolderFile = new File(checkPointFolder);
isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
}
-
+
if (!isCheckPointFolderValid) {
// Let's use tmp folder
checkPointFolderFile = new File(logFeederProps.getTmpDir(), CHECKPOINT_SUBFOLDER_NAME);
@@ -141,10 +146,10 @@ public class InputManager {
isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
if (isCheckPointFolderValid) {
LOG.warn("Using tmp folder " + checkPointFolderFile + " to store check points. This is not recommended." +
- "Please set logfeeder.checkpoint.folder property");
+ "Please set logfeeder.checkpoint.folder property");
}
}
-
+
if (isCheckPointFolderValid) {
LOG.info("Using folder " + checkPointFolderFile + " for storing checkpoints");
} else {
@@ -182,10 +187,10 @@ public class InputManager {
}
}
};
-
+
inputIsReadyMonitor.start();
}
-
+
public void startInputs(String serviceName) {
for (Input input : inputs.get(serviceName)) {
try {
@@ -194,7 +199,7 @@ public class InputManager {
input.monitor();
} else {
LOG.info("Adding input to not ready list. Note, it is possible this component is not run on this host. " +
- "So it might not be an issue. " + input.getShortDescription());
+ "So it might not be an issue. " + input.getShortDescription());
notReadyList.add(input);
}
} catch (Exception e) {
@@ -231,10 +236,12 @@ public class InputManager {
return checkPointFolderFile;
}
- void addToNotReady(Input notReadyInput) {
+ /** Adds the given input to the not-ready set kept by this manager. */
+ @Override
+ public void addToNotReady(Input notReadyInput) {
notReadyList.add(notReadyInput);
}
+ @Override
public void addMetricsContainers(List<MetricData> metricsList) {
for (List<Input> inputList : inputs.values()) {
for (Input input : inputList) {
@@ -253,12 +260,11 @@ public class InputManager {
}
filesCountMetric.value = getActiveFilesCount();
- LogFeederUtil.logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "");
+ // TODO: logStatForMetric(filesCountMetric, "Stat: Files Monitored Count", "");
}
public void cleanCheckPointFiles() {
-
if (checkPointFolderFile == null) {
LOG.info("Will not clean checkPoint files. checkPointFolderFile=" + checkPointFolderFile);
return;
@@ -276,7 +282,7 @@ public class InputManager {
}
}
LOG.info("Deleted " + totalCheckFilesDeleted + " checkPoint file(s). checkPointFolderFile=" +
- checkPointFolderFile.getAbsolutePath());
+ checkPointFolderFile.getAbsolutePath());
} catch (Throwable t) {
LOG.error("Error while cleaning checkPointFiles", t);
@@ -306,12 +312,12 @@ public class InputManager {
String fileBase64 = Base64.byteArrayToBase64(fileKeyObj.toString().getBytes());
if (!logFileKey.equals(fileBase64)) {
LOG.info("CheckPoint clean: File key has changed. old=" + logFileKey + ", new=" + fileBase64 + ", filePath=" +
- logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
+ logFilePath + ", checkPointFile=" + checkPointFile.getAbsolutePath());
deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
}
} else {
LOG.info("CheckPoint clean: Log file doesn't exist. filePath=" + logFilePath + ", checkPointFile=" +
- checkPointFile.getAbsolutePath());
+ checkPointFile.getAbsolutePath());
deleteCheckPointFile = !wasFileRenamed(logFile.getParentFile(), logFileKey);
}
if (deleteCheckPointFile) {
@@ -326,10 +332,10 @@ public class InputManager {
} catch (Throwable t) {
LOG.error("Error while checking checkPoint file. " + checkPointFile, t);
}
-
+
return deleted;
}
-
+
private boolean wasFileRenamed(File folder, String searchFileBase64) {
for (File file : folder.listFiles()) {
Object fileKeyObj = FileUtil.getFileKey(file);
@@ -343,7 +349,7 @@ public class InputManager {
}
return false;
}
-
+
public void waitOnAllInputs() {
//wait on inputs
for (List<Input> inputList : inputs.values()) {
@@ -414,7 +420,7 @@ public class InputManager {
return;
}
}
-
+
LOG.warn("Some inputs were not closed after " + iterations + " iterations");
for (List<Input> inputList : inputs.values()) {
for (Input input : inputList) {
@@ -433,4 +439,6 @@ public class InputManager {
public LogFeederProps getLogFeederProps() {
return logFeederProps;
}
+
+
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 69d053a..41db8bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -18,34 +18,37 @@
*/
package org.apache.ambari.logfeeder.input;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-
import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputS3FileDescriptor;
import org.apache.commons.lang.ArrayUtils;
import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class InputS3File extends AbstractInputFile {
+import java.io.BufferedReader;
+import java.io.File;
+
+public class InputS3File extends InputFile {
+
+ private static final Logger LOG = LoggerFactory.getLogger(InputS3File.class);
+ /**
+ * Reports whether a file matching the configured log path is available.
+ * While the input is not ready yet, the log path is expanded again and, if at least one
+ * file is found, the input is marked ready.
+ *
+ * @return true once a matching file has been found
+ */
@Override
public boolean isReady() {
- if (!isReady) {
+ // Read the flag through the parent: calling isReady() on this object here would
+ // re-enter this very method and recurse until StackOverflowError.
+ // NOTE(review): assumes InputFile provides a concrete isReady() - confirm.
+ boolean ready = super.isReady();
+ if (!ready) {
// Let's try to check whether the file is available
- logFiles = getActualFiles(logPath);
- if (!ArrayUtils.isEmpty(logFiles)) {
- if (tail && logFiles.length > 1) {
- LOG.warn("Found multiple files (" + logFiles.length + ") for the file filter " + filePath +
- ". Will use only the first one. Using " + logFiles[0].getAbsolutePath());
+ setLogFiles(getActualFiles(getLogPath()));
+ if (!ArrayUtils.isEmpty(getLogFiles())) {
+ if (isTail() && getLogFiles().length > 1) {
+ LOG.warn("Found multiple files (" + getLogFiles().length + ") for the file filter " + getFilePath() +
+ ". Will use only the first one. Using " + getLogFiles()[0].getAbsolutePath());
}
- LOG.info("File filter " + filePath + " expanded to " + logFiles[0].getAbsolutePath());
- isReady = true;
+ LOG.info("File filter " + getFilePath() + " expanded to " + getLogFiles()[0].getAbsolutePath());
+ setReady(true);
+ ready = true;
} else {
- LOG.debug(logPath + " file doesn't exist. Ignoring for now");
+ LOG.debug(getLogPath() + " file doesn't exist. Ignoring for now");
}
}
- return isReady;
+ return ready;
}
private File[] getActualFiles(String searchPath) {
@@ -54,14 +57,13 @@ public class InputS3File extends AbstractInputFile {
}
@Override
- void start() throws Exception {
- if (ArrayUtils.isEmpty(logFiles)) {
+ public void start() throws Exception {
+ if (ArrayUtils.isEmpty(getLogFiles())) {
return;
}
-
- for (int i = logFiles.length - 1; i >= 0; i--) {
- File file = logFiles[i];
- if (i == 0 || !tail) {
+ for (int i = getLogFiles().length - 1; i >= 0; i--) {
+ File file = getLogFiles()[i];
+ if (i == 0 || !isTail()) {
try {
processFile(file, i == 0);
if (isClosed() || isDrain()) {
@@ -77,24 +79,25 @@ public class InputS3File extends AbstractInputFile {
}
+ /**
+ * Opens a reader for the given path through S3Util, using the access and secret key
+ * taken from the input descriptor, and records the file key and its base64 form on
+ * this input before returning the reader.
+ */
@Override
- protected BufferedReader openLogFile(File logPathFile) throws IOException {
- String s3AccessKey = ((InputS3FileDescriptor)inputDescriptor).getS3AccessKey();
- String s3SecretKey = ((InputS3FileDescriptor)inputDescriptor).getS3SecretKey();
+ public BufferedReader openLogFile(File logPathFile) throws Exception {
+ String s3AccessKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3AccessKey();
+ String s3SecretKey = ((InputS3FileDescriptor)getInputDescriptor()).getS3SecretKey();
BufferedReader br = S3Util.getReader(logPathFile.getPath(), s3AccessKey, s3SecretKey);
- fileKey = getFileKey(logPathFile);
- base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
+ // getFileKey(File) is the private helper of this class; the no-arg getFileKey() below
+ // reads back the value that was just stored.
+ Object fileKey = getFileKey(logPathFile);
+ setFileKey(fileKey);
+ // getBytes() without a charset uses the platform default encoding.
+ String base64FileKey = Base64.byteArrayToBase64(getFileKey().toString().getBytes());
+ setBase64FileKey(base64FileKey);
LOG.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". " + getShortDescription());
return br;
}
- @Override
- protected Object getFileKey(File logFile) {
+ /** Uses the path of the given file as its file key. */
+ private Object getFileKey(File logFile) {
return logFile.getPath();
}
+ /** Delegates to the parent close() and then marks this input as closed. */
@Override
public void close() {
super.close();
- isClosed = true;
+ setClosed(true);
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 2b2d145..3ea3d90 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,21 @@
*/
package org.apache.ambari.logfeeder.input;
+import com.google.common.base.Joiner;
+import org.apache.ambari.logfeeder.conf.InputSimulateConfig;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.filter.FilterJSON;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
+import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
+import org.apache.commons.collections.MapUtils;
+import org.apache.solr.common.util.Base64;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
@@ -30,21 +45,8 @@ import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.ambari.logfeeder.conf.InputSimulateConfig;
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.filter.FilterJSON;
-import org.apache.ambari.logfeeder.output.Output;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
-import org.apache.commons.collections.MapUtils;
-import org.apache.solr.common.util.Base64;
-
-import com.google.common.base.Joiner;
-
-public class InputSimulate extends Input {
+public class InputSimulate extends InputFile {
+ private static final Logger LOG = LoggerFactory.getLogger(InputSimulate.class);
private static final String LOG_TEXT_PATTERN = "{ logtime=\"%d\", level=\"%s\", log_message=\"%s\", host=\"%s\"}";
private static final Map<String, String> typeToFilePath = new HashMap<>();
@@ -55,16 +57,16 @@ public class InputSimulate extends Input {
inputTypes.add(input.getType());
}
}
-
+
private static final Map<String, Integer> typeToLineNumber = new HashMap<>();
-
+
private static final AtomicInteger hostNumber = new AtomicInteger(0);
-
+
private static final List<Output> simulateOutputs = new ArrayList<>();
public static List<Output> getSimulateOutputs() {
return simulateOutputs;
}
-
+
private final Random random = new Random(System.currentTimeMillis());
private InputSimulateConfig conf;
@@ -123,20 +125,20 @@ public class InputSimulate extends Input {
}
+ /**
+ * Endless generator loop: hands the output manager to the first filter, then repeatedly
+ * picks a type, builds a line and a marker for it, outputs the line and sleeps for
+ * sleepMillis. While no types are configured it only sleeps.
+ * The loop has no exit condition, and any exception from Thread.sleep is ignored.
+ */
@Override
- void start() throws Exception {
- getFirstFilter().setOutputManager(outputManager);
+ public void start() throws Exception {
+ getFirstFilter().setOutputManager(getOutputManager());
while (true) {
if (types.isEmpty()) {
try { Thread.sleep(sleepMillis); } catch(Exception e) { /* Ignore */ }
continue;
}
String type = imitateRandomLogFile();
-
+
String line = getLine();
- InputMarker marker = getInputMarker(type);
-
+ InputFileMarker marker = getInputMarker(type);
+
outputLine(line, marker);
-
+
try { Thread.sleep(sleepMillis); } catch(Exception e) { /* Ignore */ }
}
}
@@ -145,16 +147,15 @@ public class InputSimulate extends Input {
int typePos = random.nextInt(types.size());
String type = types.get(typePos);
String filePath = MapUtils.getString(typeToFilePath, type, "path of " + type);
-
- ((InputDescriptorImpl)inputDescriptor).setType(type);
+
+ ((InputDescriptorImpl)getInputDescriptor()).setType(type);
setFilePath(filePath);
-
+
return type;
}
- private InputMarker getInputMarker(String type) throws Exception {
- InputMarker marker = new InputMarker(this, getBase64FileKey(), getLineNumber(type));
- return marker;
+ /** Builds a marker for this input from the current base64 file key and the next line number of the type. */
+ private InputFileMarker getInputMarker(String type) throws Exception {
+ return new InputFileMarker(this, getBase64FileKey(), getLineNumber(type));
}
private static synchronized int getLineNumber(String type) {
@@ -162,13 +163,13 @@ public class InputSimulate extends Input {
typeToLineNumber.put(type, 0);
}
Integer lineNumber = typeToLineNumber.get(type) + 1;
-
+
typeToLineNumber.put(type, lineNumber);
return lineNumber;
}
- private String getBase64FileKey() throws Exception {
- String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + filePath;
+ /**
+ * Computes the key on every call as base64 of "local host address|file path".
+ * getBytes() without a charset uses the platform default encoding.
+ */
+ public String getBase64FileKey() throws Exception {
+ String fileKey = InetAddress.getLocalHost().getHostAddress() + "|" + getFilePath();
return Base64.byteArrayToBase64(fileKey.getBytes());
}
@@ -177,7 +178,7 @@ public class InputSimulate extends Input {
String logMessage = createLogMessage();
return String.format(LOG_TEXT_PATTERN, d.getTime(), level, logMessage, host);
}
-
+
private String createLogMessage() {
int logMessageLength = minLogWords + random.nextInt(maxLogWords - minLogWords + 1);
Set<Integer> words = new TreeSet<>();
@@ -188,16 +189,16 @@ public class InputSimulate extends Input {
logMessage.add(String.format("Word%06d", word));
}
}
-
+
return Joiner.on(' ').join(logMessage);
}
+ /** Intentionally empty: this input does nothing on check-in. */
@Override
- public void checkIn(InputMarker inputMarker) {}
+ public void checkIn(InputFileMarker inputMarker) {}
+ /** Intentionally empty: this input does nothing on the last check-in. */
@Override
public void lastCheckIn() {}
-
+
@Override
public String getNameForThread() {
return "Simulated input";
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
new file mode 100644
index 0000000..9c607cf
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/FileCheckInHelper.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.util.FileUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * Static helper that persists the read position of a file input into its checkpoint file.
+ */
+public class FileCheckInHelper {
+
+  private static final Logger LOG = Logger.getLogger(FileCheckInHelper.class);
+
+  private FileCheckInHelper() {
+  }
+
+  /**
+   * Records the line number of the given marker in the checkpoint file of the input.
+   * <p>
+   * Nothing is written when the stored line number is already higher than the marker's.
+   * While the input is open and the checkpoint interval has not elapsed, the marker is
+   * only remembered in the input's last-marker map so it can be written later. Otherwise
+   * the checkpoint is serialized to JSON, written to a ".tmp" sibling file as a 4 byte
+   * length followed by the JSON bytes, and moved over the real checkpoint file.
+   * <p>
+   * Any failure is logged through LogFeederUtil.logErrorMessageByInterval and not rethrown.
+   *
+   * @param inputFile   input that owns the checkpoint maps and settings
+   * @param inputMarker marker carrying the base64 file key and the line number to record
+   */
+  public static void checkIn(InputFile inputFile, InputFileMarker inputMarker) {
+    try {
+      String base64FileKey = inputMarker.getBase64FileKey();
+      Map<String, Object> jsonCheckPoint = inputFile.getJsonCheckPoints().get(base64FileKey);
+      File checkPointFile = inputFile.getCheckPointFiles().get(base64FileKey);
+
+      int lineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+      if (lineNumber > inputMarker.getLineNumber()) {
+        // Already wrote higher line number for this input
+        return;
+      }
+      // Write only when the input is closed or the checkpoint interval has elapsed
+      long currMS = System.currentTimeMillis();
+      Long lastWrite = inputFile.getLastCheckPointTimeMSs().get(base64FileKey);
+      long lastCheckPointTimeMs = lastWrite == null ? 0 : lastWrite;
+      if (!inputFile.isClosed() && (currMS - lastCheckPointTimeMs < inputFile.getCheckPointIntervalMS())) {
+        // Let's save this one so we can update the check point file on flush
+        inputFile.getLastCheckPointInputMarkers().put(base64FileKey, inputMarker);
+        return;
+      }
+      inputFile.getLastCheckPointTimeMSs().put(base64FileKey, currMS);
+
+      jsonCheckPoint.put("line_number", String.valueOf(inputMarker.getLineNumber()));
+      jsonCheckPoint.put("last_write_time_ms", String.valueOf(currMS));
+      jsonCheckPoint.put("last_write_time_date", new Date());
+
+      String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+      // The length prefix must count bytes, not chars: the two differ as soon as the
+      // JSON (e.g. the file path) contains a character encoded in more than one byte.
+      byte[] jsonBytes = jsonStr.getBytes();
+
+      File tmpCheckPointFile = new File(checkPointFile.getAbsolutePath() + ".tmp");
+      if (tmpCheckPointFile.exists()) {
+        // RandomAccessFile does not truncate, so a stale tmp file has to go first
+        tmpCheckPointFile.delete();
+      }
+      // try-with-resources so the file handle is released even if a write or sync fails
+      try (RandomAccessFile tmpRaf = new RandomAccessFile(tmpCheckPointFile, "rws")) {
+        tmpRaf.writeInt(jsonBytes.length);
+        tmpRaf.write(jsonBytes);
+        tmpRaf.getFD().sync();
+      }
+
+      FileUtil.move(tmpCheckPointFile, checkPointFile);
+
+      if (inputFile.isClosed()) {
+        String logMessageKey = inputFile.getClass().getSimpleName() + "_FINAL_CHECKIN";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Wrote final checkPoint, input=" + inputFile.getShortDescription() +
+          ", checkPointFile=" + checkPointFile.getAbsolutePath() + ", checkPoint=" + jsonStr, null, LOG, Level.INFO);
+      }
+    } catch (Throwable t) {
+      String logMessageKey = inputFile.getClass().getSimpleName() + "_CHECKIN_EXCEPTION";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception checkIn. , input=" + inputFile.getShortDescription(), t,
+        LOG, Level.ERROR);
+    }
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java
new file mode 100644
index 0000000..4ed415a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ProcessFileHelper.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import java.io.BufferedReader;
+import java.io.File;
+
+/**
+ * Static helper holding the read loop of a file input.
+ */
+public class ProcessFileHelper {
+
+ private static final Logger LOG = Logger.getLogger(ProcessFileHelper.class);
+
+ private ProcessFileHelper() {
+ }
+
+ /**
+ * Reads the given log file line by line and passes each line, together with a marker
+ * carrying the running line count, to {@code inputFile.outputLine}.
+ * <p>
+ * Lines up to the resume line number reported by the input are read but not output.
+ * When the end of the file is reached the loop sleeps and retries: on the second
+ * consecutive empty read the input is flushed (and the loop ends if {@code follow} is
+ * false); after more than four consecutive empty reads the file key is compared to
+ * detect a rollover, in which case the file is reopened and the line count restarts.
+ * The loop also ends when the input reports drain. Exceptions thrown inside the loop
+ * are logged and the loop carries on.
+ *
+ * @param inputFile   input whose file is being read
+ * @param logPathFile file to read
+ * @param follow      whether to keep waiting for new lines after the end of the file
+ * @throws Exception if the file cannot be opened initially
+ */
+ public static void processFile(InputFile inputFile, File logPathFile, boolean follow) throws Exception {
+ LOG.info("Monitoring logPath=" + inputFile.getLogPath() + ", logPathFile=" + logPathFile);
+ BufferedReader br = null;
+
+ int lineCount = 0;
+ try {
+ inputFile.setFilePath(logPathFile.getAbsolutePath());
+
+ br = inputFile.openLogFile(logPathFile);
+
+ // resume == true means lines are output; it stays false while skipping already processed lines
+ boolean resume = true;
+ int resumeFromLineNumber = inputFile.getResumeFromLineNumber();
+ if (resumeFromLineNumber > 0) {
+ LOG.info("Resuming log file " + logPathFile.getAbsolutePath() + " from line number " + resumeFromLineNumber);
+ resume = false;
+ }
+
+ inputFile.setClosed(false);
+ // sleepStep: seconds to sleep after an empty read; sleepIteration: consecutive empty reads
+ int sleepStep = 2;
+ int sleepIteration = 0;
+ while (true) {
+ try {
+ if (inputFile.isDrain()) {
+ break;
+ }
+
+ String line = br.readLine();
+ if (line == null) {
+ // End of file reached before the resume line: stop skipping from here on
+ if (!resume) {
+ resume = true;
+ }
+ sleepIteration++;
+ if (sleepIteration == 2) {
+ inputFile.flush();
+ if (!follow) {
+ LOG.info("End of file. Done with filePath=" + logPathFile.getAbsolutePath() + ", lineCount=" + lineCount);
+ break;
+ }
+ } else if (sleepIteration > 4) {
+ // No new data for a while: check whether the path now points to a different file
+ Object newFileKey = inputFile.getFileKeyFromLogFile(logPathFile);
+ if (newFileKey != null && (inputFile.getFileKey() == null || !newFileKey.equals(inputFile.getFileKey()))) {
+ LOG.info("File key is different. Marking this input file for rollover. oldKey=" + inputFile.getFileKey() + ", newKey=" +
+ newFileKey + ". " + inputFile.getShortDescription());
+
+ try {
+ LOG.info("File is rolled over. Closing current open file." + inputFile.getShortDescription() + ", lineCount=" +
+ lineCount);
+ br.close();
+ } catch (Exception ex) {
+ LOG.error("Error closing file" + inputFile.getShortDescription(), ex);
+ break;
+ }
+
+ try {
+ LOG.info("Opening new rolled over file." + inputFile.getShortDescription());
+ br = inputFile.openLogFile(logPathFile);
+ lineCount = 0;
+ } catch (Exception ex) {
+ // Reopening failed: hand the input back to the manager's not-ready list and stop reading
+ LOG.error("Error opening rolled over file. " + inputFile.getShortDescription(), ex);
+ LOG.info("Added input to not ready list." + inputFile.getShortDescription());
+ inputFile.setReady(false);
+ inputFile.getInputManager().addToNotReady(inputFile);
+ break;
+ }
+ LOG.info("File is successfully rolled over. " + inputFile.getShortDescription());
+ continue;
+ }
+ }
+ try {
+ // Back off: the sleep doubles after each empty read, capped at 10 seconds
+ Thread.sleep(sleepStep * 1000);
+ sleepStep = Math.min(sleepStep * 2, 10);
+ } catch (InterruptedException e) {
+ // The interrupt is only logged; the loop keeps running
+ LOG.info("Thread interrupted." + inputFile.getShortDescription());
+ }
+ } else {
+ lineCount++;
+ sleepStep = 1;
+ sleepIteration = 0;
+
+ if (!resume && lineCount > resumeFromLineNumber) {
+ LOG.info("Resuming to read from last line. lineCount=" + lineCount + ", input=" + inputFile.getShortDescription());
+ resume = true;
+ }
+ if (resume) {
+ InputFileMarker marker = new InputFileMarker(inputFile, inputFile.getBase64FileKey(), lineCount);
+ inputFile.outputLine(line, marker);
+ }
+ }
+ } catch (Throwable t) {
+ // Logged by interval and not rethrown, so the loop continues with the next read
+ String logMessageKey = inputFile.getClass().getSimpleName() + "_READ_LOOP_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in read loop. lineNumber=" + lineCount +
+ ", input=" + inputFile.getShortDescription(), t, LOG, Level.ERROR);
+ }
+ }
+ } finally {
+ if (br != null) {
+ LOG.info("Closing reader." + inputFile.getShortDescription() + ", lineCount=" + lineCount);
+ try {
+ br.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
new file mode 100644
index 0000000..9350200
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/file/ResumeLineNumberHelper.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.file;
+
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static helper that restores the read position of a file input from its checkpoint file.
+ */
+public class ResumeLineNumberHelper {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ResumeLineNumberHelper.class);
+
+  private ResumeLineNumberHelper() {
+  }
+
+  /**
+   * Determines the line number from which reading of the input should resume.
+   * <p>
+   * The checkpoint file is looked up in the checkpoint folder of the input manager under
+   * the name "base64 file key + checkpoint extension" and registered in the input's
+   * checkpoint file map. If the file exists and holds a readable checkpoint, its
+   * "line_number" is returned. If the file is missing, truncated, corrupt or cannot be
+   * parsed, a fresh checkpoint object (file path and file key) is registered instead and
+   * 0 is returned, so that later check-ins always find a checkpoint object to update.
+   *
+   * @param inputFile input whose checkpoint is restored
+   * @return the line number stored in the checkpoint, or 0 to read from the beginning
+   */
+  public static int getResumeFromLineNumber(InputFile inputFile) {
+    int resumeFromLineNumber = 0;
+
+    File checkPointFile = null;
+    try {
+      LOG.info("Checking existing checkpoint file. " + inputFile.getShortDescription());
+
+      String base64FileKey = inputFile.getBase64FileKey();
+      String checkPointFileName = base64FileKey + inputFile.getCheckPointExtension();
+      File checkPointFolder = inputFile.getInputManager().getCheckPointFolderFile();
+      checkPointFile = new File(checkPointFolder, checkPointFileName);
+      inputFile.getCheckPointFiles().put(base64FileKey, checkPointFile);
+
+      Map<String, Object> jsonCheckPoint = null;
+      if (!checkPointFile.exists()) {
+        LOG.info("Checkpoint file for log file " + inputFile.getFilePath() + " doesn't exist, starting to read it from the beginning");
+      } else {
+        String jsonCheckPointStr = readCheckPointJson(inputFile, checkPointFile);
+        if (jsonCheckPointStr != null) {
+          try {
+            jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
+            resumeFromLineNumber = LogFeederUtil.objectToInt(jsonCheckPoint.get("line_number"), 0, "line_number");
+
+            LOG.info("CheckPoint. checkPointFile=" + checkPointFile + ", json=" + jsonCheckPointStr +
+              ", resumeFromLineNumber=" + resumeFromLineNumber);
+          } catch (RuntimeException parseEx) {
+            // Unparsable content must not prevent registration of a checkpoint object below
+            LOG.error("Couldn't parse checkpoint file content. Will reset file. checkPointFile=" + checkPointFile +
+              ", input=" + inputFile.getShortDescription(), parseEx);
+            jsonCheckPoint = null;
+            resumeFromLineNumber = 0;
+          }
+        }
+      }
+      if (jsonCheckPoint == null) {
+        // No usable checkpoint (first run or unreadable file), so creating the initial checkPoint object
+        jsonCheckPoint = new HashMap<String, Object>();
+        jsonCheckPoint.put("file_path", inputFile.getFilePath());
+        jsonCheckPoint.put("file_key", base64FileKey);
+      }
+
+      inputFile.getJsonCheckPoints().put(base64FileKey, jsonCheckPoint);
+
+    } catch (Throwable t) {
+      LOG.error("Error while configuring checkpoint file. Will reset file. checkPointFile=" + checkPointFile, t);
+    }
+
+    return resumeFromLineNumber;
+  }
+
+  /**
+   * Reads the raw JSON text from a checkpoint file laid out as a 4 byte length followed by
+   * that many bytes of JSON. Returns null when the file is truncated, the length header is
+   * implausible, or the file cannot be read.
+   */
+  private static String readCheckPointJson(InputFile inputFile, File checkPointFile) {
+    // Read-only access is enough here; "rw" would needlessly require write permission
+    try (RandomAccessFile checkPointReader = new RandomAccessFile(checkPointFile, "r")) {
+      int contentSize = checkPointReader.readInt();
+      long available = checkPointReader.length() - checkPointReader.getFilePointer();
+      // Validate the header before allocating: a corrupt file could announce a negative or huge size
+      if (contentSize <= 0 || contentSize > available) {
+        LOG.error("Couldn't read expected number of bytes from checkpoint file. expected=" + contentSize + ", available=" +
+          available + ", checkPointFile=" + checkPointFile + ", input=" + inputFile.getShortDescription());
+        return null;
+      }
+      byte[] content = new byte[contentSize];
+      // readFully either fills the whole buffer or throws, unlike read() which may stop early
+      checkPointReader.readFully(content);
+      return new String(content);
+    } catch (EOFException eofEx) {
+      LOG.info("EOFException. Will reset checkpoint file " + checkPointFile.getAbsolutePath() + " for " +
+        inputFile.getShortDescription(), eofEx);
+    } catch (Exception ex) {
+      LOG.error("Error while reading checkpoint file. Will reset file. checkPointFile=" + checkPointFile, ex);
+    }
+    return null;
+  }
+
+}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
index 9ccc4f2..7f78fd1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -18,6 +18,8 @@
*/
package org.apache.ambari.logfeeder.input.reader;
+import org.apache.log4j.Logger;
+
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -25,8 +27,6 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.zip.GZIPInputStream;
-import org.apache.log4j.Logger;
-
class GZIPReader extends InputStreamReader {
private static final Logger LOG = Logger.getLogger(GZIPReader.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
index 5fc2e14..b9393aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -18,13 +18,13 @@
*/
package org.apache.ambari.logfeeder.input.reader;
+import org.apache.log4j.Logger;
+
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.Reader;
-import org.apache.log4j.Logger;
-
public enum LogsearchReaderFactory {
INSTANCE;
private static final Logger LOG = Logger.getLogger(LogsearchReaderFactory.class);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
index 83c293b..1874ee9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/loglevelfilter/LogLevelFilterHandler.java
@@ -16,20 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.loglevelfilter;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TimeZone;
-
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.LogLevelFilterMonitor;
import org.apache.ambari.logsearch.config.api.LogSearchConfig;
@@ -42,6 +33,13 @@ import org.slf4j.LoggerFactory;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
public class LogLevelFilterHandler implements LogLevelFilterMonitor {
private static final Logger LOG = LoggerFactory.getLogger(LogLevelFilterHandler.class);
@@ -50,7 +48,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
private static final boolean DEFAULT_VALUE = true;
-
+
private static ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
protected DateFormat initialValue() {
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
@@ -92,7 +90,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
if (!logFeederProps.isLogLevelFilterEnabled()) {
return true;
}
-
+
LogLevelFilter logFilter = findLogFilter(logId);
List<String> allowedLevels = getAllowedLevels(hostName, logFilter);
return allowedLevels.isEmpty() || allowedLevels.contains(level);
@@ -107,7 +105,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
}
public boolean isAllowed(Map<String, Object> jsonObj, InputMarker inputMarker) {
- if ("audit".equals(inputMarker.input.getInputDescriptor().getRowtype()))
+ if ("audit".equals(inputMarker.getInput().getInputDescriptor().getRowtype()))
return true;
boolean isAllowed = applyFilter(jsonObj);
@@ -171,7 +169,7 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
if (hosts.isEmpty() || hosts.contains(hostName)) {
if (isFilterExpired(componentFilter)) {
LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
- componentFilter.getExpiryTime());
+ componentFilter.getExpiryTime());
return defaultLevels;
} else {
return overrideLevels;
@@ -193,8 +191,8 @@ public class LogLevelFilterHandler implements LogLevelFilterMonitor {
Date currentDate = new Date();
if (!currentDate.before(filterEndDate)) {
LOG.debug("Filter for Component :" + logLevelFilter.getLabel() + " and Hosts : [" +
- StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
- formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
+ StringUtils.join(logLevelFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
+ formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
return true;
} else {
return false;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
index c85ad49..8c0fc72 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperAnonymize.java
@@ -19,8 +19,9 @@
package org.apache.ambari.logfeeder.mapper;
-import java.util.Map;
-
+import com.google.common.base.Splitter;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapAnonymizeDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
@@ -29,9 +30,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.common.base.Splitter;
+import java.util.Map;
-public class MapperAnonymize extends Mapper {
+public class MapperAnonymize extends Mapper<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(MapperAnonymize.class);
private static final char DEFAULT_HIDE_CHAR = '*';
@@ -115,6 +116,6 @@ public class MapperAnonymize extends Mapper {
sb.append(rest);
- jsonObj.put(fieldName, sb.toString());
+ jsonObj.put(getFieldName(), sb.toString());
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index e099161..150869b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -19,27 +19,29 @@
package org.apache.ambari.logfeeder.mapper;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Map;
-
import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapDateDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-public class MapperDate extends Mapper {
+import java.text.ParseException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.Map;
+
+public class MapperDate extends Mapper<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(MapperDate.class);
- private SimpleDateFormat targetDateFormatter = null;
+ private FastDateFormat targetDateFormatter = null;
private boolean isEpoch = false;
- private SimpleDateFormat srcDateFormatter = null;
+ private FastDateFormat srcDateFormatter=null;
@Override
public boolean init(String inputDesc, String fieldName, String mapClassCode, MapFieldDescriptor mapFieldDescriptor) {
@@ -57,9 +59,9 @@ public class MapperDate extends Mapper {
return true;
} else {
try {
- targetDateFormatter = new SimpleDateFormat(targetDateFormat);
+ targetDateFormatter = FastDateFormat.getInstance(targetDateFormat);
if (!StringUtils.isEmpty(srcDateFormat)) {
- srcDateFormatter = new SimpleDateFormat(srcDateFormat);
+ srcDateFormatter = FastDateFormat.getInstance(srcDateFormat);
}
return true;
} catch (Throwable ex) {
@@ -90,10 +92,10 @@ public class MapperDate extends Mapper {
} else {
return value;
}
- jsonObj.put(fieldName, value);
+ jsonObj.put(getFieldName(), value);
} catch (Throwable t) {
LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying date transformation." +
- " isEpoch=" + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
+ " isEpoch=" + isEpoch + ", targetDateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.getPattern():"")
+ ", value=" + value + ". " + this.toString(), t, LOG, Level.ERROR);
}
}
@@ -105,7 +107,7 @@ public class MapperDate extends Mapper {
Calendar currentCalendar = Calendar.getInstance();
- if (!srcDateFormatter.toPattern().contains("dd")) {
+ if (!srcDateFormatter.getPattern().contains("dd")) {
//set year/month/date in src_date when src_date does not have date component
srcDate = DateUtils.setYears(srcDate, currentCalendar.get(Calendar.YEAR));
srcDate = DateUtils.setMonths(srcDate, currentCalendar.get(Calendar.MONTH));
@@ -114,7 +116,7 @@ public class MapperDate extends Mapper {
if (srcDate.getTime() > currentCalendar.getTimeInMillis()) {
srcDate = DateUtils.addDays(srcDate, -1);
}
- } else if (!srcDateFormatter.toPattern().contains("yy")) {
+ } else if (!srcDateFormatter.getPattern().contains("yy")) {
//set year in src_date when src_date does not have year component
srcDate = DateUtils.setYears(srcDate, currentCalendar.get(Calendar.YEAR));
// if with the current year the time stamp is after the current one, it must be previous year
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
index a463f49..bbb6337 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldCopy.java
@@ -19,17 +19,19 @@
package org.apache.ambari.logfeeder.mapper;
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldCopyDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
+import java.util.Map;
+
/**
* Overrides the value for the field
*/
-public class MapperFieldCopy extends Mapper {
+public class MapperFieldCopy extends Mapper<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(MapperFieldCopy.class);
private String copyName = null;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 3f160da..2b1f70f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -19,8 +19,8 @@
package org.apache.ambari.logfeeder.mapper;
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldNameDescriptor;
@@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import java.util.Map;
+
/**
* Overrides the value for the field
*/
-public class MapperFieldName extends Mapper {
+public class MapperFieldName extends Mapper<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(MapperFieldName.class);
private String newValue = null;
@@ -51,7 +53,7 @@ public class MapperFieldName extends Mapper {
@Override
public Object apply(Map<String, Object> jsonObj, Object value) {
if (newValue != null) {
- jsonObj.remove(fieldName);
+ jsonObj.remove(getFieldName());
jsonObj.put(newValue, value);
} else {
LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 03ff95b..e3d4924 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -19,8 +19,8 @@
package org.apache.ambari.logfeeder.mapper;
-import java.util.Map;
-
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.filter.mapper.Mapper;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldDescriptor;
import org.apache.ambari.logsearch.config.api.model.inputconfig.MapFieldValueDescriptor;
@@ -28,10 +28,12 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import java.util.Map;
+
/**
* Overrides the value for the field
*/
-public class MapperFieldValue extends Mapper {
+public class MapperFieldValue extends Mapper<LogFeederProps> {
private static final Logger LOG = Logger.getLogger(MapperFieldValue.class);
private String prevValue = null;
@@ -55,7 +57,7 @@ public class MapperFieldValue extends Mapper {
if (newValue != null && prevValue != null) {
if (prevValue.equalsIgnoreCase(value.toString())) {
value = newValue;
- jsonObj.put(fieldName, value);
+ jsonObj.put(getFieldName(), value);
}
} else {
LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
index 96084c1..f5bc0eb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -26,6 +26,7 @@ import java.util.TreeMap;
import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig;
import org.apache.ambari.logfeeder.conf.MetricsCollectorConfig;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
index 1dd9287..91de1d8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/StatsLogger.java
@@ -19,6 +19,7 @@
package org.apache.ambari.logfeeder.metrics;
import org.apache.ambari.logfeeder.common.ConfigHandler;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
index c46086e..5cde6db 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -19,9 +19,9 @@
package org.apache.ambari.logfeeder.output;
-import java.util.Map;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import java.util.Map;
/**
* This contains the output json object and InputMarker.
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index fa4e17b..dafe6a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -18,15 +18,18 @@
*/
package org.apache.ambari.logfeeder.output;
-import java.io.File;
-
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.conf.LogFeederProps;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
import org.apache.log4j.Logger;
+import java.io.File;
+
/**
* Output that just ignore the logs
*/
-public class OutputDevNull extends Output {
+public class OutputDevNull extends Output<LogFeederProps, InputMarker> {
private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
@@ -36,7 +39,44 @@ public class OutputDevNull extends Output {
}
@Override
+ public Long getPendingCount() {
+ return 0L;
+ }
+
+ @Override
+ public String getWriteBytesMetricName() {
+ return "write:devnull";
+ }
+
+ @Override
+ public String getOutputType() {
+ return "devnull";
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
public void copyFile(File inputFile, InputMarker inputMarker) {
throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
}
+
+ @Override
+ public void init(LogFeederProps logFeederProps) throws Exception { // intentionally empty: nothing to set up here
+ }
+
+ @Override
+ public String getShortDescription() {
+ return "write:devnull";
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return "write:devnull";
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 4576deb..e70d769 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -19,32 +19,35 @@
package org.apache.ambari.logfeeder.output;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.Map;
-
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
-public class OutputFile extends Output {
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
+public class OutputFile extends Output<LogFeederProps, InputFileMarker> {
private static final Logger LOG = Logger.getLogger(OutputFile.class);
private PrintWriter outWriter;
private String filePath = null;
private String codec;
+ private LogFeederProps logFeederProps;
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
-
+ this.logFeederProps = logFeederProps;
filePath = getStringValue("path");
if (StringUtils.isEmpty(filePath)) {
LOG.error("Filepath config property <path> is not set in config file.");
@@ -87,11 +90,11 @@ public class OutputFile extends Output {
// Ignore this exception
}
}
- isClosed = true;
+ setClosed(true);
}
@Override
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
+ public void write(Map<String, Object> jsonObj, InputFileMarker inputMarker) throws Exception {
String outStr = null;
CSVPrinter csvPrinter = null;
try {
@@ -118,7 +121,7 @@ public class OutputFile extends Output {
}
@Override
- synchronized public void write(String block, InputMarker inputMarker) throws Exception {
+ synchronized public void write(String block, InputFileMarker inputMarker) throws Exception {
if (outWriter != null && block != null) {
statMetric.value++;
@@ -128,11 +131,35 @@ public class OutputFile extends Output {
}
@Override
+ public Long getPendingCount() {
+ return 0L; // never null: the other outputs in this patch report 0, and a null Long fails when a caller unboxes it
+ }
+
+ @Override
+ public String getWriteBytesMetricName() {
+ return "output.file.write_bytes"; // same "output.file." prefix as getStatMetricName(); previously reused the Kafka metric name
+ }
+
+ @Override
public String getShortDescription() {
return "output:destination=file,path=" + filePath;
}
@Override
+ public String getStatMetricName() {
+ return "output.file.write_logs";
+ }
+
+ @Override
+ public String getOutputType() {
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ }
+
+ @Override
public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("copyFile method is not yet supported for output=file");
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index ed66eb0..f2faf64 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -20,14 +20,17 @@
package org.apache.ambari.logfeeder.output;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederHDFSUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;
@@ -43,7 +46,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*
* The events are spooled on the local file system and uploaded in batches asynchronously.
*/
-public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
+public class OutputHDFSFile extends Output<LogFeederProps, InputFileMarker> implements RolloverHandler, RolloverCondition {
private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class);
private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
@@ -64,9 +67,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
private LogSpooler logSpooler;
+ private LogFeederProps logFeederProps;
+
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
+ this.logFeederProps = logFeederProps;
hdfsOutDir = getStringValue("hdfs_out_dir");
hdfsHost = getStringValue("hdfs_host");
hdfsPort = getStringValue("hdfs_port");
@@ -98,11 +103,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
LOG.info("Closing file." + getShortDescription());
logSpooler.rollover();
this.stopHDFSCopyThread();
- isClosed = true;
+ setClosed(true);
}
@Override
- public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+ public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
if (block != null) {
logSpooler.add(block);
statMetric.value++;
@@ -234,4 +239,28 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
}
return shouldRollover;
}
+
+ @Override
+ public String getOutputType() {
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) {
+ }
+
+ @Override
+ public Long getPendingCount() {
+ return 0L;
+ }
+
+ @Override
+ public String getWriteBytesMetricName() {
+ return "output.hdfs.write_bytes";
+ }
+
+ @Override
+ public String getStatMetricName() {
+ return "output.hdfs.write_logs";
+ }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index 5c8ec82..7539484 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,15 +19,12 @@
package org.apache.ambari.logfeeder.output;
-import java.io.File;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.LinkedTransferQueue;
-
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,7 +34,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-public class OutputKafka extends Output {
+import java.io.File;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedTransferQueue;
+
+public class OutputKafka extends Output<LogFeederProps, InputFileMarker> {
private static final Logger LOG = Logger.getLogger(OutputKafka.class);
private static final int FAILED_RETRY_INTERVAL = 30;
@@ -56,19 +59,21 @@ public class OutputKafka extends Output {
// Let's start with the assumption Kafka is down
private boolean isKafkaBrokerUp = false;
+ private LogFeederProps logFeederProps;
+
@Override
- protected String getStatMetricName() {
+ public String getStatMetricName() {
return "output.kafka.write_logs";
}
@Override
- protected String getWriteBytesMetricName() {
+ public String getWriteBytesMetricName() {
return "output.kafka.write_bytes";
}
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
+ this.logFeederProps = logFeederProps;
Properties props = initProperties();
producer = creteKafkaProducer(props);
@@ -99,9 +104,9 @@ public class OutputKafka extends Output {
props.put("batch.size", batchSize);
props.put("linger.ms", lingerMS);
- for (String key : configs.keySet()) {
+ for (String key : getConfigs().keySet()) {
if (key.startsWith("kafka.")) {
- Object value = configs.get(key);
+ Object value = getConfigs().get(key);
if (value == null || value.toString().length() == 0) {
continue;
}
@@ -151,15 +156,15 @@ public class OutputKafka extends Output {
}
@Override
- public synchronized void write(String block, InputMarker inputMarker) throws Exception {
- while (!isDrain() && !inputMarker.input.isDrain()) {
+ public synchronized void write(String block, InputFileMarker inputMarker) throws Exception {
+ while (!isDrain() && !inputMarker.getInput().isDrain()) {
try {
if (failedMessages.size() == 0) {
if (publishMessage(block, inputMarker)) {
break;
}
}
- if (isDrain() || inputMarker.input.isDrain()) {
+ if (isDrain() || inputMarker.getInput().isDrain()) {
break;
}
if (!isKafkaBrokerUp) {
@@ -281,4 +286,19 @@ public class OutputKafka extends Output {
public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
}
+
+ @Override
+ public String getOutputType() {
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ }
+
+ @Override
+ public void outputConfigChanged(OutputProperties outputProperties) { // always throws: this output does not handle configuration updates
+ throw new IllegalStateException("This method should be overriden if the Output wants to monitor the configuration");
+ }
+
+ @Override
+ public Long getPendingCount() {
+ return 0L;
+ }
}
\ No newline at end of file
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
index 8308a4f..04600a3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputLineFilter.java
@@ -19,8 +19,8 @@
package org.apache.ambari.logfeeder.output;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
similarity index 83%
rename from ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
rename to ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
index f5c4176..bebf225 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManagerImpl.java
@@ -16,23 +16,18 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.output;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.MurmurHash;
import org.apache.ambari.logsearch.config.api.OutputConfigMonitor;
@@ -41,9 +36,15 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import javax.inject.Inject;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
-public class OutputManager {
- private static final Logger LOG = Logger.getLogger(OutputManager.class);
+public class OutputManagerImpl extends OutputManager {
+ private static final Logger LOG = Logger.getLogger(OutputManagerImpl.class);
private static final int HASH_SEED = 31174077;
private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
@@ -81,6 +82,7 @@ public class OutputManager {
this.outputs.add(output);
}
+ @Override
public void init() throws Exception {
for (Output output : outputs) {
output.init(logFeederProps);
@@ -88,7 +90,7 @@ public class OutputManager {
}
public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
- Input input = inputMarker.input;
+ Input input = inputMarker.getInput();
// Update the block with the context fields
for (Map.Entry<String, String> entry : input.getInputDescriptor().getAddFields().entrySet()) {
@@ -102,8 +104,11 @@ public class OutputManager {
if (jsonObj.get("type") == null) {
jsonObj.put("type", input.getInputDescriptor().getType());
}
- if (jsonObj.get("path") == null && input.getFilePath() != null) {
- jsonObj.put("path", input.getFilePath());
+ if (input.getClass().isAssignableFrom(InputFile.class)) { // TODO: find better solution
+ InputFile inputFile = (InputFile) input;
+ if (jsonObj.get("path") == null && inputFile.getFilePath() != null) {
+ jsonObj.put("path", inputFile.getFilePath());
+ }
}
if (jsonObj.get("path") == null && input.getInputDescriptor().getPath() != null) {
jsonObj.put("path", input.getInputDescriptor().getPath());
@@ -114,10 +119,8 @@ public class OutputManager {
if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
jsonObj.put("ip", LogFeederUtil.ipAddress);
}
- if (jsonObj.get("level") == null) {
- jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
- }
-
+ jsonObj.putIfAbsent("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+
if (input.isUseEventMD5() || input.isGenEventMD5()) {
String prefix = "";
Object logtimeObj = jsonObj.get("logtime");
@@ -128,7 +131,7 @@ public class OutputManager {
prefix = logtimeObj.toString();
}
}
-
+
Long eventMD5 = MurmurHash.hash64A(LogFeederUtil.getGson().toJson(jsonObj).getBytes(), HASH_SEED);
if (input.isGenEventMD5()) {
jsonObj.put("event_md5", prefix + eventMD5.toString());
@@ -145,8 +148,9 @@ public class OutputManager {
if (jsonObj.get("event_count") == null) {
jsonObj.put("event_count", new Integer(1));
}
- if (inputMarker.lineNumber > 0) {
- jsonObj.put("logfile_line_number", new Integer(inputMarker.lineNumber));
+ if (inputMarker.getAllProperties().containsKey("line_number") &&
+ (Integer) inputMarker.getAllProperties().get("line_number") > 0) {
+ jsonObj.put("logfile_line_number", inputMarker.getAllProperties().get("line_number"));
}
if (jsonObj.containsKey("log_message")) {
// TODO: Let's check size only for log_message for now
@@ -157,8 +161,9 @@ public class OutputManager {
}
}
if (logLevelFilterHandler.isAllowed(jsonObj, inputMarker)
- && !outputLineFilter.apply(jsonObj, inputMarker.input)) {
- for (Output output : input.getOutputList()) {
+ && !outputLineFilter.apply(jsonObj, inputMarker.getInput())) {
+ List<? extends Output> outputList = input.getOutputList();
+ for (Output output : outputList) {
try {
output.write(jsonObj, inputMarker);
} catch (Exception e) {
@@ -174,8 +179,8 @@ public class OutputManager {
messageTruncateMetric.value++;
String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
- ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
- StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
+ ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
+ StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
jsonObj.put("log_message", logMessage);
List<String> tagsList = (List<String>) jsonObj.get("tags");
@@ -190,7 +195,8 @@ public class OutputManager {
public void write(String jsonBlock, InputMarker inputMarker) {
if (logLevelFilterHandler.isAllowed(jsonBlock, inputMarker)) {
- for (Output output : inputMarker.input.getOutputList()) {
+ List<? extends Output> outputList = inputMarker.getInput().getOutputList();
+ for (Output output : outputList) {
try {
output.write(jsonBlock, inputMarker);
} catch (Exception e) {
@@ -201,8 +207,9 @@ public class OutputManager {
}
public void copyFile(File inputFile, InputMarker inputMarker) {
- Input input = inputMarker.input;
- for (Output output : input.getOutputList()) {
+ Input input = inputMarker.getInput();
+ List<? extends Output> outputList = input.getOutputList();
+ for (Output output : outputList) {
try {
output.copyFile(inputFile, inputMarker);
}catch (Exception e) {
@@ -235,7 +242,7 @@ public class OutputManager {
// Ignore
}
}
-
+
// Need to get this value from property
int iterations = 30;
int waitTimeMS = 1000;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index d8eed2b..bae7eec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -21,12 +21,14 @@ package org.apache.ambari.logfeeder.output;
import com.google.common.annotations.VisibleForTesting;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterDescriptor;
@@ -52,7 +54,7 @@ import java.util.Map;
* <li>A batch mode, asynchronous, periodic upload of files</li>
* </ul>
*/
-public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+public class OutputS3File extends OutputFile implements RolloverCondition, RolloverHandler {
private static final Logger LOG = Logger.getLogger(OutputS3File.class);
public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
@@ -60,10 +62,11 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
private LogSpooler logSpooler;
private S3OutputConfiguration s3OutputConfiguration;
private S3Uploader s3Uploader;
+ private LogFeederProps logFeederProps;
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
+ this.logFeederProps = logFeederProps;
s3OutputConfiguration = S3OutputConfiguration.fromConfigBlock(this);
}
@@ -76,9 +79,9 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
*/
@Override
public void copyFile(File inputFile, InputMarker inputMarker) {
- String type = inputMarker.input.getInputDescriptor().getType();
+ String type = inputMarker.getInput().getInputDescriptor().getType();
S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
- String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getInputDescriptor().getType());
+ String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.getInput().getInputDescriptor().getType());
uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
}
@@ -87,8 +90,8 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
String resolvedPath) {
ArrayList<FilterDescriptor> filters = new ArrayList<>();
- addFilters(filters, inputMarker.input.getFirstFilter());
- InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.input.getInputDescriptor();
+ addFilters(filters, inputMarker.getInput().getFirstFilter());
+ InputS3FileDescriptor inputS3FileDescriptorOriginal = (InputS3FileDescriptor) inputMarker.getInput().getInputDescriptor();
InputS3FileDescriptorImpl inputS3FileDescriptor = InputConfigGson.gson.fromJson(
InputConfigGson.gson.toJson(inputS3FileDescriptorOriginal), InputS3FileDescriptorImpl.class);
String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
@@ -189,12 +192,17 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
* @throws Exception
*/
@Override
- public void write(String block, InputMarker inputMarker) throws Exception {
+ public void write(String block, InputFileMarker inputMarker) throws Exception {
if (logSpooler == null) {
- logSpooler = createSpooler(inputMarker.input.getFilePath());
- s3Uploader = createUploader(inputMarker.input.getInputDescriptor().getType());
+ if (inputMarker.getInput().getClass().isAssignableFrom(InputFile.class)) {
+ InputFile input = (InputFile) inputMarker.getInput();
+ logSpooler = createSpooler(input.getFilePath());
+ s3Uploader = createUploader(input.getInputDescriptor().getType());
+ logSpooler.add(block);
+ } else {
+ LOG.error("Cannot write from non local file...");
+ }
}
- logSpooler.add(block);
}
@VisibleForTesting
@@ -206,7 +214,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
@VisibleForTesting
protected LogSpooler createSpooler(String filePath) {
- String spoolDirectory = getLogFeederProps().getTmpDir() + "/s3/service";
+ String spoolDirectory = logFeederProps.getTmpDir() + "/s3/service";
LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
s3OutputConfiguration.getRolloverTimeThresholdSecs());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index cdb869a..9816c15 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -19,24 +19,9 @@
package org.apache.ambari.logfeeder.output;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputProperties;
@@ -56,7 +41,23 @@ import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.CollectionStateWatcher;
import org.apache.solr.common.cloud.DocCollection;
-public class OutputSolr extends Output implements CollectionStateWatcher {
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class OutputSolr extends Output<LogFeederProps, InputMarker> implements CollectionStateWatcher {
private static final Logger LOG = Logger.getLogger(OutputSolr.class);
@@ -88,6 +89,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
private BlockingQueue<OutputData> outgoingBuffer = null;
private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
+ private LogFeederProps logFeederProps;
+
@Override
public boolean monitorConfigChanges() {
return true;
@@ -99,18 +102,18 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
}
@Override
- protected String getStatMetricName() {
+ public String getStatMetricName() {
return "output.solr.write_logs";
}
@Override
- protected String getWriteBytesMetricName() {
+ public String getWriteBytesMetricName() {
return "output.solr.write_bytes";
}
@Override
public void init(LogFeederProps logFeederProps) throws Exception {
- super.init(logFeederProps);
+ this.logFeederProps = logFeederProps;
initParams();
setupSecurity();
createOutgoingBuffer();
@@ -121,7 +124,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
private void initParams() throws Exception {
type = getStringValue("type");
while (true) {
- OutputSolrProperties outputSolrProperties = logSearchConfig.getOutputSolrProperties(type);
+ OutputSolrProperties outputSolrProperties = getLogSearchConfig().getOutputSolrProperties(type);
if (outputSolrProperties == null) {
LOG.info("Output solr properties for type " + type + " is not available yet.");
try { Thread.sleep(OUTPUT_PROPERTIES_WAIT_MS); } catch (Exception e) { LOG.warn(e); }
@@ -175,8 +178,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
}
private void setupSecurity() {
- String jaasFile = getLogFeederProps().getLogFeederSecurityConfig().getSolrJaasFile();
- boolean securityEnabled = getLogFeederProps().getLogFeederSecurityConfig().isSolrKerberosEnabled();
+ String jaasFile = logFeederProps.getLogFeederSecurityConfig().getSolrJaasFile();
+ boolean securityEnabled = logFeederProps.getLogFeederSecurityConfig().isSolrKerberosEnabled();
if (securityEnabled) {
System.setProperty("java.security.auth.login.config", jaasFile);
HttpClientUtil.addConfigurer(new Krb5HttpClientConfigurer());
@@ -325,7 +328,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
}
@Override
- public long getPendingCount() {
+ public Long getPendingCount() {
long pendingCount = 0;
for (SolrWorkerThread solrWorkerThread : workerThreadList) {
pendingCount += solrWorkerThread.localBuffer.size();
@@ -476,7 +479,8 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
Level.ERROR);
}
}
- latestInputMarkers.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
+ latestInputMarkers.put(outputData.inputMarker.getAllProperties().get("base64_file_key").toString(),
+ outputData.inputMarker);
localBuffer.add(document);
}
@@ -511,7 +515,7 @@ public class OutputSolr extends Output implements CollectionStateWatcher {
statMetric.value += localBuffer.size();
writeBytesMetric.value += localBufferBytesSize;
for (InputMarker inputMarker : latestInputMarkers.values()) {
- inputMarker.input.checkIn(inputMarker);
+ inputMarker.getInput().checkIn(inputMarker);
}
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index e5974c5..a2d7692 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -18,11 +18,11 @@
package org.apache.ambari.logfeeder.output;
+import org.apache.ambari.logfeeder.plugin.common.ConfigItem;
+
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.common.ConfigBlock;
-
/**
* Holds all configuration relevant for S3 upload.
*/
@@ -80,7 +80,7 @@ public class S3OutputConfiguration {
return ((Map<String, String>) configs.get(ADDITIONAL_FIELDS_KEY)).get(CLUSTER_KEY);
}
- public static S3OutputConfiguration fromConfigBlock(ConfigBlock configBlock) {
+ public static S3OutputConfiguration fromConfigBlock(ConfigItem configItem) {
Map<String, Object> configs = new HashMap<>();
String[] stringValuedKeysToCopy = new String[] {
SPOOL_DIR_KEY, S3_BUCKET_NAME_KEY, S3_LOG_DIR_KEY,
@@ -88,7 +88,7 @@ public class S3OutputConfiguration {
};
for (String key : stringValuedKeysToCopy) {
- String value = configBlock.getStringValue(key);
+ String value = configItem.getStringValue(key);
if (value != null) {
configs.put(key, value);
}
@@ -103,10 +103,10 @@ public class S3OutputConfiguration {
};
for (int i = 0; i < longValuedKeysToCopy.length; i++) {
- configs.put(longValuedKeysToCopy[i], configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+ configs.put(longValuedKeysToCopy[i], configItem.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
}
- configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));
+ configs.put(ADDITIONAL_FIELDS_KEY, configItem.getNVList(ADDITIONAL_FIELDS_KEY));
return new S3OutputConfiguration(configs);
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index e95a663..ddf3995 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -22,7 +22,6 @@ import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
import com.google.common.annotations.VisibleForTesting;
-
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.CompressionUtil;
import org.apache.ambari.logfeeder.util.S3Util;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index 1f13357..7fc47a9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -19,11 +19,14 @@
package org.apache.ambari.logfeeder.output.spool;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.output.Output;
import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.log4j.Logger;
-import java.io.*;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
@@ -32,7 +35,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/**
* A class that manages local storage of log events before they are uploaded to the output destinations.
*
- * This class should be used by any {@link Output}s that wish to upload log files to an
+ * This class should be used by any {@link org.apache.ambari.logfeeder.plugin.output.Output}s that wish to upload log files to an
* output destination on a periodic batched basis. Log events should be added to an instance
* of this class to be stored locally. This class determines when to
* rollover using calls to an interface {@link RolloverCondition}. Likewise, it uses an interface
@@ -57,7 +60,7 @@ public class LogSpooler {
/**
* Create an instance of the LogSpooler.
* @param spoolDirectory The directory under which spooler files are created.
- * Should be unique per instance of {@link Output}
+ * Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
* @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
* @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
* determine when to rollover.
@@ -73,7 +76,7 @@ public class LogSpooler {
/**
* Create an instance of the LogSpooler.
* @param spoolDirectory The directory under which spooler files are created.
- * Should be unique per instance of {@link Output}
+ * Should be unique per instance of {@link org.apache.ambari.logfeeder.plugin.output.Output}
* @param sourceFileNamePrefix The prefix with which the locally spooled files are created.
* @param rolloverCondition An object of type {@link RolloverCondition} that will be used to
* determine when to rollover.
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 9efab25..6fd00a4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -16,9 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-
package org.apache.ambari.logfeeder.util;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -26,31 +33,19 @@ import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
-import org.apache.ambari.logfeeder.metrics.MetricData;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.reflect.TypeToken;
-
-/**
- * This class contains utility methods used by LogFeeder
- */
public class LogFeederUtil {
private static final Logger LOG = Logger.getLogger(LogFeederUtil.class);
private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
-
+
public static Gson getGson() {
return gson;
}
public static String hostName = null;
public static String ipAddress = null;
-
+
static{
try {
InetAddress ip = InetAddress.getLocalHost();
@@ -65,7 +60,7 @@ public class LogFeederUtil {
hostName = getHostName;
}
LOG.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName +
- ", hostName=" + hostName);
+ ", hostName=" + hostName);
} catch (UnknownHostException e) {
LOG.error("Error getting hostname.", e);
}
@@ -76,7 +71,7 @@ public class LogFeederUtil {
long currMS = System.currentTimeMillis();
if (currStat > metric.prevLogValue) {
LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
- " secs, count=" + (currStat - metric.prevLogValue) + postFix);
+ " secs, count=" + (currStat - metric.prevLogValue) + postFix);
}
metric.prevLogValue = currStat;
metric.prevLogTime = currMS;
@@ -119,15 +114,15 @@ public class LogFeederUtil {
private int counter = 0;
}
- private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+ private static Map<String, LogFeederUtil.LogHistory> logHistoryList = new Hashtable<>();
public static boolean logErrorMessageByInterval(String key, String message, Throwable e, Logger callerLogger, Level level) {
- LogHistory log = logHistoryList.get(key);
+ LogFeederUtil.LogHistory log = logHistoryList.get(key);
if (log == null) {
- log = new LogHistory();
+ log = new LogFeederUtil.LogHistory();
logHistoryList.put(key, log);
}
-
+
if ((System.currentTimeMillis() - log.lastLogTime) > 30 * 1000) {
log.lastLogTime = System.currentTimeMillis();
if (log.counter > 0) {
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index e3a822a..49fb301 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -21,9 +21,9 @@ package org.apache.ambari.logfeeder.filter;
import java.util.Map;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterGrokDescriptor;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterGrokDescriptorImpl;
import org.apache.log4j.Logger;
@@ -65,12 +65,12 @@ public class FilterGrokTest {
filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
init(filterGrokDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
- filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
- filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -91,7 +91,7 @@ public class FilterGrokTest {
filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
init(filterGrokDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
@@ -99,7 +99,7 @@ public class FilterGrokTest {
+ "as one may expect";
String[] messageLines = multiLineMessage.split("\r\n");
for (int i = 0; i < messageLines.length; i++)
- filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker(null, null, 0));
+ filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputFileMarker(null, null, 0));
filterGrok.flush();
EasyMock.verify(mockOutputManager);
@@ -121,12 +121,12 @@ public class FilterGrokTest {
filterGrokDescriptor.setMultilinePattern("^(%{TIMESTAMP_ISO8601:logtime})");
init(filterGrokDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(mockOutputManager);
- filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
- filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+ filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+ filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
assertFalse("Something was captured!", capture.hasCaptured());
@@ -142,8 +142,8 @@ public class FilterGrokTest {
EasyMock.replay(mockOutputManager);
- filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
- filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputFileMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
assertFalse("Something was captured", capture.hasCaptured());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index ef10c46..36139ea 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -27,8 +27,8 @@ import java.util.TimeZone;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.common.LogFeederException;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterJsonDescriptorImpl;
import org.apache.log4j.Logger;
import org.easymock.Capture;
@@ -64,7 +64,7 @@ public class FilterJSONTest {
init(new FilterJsonDescriptorImpl());
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
@@ -72,7 +72,7 @@ public class FilterJSONTest {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String dateString = sdf.format(d);
- filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker(null, null, 0));
+ filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -89,7 +89,7 @@ public class FilterJSONTest {
init(new FilterJsonDescriptorImpl());
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
@@ -97,7 +97,7 @@ public class FilterJSONTest {
DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String dateString = sdf.format(d);
- filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker(null, null, 0));
+ filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -114,11 +114,11 @@ public class FilterJSONTest {
init(new FilterJsonDescriptorImpl());
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
- filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker(null, null, 0));
+ filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -137,7 +137,7 @@ public class FilterJSONTest {
String inputStr = "invalid json";
try{
- filterJson.apply(inputStr,new InputMarker(null, null, 0));
+ filterJson.apply(inputStr,new InputFileMarker(null, null, 0));
fail("Expected LogFeederException was not occured");
} catch(LogFeederException logFeederException) {
assertEquals("Json parsing failed for inputstr = " + inputStr, logFeederException.getLocalizedMessage());
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 4a85b88..ff8cb6e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -21,10 +21,10 @@ package org.apache.ambari.logfeeder.filter;
import java.util.Map;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.output.OutputManager;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
+import org.apache.ambari.logfeeder.plugin.manager.OutputManager;
import org.apache.ambari.logsearch.config.api.model.inputconfig.FilterKeyValueDescriptor;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.FilterKeyValueDescriptorImpl;
-import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
@@ -62,11 +62,11 @@ public class FilterKeyValueTest {
filterKeyValueDescriptor.setFieldSplit("&");
init(filterKeyValueDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -87,11 +87,11 @@ public class FilterKeyValueTest {
filterKeyValueDescriptor.setValueBorders("()");
init(filterKeyValueDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall();
EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ keyValueField: 'name1(value1)&name2(value2)' }", new InputMarker(null, null, 0));
+ filterKeyValue.apply("{ keyValueField: 'name1(value1)&name2(value2)' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
@@ -110,11 +110,11 @@ public class FilterKeyValueTest {
filterKeyValueDescriptor.setFieldSplit("&");
init(filterKeyValueDescriptor);
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
assertFalse("Something was captured!", capture.hasCaptured());
@@ -130,11 +130,11 @@ public class FilterKeyValueTest {
init(filterKeyValueDescriptor);
// using default value split: =
- mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputFileMarker.class));
EasyMock.expectLastCall().anyTimes();
EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
+ filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputFileMarker(null, null, 0));
EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 01b4e54..e3349fc 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -26,8 +26,9 @@ import java.util.List;
import org.apache.ambari.logfeeder.conf.LogEntryCacheConfig;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.filter.Filter;
+import org.apache.ambari.logfeeder.plugin.input.InputMarker;
+import org.apache.ambari.logfeeder.plugin.manager.InputManager;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputFileDescriptorImpl;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
@@ -61,7 +62,7 @@ public class InputFileTest {
private InputFile inputFile;
private List<String> rows = new ArrayList<>();
- private InputMarker testInputMarker;
+ private InputFileMarker testInputMarker;
private LogFeederProps logFeederProps;
@@ -80,7 +81,11 @@ public class InputFileTest {
public void setUp() throws Exception {
logFeederProps = new LogFeederProps();
LogEntryCacheConfig logEntryCacheConfig = new LogEntryCacheConfig();
+ logEntryCacheConfig.setCacheEnabled(false);
+ logEntryCacheConfig.setCacheLastDedupEnabled(false);
+ logEntryCacheConfig.setCacheSize(10);
logFeederProps.setLogEntryCacheConfig(logEntryCacheConfig);
+ testInputMarker = new InputFileMarker(inputFile, "", 0);
}
public void init(String path) throws Exception {
@@ -92,18 +97,21 @@ public class InputFileTest {
inputFileDescriptor.setRowtype("service");
inputFileDescriptor.setPath(path);
- Filter capture = new Filter() {
+ Filter capture = new Filter<LogFeederProps>() {
@Override
public void init(LogFeederProps logFeederProps) {
}
@Override
+ public String getShortDescription() {
+ return null;
+ }
+
+ @Override
public void apply(String inputStr, InputMarker inputMarker) {
rows.add(inputStr);
if (rows.size() % 3 == 0)
inputFile.setDrain(true);
-
- testInputMarker = inputMarker;
}
};
@@ -138,32 +146,6 @@ public class InputFileTest {
}
@Test
- public void testInputFile_process6RowsInterrupted() throws Exception {
- LOG.info("testInputFile_process6RowsInterrupted()");
-
- File checkPointDir = createCheckpointDir("process6_checkpoint");
- File testFile = createFile("process6.log");
- init(testFile.getAbsolutePath());
-
- InputManager inputMabager = EasyMock.createStrictMock(InputManager.class);
- EasyMock.expect(inputMabager.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
- EasyMock.replay(inputMabager);
- inputFile.setInputManager(inputMabager);
-
- inputFile.isReady();
- inputFile.start();
- inputFile.checkIn(testInputMarker);
- inputFile.setDrain(false);
- inputFile.start();
-
- assertEquals("Amount of the rows is incorrect", rows.size(), 6);
- for (int row = 0; row < 6; row++)
- assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
-
- EasyMock.verify(inputMabager);
- }
-
- @Test
public void testInputFile_noLogPath() throws Exception {
LOG.info("testInputFile_noLogPath()");
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
index 9dba349..574fa35 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputManagerTest.java
@@ -25,7 +25,8 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.junit.Test;
public class InputManagerTest {
@@ -42,7 +43,7 @@ public class InputManagerTest {
replay(input1, input2, input3, input4);
- InputManager manager = new InputManager();
+ InputManagerImpl manager = new InputManagerImpl();
manager.add("serviceName", input1);
manager.add("serviceName", input2);
manager.add("serviceName", input3);
@@ -79,8 +80,8 @@ public class InputManagerTest {
expect(input3.getShortDescription()).andReturn("").once();
replay(input1, input2, input3);
-
- InputManager manager = new InputManager();
+
+ InputManagerImpl manager = new InputManagerImpl();
manager.setLogFeederProps(logFeederProps);
manager.add("serviceName", input1);
manager.add("serviceName", input2);
@@ -109,8 +110,8 @@ public class InputManagerTest {
expect(input3.isReady()).andReturn(false);
replay(input1, input2, input3);
-
- InputManager manager = new InputManager();
+
+ InputManagerImpl manager = new InputManagerImpl();
manager.add("serviceName", input1);
manager.add("serviceName", input2);
manager.add("serviceName", input3);
@@ -135,8 +136,8 @@ public class InputManagerTest {
expect(input3.isReady()).andReturn(false);
replay(input1, input2, input3);
-
- InputManager manager = new InputManager();
+
+ InputManagerImpl manager = new InputManagerImpl();
manager.add("serviceName", input1);
manager.add("serviceName", input2);
manager.add("serviceName", input3);
@@ -157,8 +158,8 @@ public class InputManagerTest {
input3.lastCheckIn(); expectLastCall();
replay(input1, input2, input3);
-
- InputManager manager = new InputManager();
+
+ InputManagerImpl manager = new InputManagerImpl();
manager.add("serviceName", input1);
manager.add("serviceName", input2);
manager.add("serviceName", input3);
@@ -183,8 +184,8 @@ public class InputManagerTest {
expect(input3.isClosed()).andReturn(true);
replay(input1, input2, input3);
-
- InputManager manager = new InputManager();
+
+ InputManagerImpl manager = new InputManagerImpl();
manager.add("serviceName", input1);
manager.add("serviceName", input2);
manager.add("serviceName", input3);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
index dd97d27..4ff818a 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/cache/LRUCacheTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.ambari.logfeeder.input.cache;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
import org.joda.time.DateTime;
import org.junit.Before;
import org.junit.Test;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
index da8fff7..9699156 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/metrics/MetricsManagerTest.java
@@ -20,6 +20,8 @@ package org.apache.ambari.logfeeder.metrics;
import static org.easymock.EasyMock.*;
import static org.junit.Assert.*;
+
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
index c0babc4..1623738 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
@@ -24,9 +24,9 @@ import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.output.OutputKafka.KafkaCallBack;
+import org.apache.ambari.logfeeder.plugin.input.Input;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -86,7 +86,7 @@ public class OutputKafkaTest {
EasyMock.replay(mockKafkaProducer);
for (int i = 0; i < 10; i++) {
- InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
+ InputFileMarker inputMarker = new InputFileMarker(EasyMock.mock(Input.class), null, 0);
outputKafka.write("value" + i, inputMarker);
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
index 6e108ab..e97ccaf 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputLineFilterTest.java
@@ -19,8 +19,8 @@
package org.apache.ambari.logfeeder.output;
import org.apache.ambari.logfeeder.common.LogFeederConstants;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.cache.LRUCache;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.input.cache.LRUCache;
import org.apache.ambari.logsearch.config.api.model.inputconfig.InputDescriptor;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
import org.easymock.EasyMock;
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
index 49f5a11..f1c27ab 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputManagerTest.java
@@ -30,10 +30,11 @@ import java.util.List;
import java.util.Map;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.input.InputFileMarker;
import org.apache.ambari.logfeeder.loglevelfilter.LogLevelFilterHandler;
-import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.plugin.common.MetricData;
+import org.apache.ambari.logfeeder.plugin.input.Input;
+import org.apache.ambari.logfeeder.plugin.output.Output;
import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
import org.junit.Test;
@@ -47,7 +48,7 @@ public class OutputManagerTest {
replay(output1, output2, output3);
- OutputManager manager = new OutputManager();
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
@@ -72,8 +73,8 @@ public class OutputManagerTest {
output3.init(logFeederProps); expectLastCall();
replay(output1, output2, output3);
-
- OutputManager manager = new OutputManager();
+
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
@@ -95,7 +96,7 @@ public class OutputManagerTest {
jsonObj.put("id", "testId");
Input mockInput = strictMock(Input.class);
- InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+ InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
inputDescriptor.setAddFields(Collections.<String, String> emptyMap());
@@ -119,7 +120,7 @@ public class OutputManagerTest {
replay(output1, output2, output3, mockFilter, mockInput);
- OutputManager manager = new OutputManager();
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.setLogLevelFilterHandler(mockFilter);
manager.add(output1);
manager.add(output2);
@@ -135,7 +136,7 @@ public class OutputManagerTest {
String jsonString = "{}";
Input mockInput = strictMock(Input.class);
- InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+ InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
Output output1 = strictMock(Output.class);
@@ -153,8 +154,8 @@ public class OutputManagerTest {
output3.write(jsonString, inputMarker); expectLastCall();
replay(output1, output2, output3, mockInput, mockFilter);
-
- OutputManager manager = new OutputManager();
+
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.setLogLevelFilterHandler(mockFilter);
manager.add(output1);
manager.add(output2);
@@ -179,7 +180,7 @@ public class OutputManagerTest {
replay(output1, output2, output3);
- OutputManager manager = new OutputManager();
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
@@ -201,7 +202,7 @@ public class OutputManagerTest {
replay(output1, output2, output3);
- OutputManager manager = new OutputManager();
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
@@ -216,7 +217,7 @@ public class OutputManagerTest {
File f = new File("");
Input mockInput = strictMock(Input.class);
- InputMarker inputMarker = new InputMarker(mockInput, null, 0);
+ InputFileMarker inputMarker = new InputFileMarker(mockInput, null, 0);
Output output1 = strictMock(Output.class);
Output output2 = strictMock(Output.class);
@@ -230,7 +231,7 @@ public class OutputManagerTest {
replay(output1, output2, output3, mockInput);
- OutputManager manager = new OutputManager();
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
@@ -259,8 +260,8 @@ public class OutputManagerTest {
expect(output3.isClosed()).andReturn(true);
replay(output1, output2, output3);
-
- OutputManager manager = new OutputManager();
+
+ OutputManagerImpl manager = new OutputManagerImpl();
manager.add(output1);
manager.add(output2);
manager.add(output3);
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
index 78cf014..6674be1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputS3FileTest.java
@@ -19,11 +19,7 @@
package org.apache.ambari.logfeeder.output;
import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
-import org.apache.ambari.logsearch.config.zookeeper.model.inputconfig.impl.InputDescriptorImpl;
import org.junit.Before;
import org.junit.Test;
@@ -68,76 +64,6 @@ public class OutputS3FileTest {
}
@Test
- public void shouldSpoolLogEventToNewSpooler() throws Exception {
-
- Input input = mock(Input.class);
- InputMarker inputMarker = new InputMarker(input, null, 0);
- InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
- inputDescriptor.setType("hdfs-namenode");
-
- expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
- expect(input.getInputDescriptor()).andReturn(inputDescriptor);
- final LogSpooler spooler = mock(LogSpooler.class);
- spooler.add("log event block");
- final S3Uploader s3Uploader = mock(S3Uploader.class);
- replay(input, spooler, s3Uploader);
-
- OutputS3File outputS3File = new OutputS3File() {
- @Override
- protected LogSpooler createSpooler(String filePath) {
- return spooler;
- }
-
- @Override
- protected S3Uploader createUploader(String logType) {
- return s3Uploader;
- }
- };
- outputS3File.loadConfig(configMap);
- outputS3File.init(new LogFeederProps());
- outputS3File.write("log event block", inputMarker);
- verify(spooler);
- }
-
- @Test
- public void shouldReuseSpoolerForSamePath() throws Exception {
- Input input = mock(Input.class);
- InputMarker inputMarker = new InputMarker(input, null, 0);
- InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
- inputDescriptor.setType("hdfs-namenode");
-
- expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
- expect(input.getInputDescriptor()).andReturn(inputDescriptor);
- final LogSpooler spooler = mock(LogSpooler.class);
- spooler.add("log event block1");
- spooler.add("log event block2");
- final S3Uploader s3Uploader = mock(S3Uploader.class);
- replay(input, spooler, s3Uploader);
-
- OutputS3File outputS3File = new OutputS3File() {
- private boolean firstCallComplete;
- @Override
- protected LogSpooler createSpooler(String filePath) {
- if (!firstCallComplete) {
- firstCallComplete = true;
- return spooler;
- }
- throw new IllegalStateException("Shouldn't call createSpooler for same path.");
- }
-
- @Override
- protected S3Uploader createUploader(String logType) {
- return s3Uploader;
- }
- };
- outputS3File.loadConfig(configMap);
- outputS3File.init(new LogFeederProps());
- outputS3File.write("log event block1", inputMarker);
- outputS3File.write("log event block2", inputMarker);
- verify(spooler);
- }
-
- @Test
public void shouldRolloverWhenSufficientSizeIsReached() throws Exception {
String thresholdSize = Long.toString(15 * 1024 * 1024L);
@@ -171,35 +97,4 @@ public class OutputS3FileTest {
assertFalse(outputS3File.shouldRollover(logSpoolerContext));
}
-
- @Test
- public void shouldUploadFileOnRollover() throws Exception {
- Input input = mock(Input.class);
- InputMarker inputMarker = new InputMarker(input, null, 0);
- InputDescriptorImpl inputDescriptor = new InputDescriptorImpl() {};
- inputDescriptor.setType("hdfs-namenode");
-
- expect(input.getFilePath()).andReturn("/var/log/hdfs-namenode.log");
- expect(input.getInputDescriptor()).andReturn(inputDescriptor);
- final LogSpooler spooler = mock(LogSpooler.class);
- spooler.add("log event block1");
- final S3Uploader s3Uploader = mock(S3Uploader.class);
- s3Uploader.addFileForUpload("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz");
- replay(input, spooler, s3Uploader);
-
- OutputS3File outputS3File = new OutputS3File() {
- @Override
- protected LogSpooler createSpooler(String filePath) {
- return spooler;
- }
- @Override
- protected S3Uploader createUploader(String logType) {
- return s3Uploader;
- }
- };
- outputS3File.write("log event block1", inputMarker);
- outputS3File.handleRollover(new File("/var/ambari-logsearch/logfeeder/hdfs-namenode.log.gz"));
-
- verify(s3Uploader);
- }
}
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
deleted file mode 100644
index 70d5c8f..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import java.net.MalformedURLException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.ambari.logfeeder.conf.LogFeederProps;
-import org.apache.ambari.logfeeder.conf.LogFeederSecurityConfig;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logsearch.config.api.LogSearchConfigLogFeeder;
-import org.apache.ambari.logsearch.config.api.model.outputconfig.OutputSolrProperties;
-import org.apache.ambari.logsearch.config.zookeeper.model.outputconfig.impl.OutputSolrPropertiesImpl;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.response.UpdateResponse;
-import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.util.NamedList;
-import org.easymock.EasyMock;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-public class OutputSolrTest {
- private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
-
- private OutputSolr outputSolr;
- private LogSearchConfigLogFeeder logSearchConfigMock;
- private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
- private LogFeederProps logFeederProps = new LogFeederProps();
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Before
- public void init() throws Exception {
- LogFeederSecurityConfig logFeederSecurityConfig = new LogFeederSecurityConfig();
- logFeederSecurityConfig.setSolrKerberosEnabled(false);
- logFeederProps.setLogFeederSecurityConfig(logFeederSecurityConfig);
- outputSolr = new OutputSolr() {
- @SuppressWarnings("deprecation")
- @Override
- CloudSolrClient getSolrClient(int count) throws Exception, MalformedURLException {
- return new CloudSolrClient(null) {
- private static final long serialVersionUID = 1L;
-
- @Override
- public UpdateResponse add(Collection<SolrInputDocument> docs) {
- for (SolrInputDocument doc : docs) {
- receivedDocs.put((Integer) doc.getField("id").getValue(), doc);
- }
-
- UpdateResponse response = new UpdateResponse();
- response.setResponse(new NamedList<Object>());
- return response;
- }
- };
- }
- };
-
- OutputSolrProperties outputSolrProperties = new OutputSolrPropertiesImpl("hadoop_logs", "none");
- logSearchConfigMock = EasyMock.createNiceMock(LogSearchConfigLogFeeder.class);
- EasyMock.expect(logSearchConfigMock.getOutputSolrProperties("service")).andReturn(outputSolrProperties);
- EasyMock.replay(logSearchConfigMock);
-
- outputSolr.setLogSearchConfig(logSearchConfigMock);
- }
-
- @Test
- public void testOutputToSolr_uploadData() throws Exception {
- LOG.info("testOutputToSolr_uploadData()");
-
- Map<String, Object> config = new HashMap<String, Object>();
- config.put("zk_connect_string", "some zk_connect_string");
- config.put("workers", "3");
- config.put("type", "service");
-
- outputSolr.loadConfig(config);
- outputSolr.init(logFeederProps);
-
- Map<Integer, SolrInputDocument> expectedDocs = new HashMap<>();
-
- int count = 0;
- for (int i = 0; i < 10; i++) {
- Map<String, Object> jsonObj = new HashMap<>();
- for (int j = 0; j < 3; j++)
- jsonObj.put("name" + ++count, "value" + ++count);
- jsonObj.put("id", ++count);
-
- InputMarker inputMarker = new InputMarker(EasyMock.mock(Input.class), null, 0);
- outputSolr.write(jsonObj, inputMarker);
-
- SolrInputDocument doc = new SolrInputDocument();
- for (Map.Entry<String, Object> e : jsonObj.entrySet())
- doc.addField(e.getKey(), e.getValue());
-
- expectedDocs.put(count, doc);
- }
-
- Thread.sleep(100);
- while (outputSolr.getPendingCount() > 0)
- Thread.sleep(100);
-
- int waitToFinish = 0;
- if (receivedDocs.size() < 10 && waitToFinish < 10) {
- Thread.sleep(100);
- waitToFinish++;
- }
-
- Set<Integer> ids = new HashSet<>();
- ids.addAll(receivedDocs.keySet());
- ids.addAll(expectedDocs.keySet());
- for (Integer id : ids) {
- SolrInputDocument receivedDoc = receivedDocs.get(id);
- SolrInputDocument expectedDoc = expectedDocs.get(id);
-
- assertNotNull("No document received for id: " + id, receivedDoc);
- assertNotNull("No document expected for id: " + id, expectedDoc);
-
- Set<String> fieldNames = new HashSet<>();
- fieldNames.addAll(receivedDoc.getFieldNames());
- fieldNames.addAll(expectedDoc.getFieldNames());
-
- for (String fieldName : fieldNames) {
- Object receivedValue = receivedDoc.getFieldValue(fieldName);
- Object expectedValue = expectedDoc.getFieldValue(fieldName);
-
- assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
- assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
-
- assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue, expectedValue);
- }
- }
- }
-
- @Test
- public void testOutputToSolr_noZkConnectString() throws Exception {
- LOG.info("testOutputToSolr_noUrlOrZkConnectString()");
-
- expectedException.expect(Exception.class);
- expectedException.expectMessage("For solr output the zk_connect_string property need to be set");
-
- Map<String, Object> config = new HashMap<String, Object>();
- config.put("workers", "3");
- config.put("type", "service");
-
- outputSolr.loadConfig(config);
- outputSolr.init(logFeederProps);
- }
-
- @After
- public void cleanUp() {
- receivedDocs.clear();
- EasyMock.verify(logSearchConfigMock);
- }
-}
diff --git a/ambari-logsearch/pom.xml b/ambari-logsearch/pom.xml
index 5efb4ba..4a0bb6a 100644
--- a/ambari-logsearch/pom.xml
+++ b/ambari-logsearch/pom.xml
@@ -37,6 +37,7 @@
<module>ambari-logsearch-config-api</module>
<module>ambari-logsearch-config-zookeeper</module>
<module>ambari-logsearch-it</module>
+ <module>ambari-logsearch-logfeeder-plugin-api</module>
</modules>
<properties>
<jdk.version>1.8</jdk.version>
--
To stop receiving notification emails like this one, please contact
['"commits@ambari.apache.org" <co...@ambari.apache.org>'].