You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/11 18:15:02 UTC
[48/51] [partial] ambari git commit: AMBARI-15679. Initial commit for
LogSearch module (oleewre)
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
new file mode 100644
index 0000000..9b2a717
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.io.BufferedInputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import oi.thekraken.grok.api.Grok;
+import oi.thekraken.grok.api.exception.GrokException;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.gson.reflect.TypeToken;
+
+public class FilterGrok extends Filter {
+ static private Logger logger = Logger.getLogger(FilterGrok.class);
+
+ // Name of the classpath resource that loadPatterns() reads the shared
+ // grok pattern definitions from.
+ private static final String GROK_PATTERN_FILE = "grok-patterns";
+
+ // Pattern strings read in init() from the "message_pattern" and
+ // "multiline_pattern" config keys (after escapePattern()).
+ String messagePattern = null;
+ String multilinePattern = null;
+
+ // Compiled matchers. grokMessage stays null when message_pattern is empty
+ // or init() failed; grokMultiline is null when no multiline_pattern is set.
+ Grok grokMultiline = null;
+ Grok grokMessage = null;
+
+ // strBuff holds the lines of the multi-line event currently being
+ // assembled; currMultilineJsonStr is the multiline capture of the line
+ // that most recently matched multiline_pattern.
+ StringBuilder strBuff = null;
+ String currMultilineJsonStr = null;
+
+ // Marker of the first buffered line, and marker of the most recent line;
+ // the latter is the one passed downstream by applyMessage().
+ InputMarker firstInputMarker = null;
+ InputMarker savedInputMarker = null;
+
+ // Used by apply(Map, InputMarker): the map entry to parse, and whether to
+ // remove that entry afterwards.
+ String sourceField = null;
+ boolean removeSourceField = true;
+
+ // Capture names found in message_pattern / multiline_pattern. Only
+ // namedParamList is read in this class (in applyMessage()).
+ Set<String> namedParamList = new HashSet<String>();
+ Set<String> multiLineamedParamList = new HashSet<String>();
+
+ // Gson target type used to deserialize the capture JSON in applyMessage().
+ Type jsonType = new TypeToken<Map<String, String>>() {
+ }.getType();
+
+ // Incremented once per parse failure in logParseError().
+ public MetricCount grokErrorMetric = new MetricCount();
+
+ /**
+  * Reads the filter configuration and compiles the grok expressions.
+  * <p>
+  * Failures while setting up grok are logged and not rethrown; in that case
+  * (and when message_pattern is empty) grokMessage is left null, which
+  * apply(String, InputMarker) treats as "filter disabled".
+  *
+  * @throws Exception
+  *             only what super.init() throws
+  */
+ @Override
+ public void init() throws Exception {
+ super.init();
+
+ try {
+ grokErrorMetric.metricsName = "filter.error.grok";
+ // Read the pattern strings and the source-field settings from config
+ messagePattern = escapePattern(getStringValue("message_pattern"));
+ multilinePattern = escapePattern(getStringValue("multiline_pattern"));
+ sourceField = getStringValue("source_field");
+ removeSourceField = getBooleanValue("remove_source_field",
+ removeSourceField);
+
+ // Note: this is logged before the patterns are compiled below.
+ logger.info("init() done. grokPattern=" + messagePattern
+ + ", multilinePattern=" + multilinePattern + ", "
+ + getShortDescription());
+ if (StringUtils.isEmpty(messagePattern)) {
+ logger.error("message_pattern is not set for filter.");
+ return;
+ }
+ extractNamedParams(messagePattern, namedParamList);
+
+ grokMessage = new Grok();
+ // grokMessage.addPatternFromReader(r);
+ // The boolean result of loadPatterns() is not checked here.
+ loadPatterns(grokMessage);
+ grokMessage.compile(messagePattern);
+ if (!StringUtils.isEmpty(multilinePattern)) {
+ extractNamedParams(multilinePattern, multiLineamedParamList);
+
+ grokMultiline = new Grok();
+ loadPatterns(grokMultiline);
+ grokMultiline.compile(multilinePattern);
+ }
+ } catch (Throwable t) {
+ logger.fatal(
+ "Caught exception while initializing Grok. multilinePattern="
+ + multilinePattern + ", messagePattern="
+ + messagePattern, t);
+ // Disable both matchers so a half-initialized filter is never used.
+ grokMessage = null;
+ grokMultiline = null;
+ }
+
+ }
+
+ /**
+  * Normalizes a configured grok pattern before it is compiled: when the
+  * pattern contains the inline flag "(?m)" and no "(?s)", the first "(?m)"
+  * is replaced by "(?s)".
+  *
+  * @param inPattern
+  *            pattern string from the configuration, may be null
+  * @return the normalized pattern, or null if inPattern was null
+  */
+ private String escapePattern(String inPattern) {
+   if (inPattern == null) {
+     return null;
+   }
+   if (inPattern.contains("(?m)") && !inPattern.contains("(?s)")) {
+     // The search text must be quoted. Unquoted, "(?m)" is itself a regex
+     // inline flag that matches the empty string, so replaceFirst() only
+     // prepended "(?s)" and left "(?m)" in the pattern.
+     return inPattern.replaceFirst(Pattern.quote("(?m)"), "(?s)");
+   }
+   return inPattern;
+ }
+
+ private void extractNamedParams(String patternStr, Set<String> paramList) {
+ String grokRegEx = "%\\{" + "(?<name>" + "(?<pattern>[A-z0-9]+)"
+ + "(?::(?<subname>[A-z0-9_:]+))?" + ")" + "(?:=(?<definition>"
+ + "(?:" + "(?:[^{}]+|\\.+)+" + ")+" + ")" + ")?" + "\\}";
+
+ Pattern pattern = Pattern.compile(grokRegEx);
+ java.util.regex.Matcher matcher = pattern.matcher(patternStr);
+ while (matcher.find()) {
+ String subname = matcher.group(3);
+ if (subname != null) {
+ paramList.add(subname);
+ }
+ }
+ }
+
+ /**
+  * Loads the pattern definitions from the grok-patterns classpath resource
+  * into the given Grok instance.
+  *
+  * @param grok
+  *            instance to register the patterns with
+  * @return true when the patterns were loaded; false when the resource is
+  *         missing or could not be read or parsed (the cause is logged)
+  */
+ private boolean loadPatterns(Grok grok) {
+   logger.info("Loading pattern file " + GROK_PATTERN_FILE);
+   // Hold the stream by its declared type. The concrete class returned by
+   // getResourceAsStream() depends on the class loader, so the previous cast
+   // to BufferedInputStream could fail with a ClassCastException.
+   java.io.InputStream patternStream;
+   try {
+     patternStream = this.getClass().getClassLoader()
+         .getResourceAsStream(GROK_PATTERN_FILE);
+   } catch (Throwable t) {
+     logger.fatal("Error reading grok-patterns file " + GROK_PATTERN_FILE
+         + " from classpath. Grok filtering will not work.", t);
+     return false;
+   }
+   if (patternStream == null) {
+     logger.fatal("Couldn't load grok-patterns file "
+         + GROK_PATTERN_FILE + ". Things will not work");
+     return false;
+   }
+
+   try {
+     grok.addPatternFromReader(new InputStreamReader(patternStream));
+     return true;
+   } catch (GrokException e) {
+     logger.fatal(
+         "Error loading patterns from grok-patterns reader for file "
+             + GROK_PATTERN_FILE, e);
+     return false;
+   } finally {
+     // Release the resource on every path, including the failure ones.
+     try {
+       patternStream.close();
+     } catch (java.io.IOException ignored) {
+       // Nothing useful can be done if closing a classpath resource fails.
+     }
+   }
+ }
+
+ /**
+  * Parses one raw input line.
+  * <p>
+  * Without a multiline pattern the line is parsed and forwarded directly.
+  * With one, lines are buffered: a line that matches multiline_pattern
+  * starts a new event and causes the previously buffered event to be parsed
+  * and forwarded; any other line is appended to the buffer. The last event
+  * therefore stays buffered until the next matching line or flush().
+  * Does nothing when grokMessage is null (init() failed or no pattern).
+  *
+  * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
+  */
+ @Override
+ public void apply(String inputStr, InputMarker inputMarker) {
+ if (grokMessage == null) {
+ return;
+ }
+
+ if (grokMultiline != null) {
+ // Check if new line
+ String jsonStr = grokMultiline.capture(inputStr);
+ // "{}" is treated as "no match", i.e. a continuation line.
+ if (!"{}".equals(jsonStr)) {
+ // New line
+ if (strBuff != null) {
+ // Record on the marker of the last buffered line where the
+ // buffered event began.
+ savedInputMarker.beginLineNumber = firstInputMarker.lineNumber;
+ // Construct JSON object and add only the interested named
+ // parameters
+ Map<String, Object> jsonObj = Collections
+ .synchronizedMap(new HashMap<String, Object>());
+ try {
+ // Handle message parsing
+ applyMessage(strBuff.toString(), jsonObj,
+ currMultilineJsonStr);
+ } finally {
+ // Always drop the buffer, even if parsing threw.
+ strBuff = null;
+ savedInputMarker = null;
+ firstInputMarker = null;
+ }
+ }
+ currMultilineJsonStr = jsonStr;
+ }
+
+ if (strBuff == null) {
+ // First line of a new buffer. This is also reached for a
+ // non-matching line when nothing is buffered yet.
+ strBuff = new StringBuilder();
+ firstInputMarker = inputMarker;
+ } else {
+ // strBuff.append(System.lineSeparator());
+ // Buffered lines are always joined with CR LF.
+ strBuff.append('\r');
+ strBuff.append('\n');
+ }
+ strBuff.append(inputStr);
+ savedInputMarker = inputMarker;
+ } else {
+ // No multiline pattern: every line is its own event.
+ savedInputMarker = inputMarker;
+ Map<String, Object> jsonObj = Collections
+ .synchronizedMap(new HashMap<String, Object>());
+ applyMessage(inputStr, jsonObj, null);
+ }
+ }
+
+ /**
+  * Parses the value of the configured source_field of an already
+  * structured event and forwards the result. Does nothing when no
+  * source_field is configured or when grok is not initialized.
+  *
+  * @param jsonObj
+  *            event to enrich; captured fields are added to it
+  * @param inputMarker
+  *            marker of the line the event came from
+  */
+ @Override
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+   // Same guard as apply(String, InputMarker): after a failed init()
+   // grokMessage is null and parsing would throw NullPointerException.
+   if (grokMessage == null || sourceField == null) {
+     return;
+   }
+   savedInputMarker = inputMarker;
+   applyMessage((String) jsonObj.get(sourceField), jsonObj, null);
+   // NOTE(review): applyMessage() has already forwarded jsonObj downstream
+   // at this point, so the removal happens after forwarding - confirm that
+   // downstream consumers still observe it.
+   if (removeSourceField) {
+     jsonObj.remove(sourceField);
+   }
+ }
+
+ /**
+  * Parses inputStr with the message pattern, copies the captured fields
+  * into jsonObj and forwards jsonObj to the next stage.
+  * <p>
+  * A capture result of "{}" is treated as a parse failure. In that case the
+  * event is dropped when multilineJsonStr is null; otherwise the multiline
+  * capture is used instead and the event is tagged "error_grok_parsing".
+  *
+  * @param inputStr
+  *            text to parse
+  * @param jsonObj
+  *            event map that receives the captured fields
+  * @param multilineJsonStr
+  *            capture JSON of the multiline pattern to fall back to, or
+  *            null when there is none
+  */
+ private void applyMessage(String inputStr, Map<String, Object> jsonObj,
+ String multilineJsonStr) {
+ String jsonStr = grokParse(inputStr);
+
+ boolean parseError = false;
+ if ("{}".equals(jsonStr)) {
+ parseError = true;
+ // Error parsing string.
+ logParseError(inputStr);
+
+ if (multilineJsonStr == null) {
+ // TODO: Should we just add this as raw message in solr?
+ return;
+ }
+ }
+
+ if (parseError) {
+ jsonStr = multilineJsonStr;
+ }
+ Map<String, String> jsonSrc = LogFeederUtil.getGson().fromJson(jsonStr,
+ jsonType);
+ // Copy only the captures named in message_pattern.
+ // NOTE(review): on the fallback path jsonSrc holds the multiline captures
+ // but is still filtered by namedParamList, not multiLineamedParamList -
+ // confirm this is intended.
+ for (String namedParam : namedParamList) {
+ if (jsonSrc.get(namedParam) != null) {
+ jsonObj.put(namedParam, jsonSrc.get(namedParam));
+ }
+ }
+ if (parseError) {
+ // Add error tags
+ @SuppressWarnings("unchecked")
+ List<String> tagsList = (List<String>) jsonObj.get("tags");
+ if (tagsList == null) {
+ tagsList = new ArrayList<String>();
+ jsonObj.put("tags", tagsList);
+ }
+ tagsList.add("error_grok_parsing");
+ if (sourceField == null) {
+ // For now let's put the raw message in log_message, so that it
+ // will be searchable
+ jsonObj.put("log_message", inputStr);
+ }
+ }
+
+ // Forward with the marker of the most recent input line.
+ super.apply(jsonObj, savedInputMarker);
+ statMetric.count++;
+ }
+
+ /**
+  * Runs the compiled message pattern against the given text.
+  *
+  * @param inputStr
+  *            text to match
+  * @return the capture result as returned by Grok#capture(String)
+  */
+ public String grokParse(String inputStr) {
+   return grokMessage.capture(inputStr);
+ }
+
+ /**
+  * Counts a parse failure in grokErrorMetric and reports it through
+  * LogFeederUtil.logErrorMessageByInterval at WARN level.
+  *
+  * @param inputStr
+  *            text that could not be parsed, may be null
+  */
+ private void logParseError(String inputStr) {
+   grokErrorMetric.count++;
+   final String messageKey = this.getClass().getSimpleName()
+       + "_PARSEERROR";
+   int length = 0;
+   if (inputStr != null) {
+     length = inputStr.length();
+   }
+   String message = "Error parsing string. length=" + length
+       + ", input=" + input.getShortDescription()
+       + ". First upto 100 characters="
+       + LogFeederUtil.subString(inputStr, 100);
+   LogFeederUtil.logErrorMessageByInterval(messageKey, message, null,
+       logger, Level.WARN);
+ }
+
+ /**
+  * Parses and forwards the multi-line event that is still buffered, if
+  * any, then calls super.flush().
+  */
+ @Override
+ public void flush() {
+   if (strBuff != null) {
+     // Same bookkeeping as the emit path in apply(String, InputMarker):
+     // record where the buffered event began, and always drop the buffer
+     // (even if parsing throws) so it cannot be emitted a second time.
+     if (savedInputMarker != null && firstInputMarker != null) {
+       savedInputMarker.beginLineNumber = firstInputMarker.lineNumber;
+     }
+     Map<String, Object> jsonObj = Collections
+         .synchronizedMap(new HashMap<String, Object>());
+     try {
+       applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
+     } finally {
+       strBuff = null;
+       savedInputMarker = null;
+       firstInputMarker = null;
+     }
+   }
+   super.flush();
+ }
+
+ /**
+  * Short identifier for log messages: the filter kind plus the configured
+  * message pattern.
+  *
+  * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+  */
+ @Override
+ public String getShortDescription() {
+ return "filter:filter=grok,regex=" + messagePattern;
+ }
+
+ /**
+  * Adds this filter's grok error counter after the containers added by the
+  * superclass.
+  */
+ @Override
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ super.addMetricsContainers(metricsList);
+ metricsList.add(grokErrorMetric);
+ }
+
+ /**
+  * Logs the superclass statistics followed by the grok error count.
+  */
+ @Override
+ public void logStat() {
+ super.logStat();
+ // Printing stat for grokErrors
+ logStatForMetric(grokErrorMetric, "Stat: Grok Errors");
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
new file mode 100644
index 0000000..c4da3cb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class FilterKeyValue extends Filter {
+ static private Logger logger = Logger.getLogger(FilterKeyValue.class);
+
+ String sourceField = null;
+ String valueSplit = "=";
+ String fieldSplit = "\t";
+
+ public MetricCount errorMetric = new MetricCount();
+
+ /**
+  * Reads source_field, value_split and field_split from the configuration;
+  * the two separators keep their defaults when not configured.
+  *
+  * @throws Exception
+  *             whatever super.init() throws
+  */
+ @Override
+ public void init() throws Exception {
+   super.init();
+   errorMetric.metricsName = "filter.error.keyvalue";
+
+   sourceField = getStringValue("source_field");
+   valueSplit = getStringValue("value_split", valueSplit);
+   fieldSplit = getStringValue("field_split", fieldSplit);
+
+   // The separator before field_split used to be emitted twice (", , ").
+   logger.info("init() done. source_field=" + sourceField
+       + ", value_split=" + valueSplit + ", field_split="
+       + fieldSplit + ", " + getShortDescription());
+   if (StringUtils.isEmpty(sourceField)) {
+     logger.fatal("source_field is not set for filter. This filter will not be applied");
+   }
+ }
+
+ /**
+  * Converts the raw line to a JSON map and delegates to
+  * apply(Map, InputMarker).
+  *
+  * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
+  */
+ @Override
+ public void apply(String inputStr, InputMarker inputMarker) {
+   Map<String, Object> parsed = LogFeederUtil.toJSONObject(inputStr);
+   apply(parsed, inputMarker);
+ }
+
+ /**
+  * Splits the value of source_field into name/value pairs and stores each
+  * pair in the event, then forwards the event. Tokens are consumed two at
+  * a time; a name without a following value is reported as a parse error.
+  * Returns without forwarding when no source_field is configured.
+  */
+ @Override
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+   if (sourceField == null) {
+     return;
+   }
+   Object rawValue = jsonObj.get(sourceField);
+   if (rawValue != null) {
+     StringTokenizer pairs = new StringTokenizer(rawValue.toString(),
+         fieldSplit);
+     while (pairs.hasMoreTokens()) {
+       String pair = pairs.nextToken();
+       StringTokenizer parts = new StringTokenizer(pair, valueSplit);
+       while (parts.hasMoreTokens()) {
+         String key = parts.nextToken();
+         if (!parts.hasMoreTokens()) {
+           // Unbalanced name value pairs
+           logParseError("name=" + key + ", pair=" + pair
+               + ", field=" + sourceField + ", field_value="
+               + rawValue);
+           break;
+         }
+         jsonObj.put(key, parts.nextToken());
+       }
+     }
+   }
+   super.apply(jsonObj, inputMarker);
+   statMetric.count++;
+ }
+
+ private void logParseError(String inputStr) {
+ errorMetric.count++;
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_PARSEERROR";
+ LogFeederUtil
+ .logErrorMessageByInterval(
+ LOG_MESSAGE_KEY,
+ "Error parsing string. length=" + inputStr.length()
+ + ", input=" + input.getShortDescription()
+ + ". First upto 100 characters="
+ + LogFeederUtil.subString(inputStr, 100), null, logger,
+ Level.ERROR);
+ }
+
+ /**
+  * Short identifier for log messages. Note that the value printed after
+  * "regex=" is the configured source field name.
+  *
+  * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
+  */
+ @Override
+ public String getShortDescription() {
+ return "filter:filter=keyvalue,regex=" + sourceField;
+ }
+
+ /**
+  * Adds this filter's error counter after the containers added by the
+  * superclass.
+  */
+ @Override
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ super.addMetricsContainers(metricsList);
+ metricsList.add(errorMetric);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
new file mode 100644
index 0000000..5c4d30e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Filter for input lines that are already JSON documents. It normalizes
+ * the "line_number" and "logtime" entries and forwards the resulting map.
+ */
+public class JSONFilterCode extends Filter {
+  private static Logger logger = Logger.getLogger(JSONFilterCode.class);
+
+  /**
+   * Parses the line as JSON, converts a numeric "line_number" to a long,
+   * passes a non-empty string "logtime" through LogFeederUtil.getDate(),
+   * and forwards the map.
+   *
+   * @param inputStr
+   *            raw JSON line
+   * @param inputMarker
+   *            marker of the line
+   */
+  @Override
+  public void apply(String inputStr, InputMarker inputMarker) {
+    Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(inputStr);
+
+    // Check the runtime type instead of casting blindly: the unchecked
+    // (Double) cast threw ClassCastException for any other value type.
+    Object lineNumber = jsonMap.get("line_number");
+    if (lineNumber instanceof Number) {
+      jsonMap.put("line_number", ((Number) lineNumber).longValue());
+    }
+
+    // Likewise only reformat "logtime" when it really is a string.
+    Object logTime = jsonMap.get("logtime");
+    if (logTime instanceof String && !((String) logTime).isEmpty()) {
+      jsonMap.put("logtime", LogFeederUtil.getDate((String) logTime));
+    }
+
+    super.apply(jsonMap, inputMarker);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
new file mode 100644
index 0000000..ec75f2d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.ConfigBlock;
+import org.apache.ambari.logfeeder.InputMgr;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Logger;
+
+public abstract class Input extends ConfigBlock implements Runnable {
+ static private Logger logger = Logger.getLogger(Input.class);
+
+ OutputMgr outputMgr;
+ InputMgr inputMgr;
+
+ List<Output> outputList = new ArrayList<Output>();
+
+ Filter firstFilter = null;
+ Thread thread;
+ private boolean isClosed = false;
+ String filePath = null;
+ String type = null;
+
+ boolean tail = true;
+ boolean useEventMD5 = false;
+ boolean genEventMD5 = true;
+
+ public MetricCount readBytesMetric = new MetricCount();
+
+ /**
+ * Called by run() on the thread that monitor() starts for this input.
+ * This method should only exit after all data are read from the source or
+ * the process is exiting. An exception thrown from here is caught and
+ * logged by run().
+ * NOTE(review): the earlier wording said "thread spawned for the output";
+ * presumably "input" was meant - confirm.
+ *
+ * @throws Exception
+ */
+ abstract void start() throws Exception;
+
+ /**
+  * Reads the "tail", "use_event_md5_as_id" and "gen_event_md5" settings
+  * (keeping the current field values as defaults) and then initializes the
+  * first filter, if one is set.
+  */
+ @Override
+ public void init() throws Exception {
+ super.init();
+ tail = getBooleanValue("tail", tail);
+ useEventMD5 = getBooleanValue("use_event_md5_as_id", useEventMD5);
+ genEventMD5 = getBooleanValue("gen_event_md5", genEventMD5);
+
+ if (firstFilter != null) {
+ firstFilter.init();
+ }
+ }
+
+ @Override
+ public String getNameForThread() {
+ if (filePath != null) {
+ try {
+ return (type + "=" + (new File(filePath)).getName());
+ } catch (Throwable ex) {
+ logger.warn("Couldn't get basename for filePath=" + filePath,
+ ex);
+ }
+ }
+ return super.getNameForThread() + ":" + type;
+ }
+
+ /**
+  * Thread body: runs start() and logs any Exception it throws instead of
+  * propagating it.
+  *
+  * @see java.lang.Runnable#run()
+  */
+ @Override
+ public void run() {
+ try {
+ logger.info("Started to monitor. " + getShortDescription());
+ start();
+ } catch (Exception e) {
+ // Any Exception from start() is reported with this message.
+ logger.error("Error writing to output.", e);
+ }
+ logger.info("Exiting thread. " + getShortDescription());
+ }
+
+ /**
+  * Counts the line and hands it to the first filter. When no filter is
+  * set the line is counted but not forwarded anywhere.
+  */
+ public void outputLine(String line, InputMarker marker) {
+ statMetric.count++;
+ // Adds the character count of the line, as given by String.length().
+ readBytesMetric.count += (line.length());
+
+ if (firstFilter != null) {
+ firstFilter.apply(line, marker);
+ } else {
+ // TODO: For now, let's make filter mandatory, so that no one
+ // accidentally forgets to write filter
+ // outputMgr.write(line, this);
+ }
+ }
+
+ /**
+  * Flushes the first filter, if one is set.
+  */
+ public void flush() {
+ if (firstFilter != null) {
+ firstFilter.flush();
+ }
+ }
+
+ /**
+  * Starts the monitoring thread for this input if the input is ready.
+  *
+  * @return true when the thread was started, false when isReady() is false
+  */
+ public boolean monitor() {
+   if (!isReady()) {
+     return false;
+   }
+   logger.info("Starting thread. " + getShortDescription());
+   thread = new Thread(this, getNameForThread());
+   thread.start();
+   return true;
+ }
+
+ /**
+  * Hook taking the marker of a line; this base implementation does
+  * nothing.
+  */
+ public void checkIn(InputMarker inputMarker) {
+ // Default implementation is to ignore.
+ }
+
+ /**
+ * This is generally used by the final check-in. This base implementation
+ * does nothing.
+ */
+ public void checkIn() {
+
+ }
+
+ /**
+ * Whether monitor() may start the thread for this input.
+ *
+ * @return always true in this base implementation
+ */
+ public boolean isReady() {
+ return true;
+ }
+
+ // Accessors for the flag that init() reads from the "tail" setting.
+ public boolean isTail() {
+ return tail;
+ }
+
+ public void setTail(boolean tail) {
+ this.tail = tail;
+ }
+
+ // Accessors for the flag that init() reads from "use_event_md5_as_id".
+ public boolean isUseEventMD5() {
+ return useEventMD5;
+ }
+
+ public void setUseEventMD5(boolean useEventMD5) {
+ this.useEventMD5 = useEventMD5;
+ }
+
+ // Accessors for the flag that init() reads from "gen_event_md5".
+ public boolean isGenEventMD5() {
+ return genEventMD5;
+ }
+
+ public void setGenEventMD5(boolean genEventMD5) {
+ this.genEventMD5 = genEventMD5;
+ }
+
+ @Override
+ public void setDrain(boolean drain) {
+ logger.info("Request to drain. " + getShortDescription());
+ super.setDrain(drain);
+ ;
+ try {
+ thread.interrupt();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+
+ // The first filter is the one that outputLine(), flush(), close() and
+ // logStat() of this class delegate to.
+ public Filter getFirstFilter() {
+ return firstFilter;
+ }
+
+ public void setFirstFilter(Filter filter) {
+ firstFilter = filter;
+ }
+
+ // Plain setters for the two manager references.
+ public void setInputMgr(InputMgr inputMgr) {
+ this.inputMgr = inputMgr;
+ }
+
+ public void setOutputMgr(OutputMgr outputMgr) {
+ this.outputMgr = outputMgr;
+ }
+
+ // The file path, when set, is used by getNameForThread() to build the
+ // thread name.
+ public String getFilePath() {
+ return filePath;
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ /**
+  * Closes the first filter, or the output manager when no filter is set,
+  * and marks this input as closed. A failure while closing is logged and
+  * does not prevent the input from being marked closed.
+  */
+ public void close() {
+   logger.info("Close called. " + getShortDescription());
+
+   try {
+     if (firstFilter != null) {
+       firstFilter.close();
+     } else if (outputMgr != null) {
+       outputMgr.close();
+     }
+   } catch (Throwable t) {
+     // Closing stays best-effort, but the failure is no longer hidden.
+     logger.warn("Error while closing. " + getShortDescription(), t);
+   }
+   isClosed = true;
+ }
+
+ // Accessors for the closed flag, which close() sets to true.
+ public void setClosed(boolean isClosed) {
+ this.isClosed = isClosed;
+ }
+
+ public boolean isClosed() {
+ return isClosed;
+ }
+
+ /**
+  * Loads the configuration and, when a "type" is configured, registers it
+  * both in contextFields and in the "add_fields" entry of the given map
+  * (creating that entry if it is missing).
+  */
+ @Override
+ public void loadConfig(Map<String, Object> map) {
+   super.loadConfig(map);
+   String typeValue = getStringValue("type");
+   if (typeValue == null) {
+     return;
+   }
+   // Explicitly add type and value to field list
+   contextFields.put("type", typeValue);
+   @SuppressWarnings("unchecked")
+   Map<String, Object> extraFields = (Map<String, Object>) map
+       .get("add_fields");
+   if (extraFields == null) {
+     extraFields = new HashMap<String, Object>();
+     map.put("add_fields", extraFields);
+   }
+   extraFields.put("type", typeValue);
+ }
+
+ /**
+  * Base implementation returns null. toString() and the log messages in
+  * this class use this value.
+  */
+ @Override
+ public String getShortDescription() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /**
+  * Logs the superclass statistics, the read-bytes metric and then the
+  * statistics of the first filter, if one is set.
+  */
+ @Override
+ public void logStat() {
+ super.logStat();
+ logStatForMetric(readBytesMetric, "Stat: Bytes Read");
+
+ if (firstFilter != null) {
+ firstFilter.logStat();
+ }
+ }
+
+ /**
+  * Same value as getShortDescription().
+  */
+ @Override
+ public String toString() {
+ return getShortDescription();
+ }
+
+ /**
+ * Roll-over hook; this base implementation does nothing.
+ */
+ public void rollOver() {
+ // Only some inputs support it. E.g. InputFile
+ }
+
+ // The type is used by getNameForThread() when building the thread name.
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ // Base implementation has no event time and returns null.
+ public Date getEventTime() {
+ return null;
+ }
+
+ // Returns the internal list itself, not a copy.
+ public List<Output> getOutputList() {
+ return outputList;
+ }
+
+ /**
+ * Appends an output to this input's output list.
+ *
+ * @param output
+ *            output to add
+ */
+ public void addOutput(Output output) {
+ outputList.add(output);
+ }
+
+ /**
+ * Adds the metric containers of the superclass, then those of the first
+ * filter (if one is set), then this input's read-bytes metric.
+ *
+ * @param metricsList
+ *            list that receives the containers
+ */
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ super.addMetricsContainers(metricsList);
+ if (firstFilter != null) {
+ firstFilter.addMetricsContainers(metricsList);
+ }
+ metricsList.add(readBytesMetric);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
new file mode 100644
index 0000000..420610a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input;
+
+import java.io.BufferedReader;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputFile extends Input {
+ static private Logger logger = Logger.getLogger(InputFile.class);
+
+ // String startPosition = "beginning";
+ String logPath = null;
+ boolean isStartFromBegining = true;
+
+ boolean isReady = false;
+ File[] logPathFiles = null;
+ Object fileKey = null;
+ String base64FileKey = null;
+
+ private boolean isRolledOver = false;
+ boolean addWildCard = false;
+
+ long lastCheckPointTimeMS = 0;
+ int checkPointIntervalMS = 5 * 1000; // 5 seconds
+ RandomAccessFile checkPointWriter = null;
+ Map<String, Object> jsonCheckPoint = null;
+
+ File checkPointFile = null;
+
+ private InputMarker lastCheckPointInputMarker = null;
+
+ private String checkPointExtension = ".cp";
+
+ /**
+ * Reads the file-input settings ("path", "tail", "add_wild_card",
+ * "checkpoint.interval.ms", "start_position"), probes whether the
+ * configured file already exists and then delegates to the superclass.
+ * When no path is configured an error is logged and initialization stops
+ * before the superclass is initialized.
+ */
+ @Override
+ public void init() throws Exception {
+   logger.info("init() called");
+   statMetric.metricsName = "input.files.read_lines";
+   readBytesMetric.metricsName = "input.files.read_bytes";
+   checkPointExtension = LogFeederUtil.getStringProperty(
+       "logfeeder.checkpoint.extension", checkPointExtension);
+
+   // The input stays marked as closed until processFile() starts reading
+   setClosed(true);
+   logPath = getStringValue("path");
+   tail = getBooleanValue("tail", tail);
+   addWildCard = getBooleanValue("add_wild_card", addWildCard);
+   checkPointIntervalMS = getIntValue("checkpoint.interval.ms",
+       checkPointIntervalMS);
+
+   if (StringUtils.isEmpty(logPath)) {
+     logger.error("path is empty for file input. "
+         + getShortDescription());
+     return;
+   }
+
+   String configuredStart = getStringValue("start_position");
+   boolean startRequested = StringUtils.isEmpty(configuredStart)
+       || configuredStart.equalsIgnoreCase("beginning")
+       || configuredStart.equalsIgnoreCase("begining");
+   // A start position other than the beginning only makes sense while tailing
+   if (startRequested || !tail) {
+     isStartFromBegining = true;
+   }
+
+   setFilePath(logPath);
+   boolean fileReady = isReady();
+
+   logger.info("File to monitor " + logPath + ", tail=" + tail
+       + ", addWildCard=" + addWildCard + ", isReady=" + fileReady);
+
+   super.init();
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.ambari.logfeeder.input.Input#isReady()
+ */
+ @Override
+ public boolean isReady() {
+ if (!isReady) {
+ // Let's try to check whether the file is available
+ logPathFiles = getActualFiles(logPath);
+ if (logPathFiles != null && logPathFiles.length > 0
+ && logPathFiles[0].isFile()) {
+
+ if (isTail() && logPathFiles.length > 1) {
+ logger.warn("Found multiple files (" + logPathFiles.length
+ + ") for the file filter " + filePath
+ + ". Will use only the first one. Using "
+ + logPathFiles[0].getAbsolutePath());
+ }
+ logger.info("File filter " + filePath + " expanded to "
+ + logPathFiles[0].getAbsolutePath());
+ isReady = true;
+ } else {
+ logger.debug(logPath + " file doesn't exist. Ignoring for now");
+ }
+ }
+ return isReady;
+ }
+
+ /**
+ * Resolves a configured path to the files it denotes. A path naming an
+ * existing regular file is returned as is; otherwise it is treated as a
+ * wild card pattern that is matched first against the current working
+ * directory and then, using only its last name element, against its
+ * parent directory.
+ *
+ * @param searchPath configured path or pattern; "*" is appended when the
+ *                   add_wild_card option is on and it is not there already
+ * @return the matching files; may be null or empty when nothing matches
+ */
+ private File[] getActualFiles(String searchPath) {
+   String pattern = searchPath;
+   if (addWildCard && !pattern.endsWith("*")) {
+     pattern = pattern + "*";
+   }
+
+   File candidate = new File(pattern);
+   if (candidate.isFile()) {
+     return new File[]{candidate};
+   }
+
+   // Wild card search, current folder first
+   File[] matches = findFileForWildCard(pattern, new File("."));
+   if (matches != null && matches.length > 0) {
+     return matches;
+   }
+
+   // Nothing there, so try the folder the pattern itself points into
+   File parentDir = candidate.getParentFile();
+   if (parentDir == null) {
+     return matches;
+   }
+   return findFileForWildCard(candidate.getName(), parentDir);
+ }
+
+ /**
+ * Lists the entries of a directory accepted by a wild card pattern.
+ *
+ * @param searchPath wild card pattern handed to WildcardFileFilter
+ * @param dir        directory whose entries are listed
+ * @return the accepted entries, or null when dir cannot be listed
+ */
+ private File[] findFileForWildCard(String searchPath, File dir) {
+   String dirFullPath = dir.getAbsolutePath();
+   logger.debug("findFileForWildCard(). filePath=" + searchPath + ", dir="
+       + dir + ", dir.fullpath=" + dirFullPath);
+   // The cast selects the FileFilter overload of listFiles
+   return dir.listFiles((FileFilter) new WildcardFileFilter(searchPath));
+ }
+
+ /**
+ * Records that the given marker has been processed and, when a checkpoint
+ * file is open, persists its line number there.
+ * <p>
+ * The checkpoint file holds a 4-byte length followed by that many bytes of
+ * JSON. Writes are throttled: while the input is open and the last write
+ * is more recent than checkPointIntervalMS, the marker is only remembered
+ * so that checkIn() can flush it later. A marker with a lower line number
+ * than the one already stored is ignored. Failures are logged, never
+ * propagated.
+ *
+ * @param inputMarker position that has been handed over downstream
+ */
+ @Override
+ synchronized public void checkIn(InputMarker inputMarker) {
+   super.checkIn(inputMarker);
+   if (checkPointWriter == null) {
+     return;
+   }
+   try {
+     int lineNumber = LogFeederUtil.objectToInt(
+         jsonCheckPoint.get("line_number"), 0, "line_number");
+     if (lineNumber > inputMarker.lineNumber) {
+       // Already wrote higher line number for this input
+       return;
+     }
+     // Only write when the interval since the last write has elapsed
+     long currMS = System.currentTimeMillis();
+     if (!isClosed()
+         && (currMS - lastCheckPointTimeMS) < checkPointIntervalMS) {
+       // Keep this one so the check point file can be updated on flush
+       lastCheckPointInputMarker = inputMarker;
+       return;
+     }
+     lastCheckPointTimeMS = currMS;
+
+     jsonCheckPoint.put("line_number", "" + inputMarker.lineNumber);
+     jsonCheckPoint.put("last_write_time_ms", "" + currMS);
+     jsonCheckPoint.put("last_write_time_date", new Date());
+
+     String jsonStr = LogFeederUtil.getGson().toJson(jsonCheckPoint);
+     // The length prefix must count encoded bytes, not characters: the
+     // reader allocates exactly that many bytes, so a non-ASCII value
+     // (e.g. in file_path) would otherwise yield a truncated checkpoint.
+     byte[] jsonBytes = jsonStr.getBytes();
+
+     // Let's rewind
+     checkPointWriter.seek(0);
+     checkPointWriter.writeInt(jsonBytes.length);
+     checkPointWriter.write(jsonBytes);
+
+     if (isClosed()) {
+       final String LOG_MESSAGE_KEY = this.getClass()
+           .getSimpleName() + "_FINAL_CHECKIN";
+       LogFeederUtil.logErrorMessageByInterval(
+           LOG_MESSAGE_KEY,
+           "Wrote final checkPoint, input="
+               + getShortDescription()
+               + ", checkPointFile="
+               + checkPointFile.getAbsolutePath()
+               + ", checkPoint=" + jsonStr, null, logger,
+           Level.INFO);
+     }
+   } catch (Throwable t) {
+     final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+         + "_CHECKIN_EXCEPTION";
+     LogFeederUtil
+         .logErrorMessageByInterval(LOG_MESSAGE_KEY,
+             "Caught exception checkIn. , input="
+                 + getShortDescription(), t, logger,
+             Level.ERROR);
+   }
+ }
+
+ /**
+ * Flushes the pending checkpoint: after the superclass handling, the last
+ * marker that checkIn(InputMarker) postponed because of the write interval
+ * (if any) is checked in again.
+ */
+ @Override
+ public void checkIn() {
+ super.checkIn();
+ if (lastCheckPointInputMarker != null) {
+ checkIn(lastCheckPointInputMarker);
+ }
+ }
+
+ /**
+ * Flags this input as rolled over. The read loop in processFile() picks the
+ * flag up, closes the current reader and reopens the file.
+ */
+ @Override
+ public void rollOver() {
+ logger.info("Marking this input file for rollover. "
+ + getShortDescription());
+ isRolledOver = true;
+ }
+
+ /**
+ * Reads the files resolved by isReady(). When tailing, only the first file
+ * is processed and any failure propagates to the caller. Otherwise every
+ * file is processed in turn, a failure is logged and the next file is
+ * tried, and the loop ends early once the input is closed or draining.
+ * Finally the input is closed. Nothing happens when no file was resolved.
+ */
+ @Override
+ void start() throws Exception {
+   boolean nothingToRead = logPathFiles == null || logPathFiles.length == 0;
+   if (nothingToRead) {
+     return;
+   }
+
+   if (!isTail()) {
+     for (int i = 0; i < logPathFiles.length; i++) {
+       File current = logPathFiles[i];
+       try {
+         processFile(current);
+         if (isClosed() || isDrain()) {
+           logger.info("isClosed or isDrain. Now breaking loop.");
+           break;
+         }
+       } catch (Throwable t) {
+         logger.error(
+             "Error processing file=" + current.getAbsolutePath(),
+             t);
+       }
+     }
+   } else {
+     // Tailing follows a single file, the first match
+     processFile(logPathFiles[0]);
+   }
+   // Closing the input flushes to the filters and outputs
+   close();
+ }
+
+ /**
+ * Closes the input through the superclass and then calls checkIn() so that
+ * a checkpoint postponed by the write interval is handed to
+ * checkIn(InputMarker) once more.
+ */
+ @Override
+ public void close() {
+ super.close();
+ logger.info("close() calling checkPoint checkIn(). "
+ + getShortDescription());
+ checkIn();
+ }
+
+ /**
+ * Reads one log file line by line and hands the lines to outputLine().
+ * <p>
+ * When tailing, a checkpoint file named base64(fileKey) + checkPointExtension
+ * is opened in the checkpoint folder of inputMgr. It stores a 4-byte length
+ * followed by JSON; when it carries a positive "line_number", lines up to
+ * that number are skipped. At end of file the loop sleeps with a growing
+ * delay (capped at 10 seconds), flushes on the second idle iteration and,
+ * when tailing, compares the file key after more than four idle iterations
+ * to detect a rotated file, which is then reopened. When not tailing, the
+ * loop ends on the second idle iteration. The loop also ends when the input
+ * is draining.
+ * <p>
+ * NOTE(review): checkPointWriter is opened here but never closed in this
+ * method.
+ *
+ * @param logPathFile the file to read
+ * @throws FileNotFoundException declared; presumably raised when the reader cannot be opened - confirm
+ * @throws IOException declared; presumably raised by the reader factory - confirm
+ */
+ private void processFile(File logPathFile) throws FileNotFoundException,
+ IOException {
+ logger.info("Monitoring logPath=" + logPath + ", logPathFile="
+ + logPathFile);
+ BufferedReader br = null;
+ checkPointFile = null;
+ checkPointWriter = null;
+ jsonCheckPoint = null;
+ int resumeFromLineNumber = 0;
+
+ int lineCount = 0;
+ try {
+ setFilePath(logPathFile.getAbsolutePath());
+// br = new BufferedReader(new FileReader(logPathFile));
+ br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile));
+
+ // Whether to send to output from the beginning.
+ boolean resume = isStartFromBegining;
+
+ // Seems FileWatch is not reliable, so let's only use file key
+ // comparison
+ // inputMgr.monitorSystemFileChanges(this);
+ fileKey = getFileKey(logPathFile);
+ base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
+ .getBytes());
+ logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey
+ + ". " + getShortDescription());
+
+ if (isTail()) {
+ try {
+ // Let's see if there is a checkpoint for this file
+ logger.info("Checking existing checkpoint file. "
+ + getShortDescription());
+
+ String fileBase64 = Base64.byteArrayToBase64(fileKey
+ .toString().getBytes());
+ String checkPointFileName = fileBase64
+ + checkPointExtension;
+ File checkPointFolder = inputMgr.getCheckPointFolderFile();
+ checkPointFile = new File(checkPointFolder,
+ checkPointFileName);
+ checkPointWriter = new RandomAccessFile(checkPointFile,
+ "rw");
+
+ try {
+ // Checkpoint layout: int length, then that many bytes of JSON
+ int contentSize = checkPointWriter.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointWriter.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+ + contentSize
+ + ", read="
+ + readSize
+ + ", checkPointFile="
+ + checkPointFile
+ + ", input=" + getShortDescription());
+ } else {
+ // Create JSON string
+ String jsonCheckPointStr = new String(b, 0,
+ readSize);
+ jsonCheckPoint = LogFeederUtil
+ .toJSONObject(jsonCheckPointStr);
+
+ resumeFromLineNumber = LogFeederUtil.objectToInt(
+ jsonCheckPoint.get("line_number"), 0,
+ "line_number");
+
+ if (resumeFromLineNumber > 0) {
+ // Let's read from last line read
+ resume = false;
+ }
+ logger.info("CheckPoint. checkPointFile="
+ + checkPointFile + ", json="
+ + jsonCheckPointStr
+ + ", resumeFromLineNumber="
+ + resumeFromLineNumber + ", resume="
+ + resume);
+ }
+ } catch (EOFException eofEx) {
+ // An empty (new) checkpoint file ends up here: readInt() hits end of file
+ logger.info("EOFException. Will reset checkpoint file "
+ + checkPointFile.getAbsolutePath() + " for "
+ + getShortDescription());
+ }
+ if (jsonCheckPoint == null) {
+ // This seems to be first time, so creating the initial
+ // checkPoint object
+ jsonCheckPoint = new HashMap<String, Object>();
+ jsonCheckPoint.put("file_path", filePath);
+ jsonCheckPoint.put("file_key", fileBase64);
+ }
+
+ } catch (Throwable t) {
+ logger.error(
+ "Error while configuring checkpoint file. Will reset file. checkPointFile="
+ + checkPointFile, t);
+ }
+ }
+
+ setClosed(false);
+ // sleepStep: seconds to sleep at end of file; sleepIteration: consecutive
+ // reads that returned no line
+ int sleepStep = 2;
+ int sleepIteration = 0;
+ while (true) {
+ try {
+ if (isDrain()) {
+ break;
+ }
+
+ String line = br.readLine();
+ if (line == null) {
+ // End of file reached: whatever is appended from now on is new and gets sent
+ if (!resume) {
+ resume = true;
+ }
+ sleepIteration++;
+ try {
+ // Since FileWatch service is not reliable, we will
+ // check
+ // file inode every n seconds after no write
+ if (sleepIteration > 4) {
+ Object newFileKey = getFileKey(logPathFile);
+ if (newFileKey != null) {
+ if (fileKey == null
+ || !newFileKey.equals(fileKey)) {
+ logger.info("File key is different. Calling rollover. oldKey="
+ + fileKey
+ + ", newKey="
+ + newFileKey
+ + ". "
+ + getShortDescription());
+ // File has rotated.
+ rollOver();
+ }
+ }
+ }
+ // Flush on the second iteration
+ if (!tail && sleepIteration >= 2) {
+ logger.info("End of file. Done with filePath="
+ + logPathFile.getAbsolutePath()
+ + ", lineCount=" + lineCount);
+ flush();
+ break;
+ } else if (sleepIteration == 2) {
+ flush();
+ } else if (sleepIteration >= 2) {
+ // Only reached with sleepIteration > 2 (the == 2 case is handled above)
+ if (isRolledOver) {
+ isRolledOver = false;
+ // Close existing file
+ try {
+ logger.info("File is rolled over. Closing current open file."
+ + getShortDescription()
+ + ", lineCount=" + lineCount);
+ br.close();
+ } catch (Exception ex) {
+ // NOTE(review): ex itself is not passed to the logger
+ logger.error("Error closing file"
+ + getShortDescription());
+ break;
+ }
+ try {
+ // Open new file
+ logger.info("Opening new rolled over file."
+ + getShortDescription());
+// br = new BufferedReader(new FileReader(
+// logPathFile));
+ br = new BufferedReader(LogsearchReaderFactory.
+ INSTANCE.getReader(logPathFile));
+ lineCount = 0;
+ fileKey = getFileKey(logPathFile);
+ base64FileKey = Base64
+ .byteArrayToBase64(fileKey
+ .toString().getBytes());
+ logger.info("fileKey=" + fileKey
+ + ", base64=" + base64FileKey
+ + ", " + getShortDescription());
+ } catch (Exception ex) {
+ logger.error("Error opening rolled over file. "
+ + getShortDescription());
+ // Let's add this to monitoring and exit
+ // this
+ // thread
+ logger.info("Added input to not ready list."
+ + getShortDescription());
+ isReady = false;
+ inputMgr.addToNotReady(this);
+ break;
+ }
+ logger.info("File is successfully rolled over. "
+ + getShortDescription());
+ continue;
+ }
+ }
+ // Back off: double the sleep on every idle iteration, capped at 10 seconds
+ Thread.sleep(sleepStep * 1000);
+ sleepStep = (sleepStep * 2);
+ sleepStep = sleepStep > 10 ? 10 : sleepStep;
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt is only logged; the flag is not restored and the loop continues
+ logger.info("Thread interrupted."
+ + getShortDescription());
+ }
+ } else {
+ lineCount++;
+ sleepStep = 1;
+ sleepIteration = 0;
+
+ // Lines up to the checkpointed line number are skipped
+ if (!resume && lineCount > resumeFromLineNumber) {
+ logger.info("Resuming to read from last line. lineCount="
+ + lineCount
+ + ", input="
+ + getShortDescription());
+ resume = true;
+ }
+ if (resume) {
+ InputMarker marker = new InputMarker();
+ marker.fileKey = fileKey;
+ marker.base64FileKey = base64FileKey;
+ marker.filePath = filePath;
+ marker.input = this;
+ marker.lineNumber = lineCount;
+ outputLine(line, marker);
+ }
+ }
+ } catch (Throwable t) {
+ final String LOG_MESSAGE_KEY = this.getClass()
+ .getSimpleName() + "_READ_LOOP_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Caught exception in read loop. lineNumber="
+ + lineCount + ", input="
+ + getShortDescription(), t, logger,
+ Level.ERROR);
+
+ }
+ }
+ } finally {
+ if (br != null) {
+ logger.info("Closing reader." + getShortDescription()
+ + ", lineCount=" + lineCount);
+ try {
+ br.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns an object identifying the given file, used to detect that a log
+ * file has been replaced (rotated) under the same path.
+ * <p>
+ * The file key reported by the file system is preferred. When the
+ * attributes cannot be read (the error is logged) or the platform does not
+ * provide a file key, the textual path of the file is returned instead, so
+ * the result is never null and callers can safely call toString() on it.
+ *
+ * @param file the file to identify
+ * @return the file system's file key, or file.toString() as a fallback
+ */
+ static public Object getFileKey(File file) {
+   try {
+     Path fileFullPath = Paths.get(file.getAbsolutePath());
+     BasicFileAttributes basicAttr = Files.readAttributes(
+         fileFullPath, BasicFileAttributes.class);
+     // fileKey() is documented to return null where no key is available
+     Object key = basicAttr.fileKey();
+     if (key != null) {
+       return key;
+     }
+   } catch (Throwable ex) {
+     logger.error("Error getting file attributes for file=" + file, ex);
+   }
+   return file.toString();
+ }
+
+ /**
+ * Describes this input by its configured "source" value and its path. The
+ * path is the absolute path of the first resolved file when files have
+ * been resolved, otherwise the configured "path" value.
+ */
+ @Override
+ public String getShortDescription() {
+   String source = getStringValue("source");
+   String path;
+   if (logPathFiles != null && logPathFiles.length > 0) {
+     path = logPathFiles[0].getAbsolutePath();
+   } else {
+     path = getStringValue("path");
+   }
+   return "input:source=" + source + ", path=" + path;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
new file mode 100644
index 0000000..6196068
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+/**
+ * This file contains the file inode, line number of the log currently been read
+ */
+public class InputMarker {
+ public int lineNumber = 0;
+ public int beginLineNumber = 0;
+ public Input input;
+ public String filePath;
+ public Object fileKey = null;
+ public String base64FileKey = null;
+
+ @Override
+ public String toString() {
+ return "InputMarker [lineNumber=" + lineNumber + ", input="
+ + input.getShortDescription() + "]";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
new file mode 100644
index 0000000..9c46c4e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.reader;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+import org.apache.log4j.Logger;
+
+public class GZIPReader extends InputStreamReader {
+
+ private static Logger logger = Logger.getLogger(GZIPReader.class);
+
+ public GZIPReader(String fileName) throws FileNotFoundException {
+ super(getStream(fileName));
+ logger.info("Created GZIPReader for file : " + fileName);
+ }
+
+ public GZIPReader(File file) throws FileNotFoundException {
+ super(getStream(file.getName()));
+ }
+
+ private static InputStream getStream(String fileName) {
+ InputStream gzipStream = null;
+ InputStream fileStream = null;
+ try {
+ fileStream = new FileInputStream(fileName);
+ gzipStream = new GZIPInputStream(fileStream);
+ } catch (Exception e) {
+ logger.error(e, e.getCause());
+ }
+ return gzipStream;
+ }
+
+ /**
+ * validating file based on magic number
+ *
+ * @param fileName
+ * @return
+ */
+ public static boolean isValidFile(String fileName) {
+ // TODO make it generic and put in factory itself
+ InputStream is = null;
+ try {
+ is = new FileInputStream(fileName);
+ byte[] signature = new byte[2];
+ int nread = is.read(signature); // read the gzip signature
+ return nread == 2 && signature[0] == (byte) 0x1f && signature[1] == (byte) 0x8b;
+ } catch (IOException e) {
+ return false;
+ } finally {
+ if (is != null) {
+ try {
+ is.close();
+ } catch (IOException e) {
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
new file mode 100644
index 0000000..a231807
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.input.reader;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.Reader;
+
+import org.apache.log4j.Logger;
+
+public enum LogsearchReaderFactory {
+ INSTANCE;
+ private static Logger logger = Logger
+ .getLogger(LogsearchReaderFactory.class);
+
+ /**
+ * @param fileName
+ * @return
+ * @throws FileNotFoundException
+ */
+ public Reader getReader(File file) throws FileNotFoundException {
+ logger.debug("Inside reader factory for file:" + file);
+ if (GZIPReader.isValidFile(file.getAbsolutePath())) {
+ logger.info("Reading file " + file + " as gzip file");
+ return new GZIPReader(file.getAbsolutePath());
+ } else {
+ return new FileReader(file);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
new file mode 100644
index 0000000..fc12458
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.SolrUtil;
+import org.apache.ambari.logfeeder.view.VLogfeederFilter;
+import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;
+import org.apache.log4j.Logger;
+
+ /**
+ * Background thread that periodically pulls the log-level filter
+ * configuration from Solr and keeps the latest version in a static field,
+ * plus static helpers to evaluate that configuration.
+ */
+ public class FetchConfigFromSolr extends Thread {
+   private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class);
+   // Written by the polling thread and read through the static helpers from
+   // other threads, hence volatile.
+   private static volatile VLogfeederFilterWrapper logfeederFilterWrapper = null;
+   private static int solrConfigInterval = 5;// 5 sec;
+   private static long delay;
+   private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z
+   private static String sysTimeZone = "GMT";
+
+   public FetchConfigFromSolr() {
+     this.setName(this.getClass().getSimpleName());
+   }
+
+   /**
+    * Pulls the configuration, sleeps for the configured number of seconds
+    * and repeats until the thread is interrupted.
+    */
+   @Override
+   public void run() {
+     // NOTE(review): the key ends in "internal", probably meant "interval";
+     // left unchanged because existing configurations may rely on it.
+     solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.internal", solrConfigInterval);
+     delay = 1000L * solrConfigInterval;
+     do {
+       logger.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
+       pullConfigFromSolr();
+       try {
+         Thread.sleep(delay);
+       } catch (InterruptedException e) {
+         // An interrupt is a request to stop: restore the flag and leave
+         // the loop instead of polling forever.
+         logger.warn(getName() + " interrupted, stopping config updates", e);
+         Thread.currentThread().interrupt();
+         return;
+       }
+     } while (true);
+   }
+
+   /**
+    * Reads the config document from Solr and, when it carries a JSON value,
+    * replaces the cached filter wrapper with the parsed result.
+    */
+   private synchronized void pullConfigFromSolr() {
+     HashMap<String, Object> configDocMap = SolrUtil.getInstance().getConfigDoc();
+     if (configDocMap != null) {
+       String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
+       if (configJson != null) {
+         logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class);
+       }
+     }
+   }
+
+   /**
+    * @param logfeederFilter the filter to check, may be null
+    * @return true when the filter has a parsable expiry time that is not
+    *         after the current time; false otherwise
+    */
+   public static boolean isFilterExpired(VLogfeederFilter logfeederFilter) {
+     boolean isFilterExpired = false;// default is false
+     if (logfeederFilter != null) {
+       Date filterEndDate = parseFilterExpireDate(logfeederFilter);
+       if (filterEndDate != null) {
+         Date currentDate = getCurrentDate();
+         if (currentDate.compareTo(filterEndDate) >= 0) {
+           logger.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts :"
+               + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : "
+               + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate));
+           isFilterExpired = true;
+         }
+       }
+     }
+     return isFilterExpired;
+   }
+
+   /**
+    * Formats a date in GMT using the expiry time pattern.
+    *
+    * @param date the date to format, may be null
+    * @return the formatted date, or an empty string for null
+    */
+   public static String dateToStr(Date date) {
+     if (date == null) {
+       return "";
+     }
+     SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
+     TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
+     formatter.setTimeZone(timeZone);
+     return formatter.format(date);
+   }
+
+   /**
+    * Parses the expiry time of a filter, interpreting it in GMT.
+    *
+    * @param vLogfeederFilter the filter whose expiry time is parsed
+    * @return the expiry date, or null when no expiry time is set or it
+    *         cannot be parsed (the parse error is logged)
+    */
+   public static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) {
+     String expiryTime = vLogfeederFilter.getExpiryTime();
+     if (expiryTime != null && !expiryTime.isEmpty()) {
+       SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
+       TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
+       formatter.setTimeZone(timeZone);
+       try {
+         return formatter.parse(expiryTime);
+       } catch (ParseException e) {
+         logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
+             + " and hosts :" + listToStr(vLogfeederFilter.getHosts()), e);
+       }
+     }
+     return null;
+   }
+
+   /**
+    * Selects the log levels to apply for a host: the override levels when
+    * the host is listed in the filter and the filter has not expired, the
+    * default levels in every other case.
+    *
+    * @param hostName        name of the host the levels are requested for
+    * @param componentFilter filter of the component, must not be null
+    * @return the levels taken from the filter
+    */
+   public static List<String> getAllowedLevels(String hostName, VLogfeederFilter componentFilter) {
+     String componentName = componentFilter.getLabel();
+     List<String> hosts = componentFilter.getHosts();
+     List<String> defaultLevels = componentFilter.getDefaultLevels();
+     List<String> overrideLevels = componentFilter.getOverrideLevels();
+     if (LogFeederUtil.isListContains(hosts, hostName, false)) {
+       if (isFilterExpired(componentFilter)) {
+         // pick default
+         logger.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at "
+             + componentFilter.getExpiryTime());
+         return defaultLevels;
+       } else {
+         // return tmp filter levels
+         return overrideLevels;
+       }
+     } else {
+       return defaultLevels;
+     }
+   }
+
+   /**
+    * @param componentName name of the component to look up
+    * @return the filter cached for the component, or null when no
+    *         configuration has been loaded yet or it has no such component
+    */
+   public static VLogfeederFilter findComponentFilter(String componentName) {
+     // Read the shared reference once so the checks below see one snapshot
+     VLogfeederFilterWrapper wrapper = logfeederFilterWrapper;
+     if (wrapper != null) {
+       HashMap<String, VLogfeederFilter> filter = wrapper.getFilter();
+       if (filter != null) {
+         VLogfeederFilter componentFilter = filter.get(componentName);
+         if (componentFilter != null) {
+           return componentFilter;
+         }
+       }
+     }
+     logger.trace("Filter is not there for component :" + componentName);
+     return null;
+   }
+
+
+   /**
+    * @return the current time
+    */
+   public static Date getCurrentDate() {
+     // NOTE(review): this switches the default time zone of the whole JVM to
+     // GMT as a side effect. A Date does not depend on the time zone, so the
+     // call looks unnecessary; it is kept because other code may by now rely
+     // on the changed default - confirm before removing.
+     TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone));
+     Date date = new Date();
+     return date;
+   }
+
+   /**
+    * Renders a list as "[a,b,c]". A null list is rendered as "[]" and a null
+    * element as "null".
+    *
+    * @param strList the list to render, may be null
+    * @return the bracketed, comma separated elements
+    */
+   public static String listToStr(List<String> strList) {
+     StringBuilder out = new StringBuilder("[");
+     if (strList != null) {
+       int counter = 0;
+       for (String element : strList) {
+         if (counter > 0) {
+           out.append(",");
+         }
+         // append(String) copes with null, toString() on the element did not
+         out.append(element);
+         counter++;
+       }
+     }
+     out.append("]");
+     return out.toString();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
new file mode 100644
index 0000000..f61dc1b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+ /**
+ * String constants shared by the log feeder configuration code.
+ */
+ public class LogFeederConstants {
+
+ public static final String ALL = "all";
+ public static final String NAME = "log_feeder_config";
+ // Solr field names
+ public static final String SOLR_LEVEL = "level";
+ public static final String SOLR_COMPONENT = "type";
+ public static final String SOLR_HOST = "host";
+
+ //
+ // Keys of the user config / history document
+ public static final String ID = "id";
+ public static final String USER_NAME = "username";
+ // Key under which FetchConfigFromSolr reads the filter configuration JSON
+ public static final String VALUES = "jsons";
+ public static final String FILTER_NAME = "filtername";
+ public static final String ROW_TYPE = "rowtype";
+
+
+ }
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
new file mode 100644
index 0000000..7525dff
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.log4j.Logger;
+
+public enum LogfeederScheduler {
+
+ INSTANCE;
+
+ private Logger logger = Logger.getLogger(LogfeederScheduler.class);
+
+ private static boolean running = false;
+
+ public synchronized void start() {
+ boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
+ if (!filterEnable) {
+ logger.info("Logfeeder filter Scheduler is disabled.");
+ return;
+ }
+ if (!running) {
+ for (Thread thread : getThreadList()) {
+ thread.start();
+ }
+ running = true;
+ logger.info("Logfeeder Scheduler started!");
+ } else {
+ logger.warn("Logfeeder Scheduler is already running.");
+ }
+ }
+
+ private List<Thread> getThreadList() {
+ List<Thread> tasks = new ArrayList<Thread>();
+ tasks.add(new FetchConfigFromSolr());
+ return tasks;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
new file mode 100644
index 0000000..3748445
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.view.VLogfeederFilter;
+import org.apache.log4j.Logger;
+
+/**
+ * Filter that decides whether a log entry is allowed by comparing its level
+ * with the levels allowed for its component and host.
+ */
+public class ApplyLogFilter extends DefaultDataFilter {
+
+  private static Logger logger = Logger.getLogger(ApplyLogFilter.class);
+
+  /**
+   * @param jsonObj      the log entry as a key/value map
+   * @param defaultValue the decision to return when the entry is empty, lacks a
+   *                     host, component or level, or has no component filter
+   * @return whether the entry's level is among the levels allowed for its
+   *         host and component, or {@code defaultValue} as described above
+   */
+  @Override
+  public boolean applyFilter(Map<String, Object> jsonObj, boolean defaultValue) {
+    if (isEmpty(jsonObj)) {
+      logger.warn("Output jsonobj is empty");
+      return defaultValue;
+    }
+
+    String host = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
+    if (isEmpty(host)) {
+      return defaultValue;
+    }
+
+    String component = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
+    if (isEmpty(component)) {
+      return defaultValue;
+    }
+
+    String logLevel = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
+    if (isEmpty(logLevel)) {
+      return defaultValue;
+    }
+
+    VLogfeederFilter filterForComponent = FetchConfigFromSolr.findComponentFilter(component);
+    if (filterForComponent == null) {
+      // No filter configured for this component: fall back to the default decision.
+      return defaultValue;
+    }
+
+    List<String> permittedLevels = FetchConfigFromSolr.getAllowedLevels(host, filterForComponent);
+    return LogFeederUtil.isListContains(permittedLevels, logLevel, false);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
new file mode 100644
index 0000000..9e98c6a
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Base filter: {@link #applyFilter} hands back the caller's default decision
+ * unchanged. Also provides emptiness helpers.
+ */
+public class DefaultDataFilter {
+  private static Logger logger = Logger.getLogger(DefaultDataFilter.class);
+
+  protected static final boolean CASE_SENSITIVE = false;
+
+  /**
+   * @param outputJsonObj the log entry (not inspected by this implementation)
+   * @param defaultValue  the decision to return
+   * @return {@code defaultValue}
+   */
+  public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) {
+    return defaultValue;
+  }
+
+  /**
+   * @return true when the map is null or has no entries
+   */
+  public boolean isEmpty(Map<String, Object> map) {
+    return map == null || map.isEmpty();
+  }
+
+  /**
+   * @return true when the string is null or empty after trimming
+   */
+  public boolean isEmpty(String str) {
+    return str == null || str.trim().isEmpty();
+  }
+
+  /**
+   * @return the negation of {@link #isEmpty(String)}
+   */
+  public boolean isNotEmpty(String str) {
+    return !isEmpty(str);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
new file mode 100644
index 0000000..643df98
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig.filter;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
+import org.apache.log4j.Logger;
+
+/**
+ * Singleton entry point that decides whether a log entry may pass, delegating
+ * the decision to {@link ApplyLogFilter}.
+ */
+public enum FilterLogData {
+  INSTANCE;
+  private ApplyLogFilter applyLogFilter = new ApplyLogFilter();
+  private static Logger logger = Logger.getLogger(FilterLogData.class);
+  // by default allow every log
+  boolean defaultValue = true;
+
+  /**
+   * @param jsonBlock the log entry as a JSON string
+   * @return the filter decision; {@code defaultValue} when the block is null or empty
+   */
+  public boolean isAllowed(String jsonBlock) {
+    if (jsonBlock == null || jsonBlock.isEmpty()) {
+      return defaultValue;
+    }
+    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
+    return applyLogFilter.applyFilter(jsonObj, defaultValue);
+  }
+
+  /**
+   * @param jsonObj the log entry as a key/value map
+   * @return the filter decision for the entry
+   */
+  public boolean isAllowed(Map<String, Object> jsonObj) {
+    boolean isAllowed = applyLogFilter.applyFilter(jsonObj, defaultValue);
+    // Serialise the entry only when TRACE is on; otherwise every blocked
+    // entry would be converted to JSON just to have the message discarded.
+    if (!isAllowed && logger.isTraceEnabled()) {
+      logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
+    }
+    return isAllowed;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
new file mode 100644
index 0000000..5b89d4b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+public abstract class Mapper {
+ String inputDesc;
+ String fieldName;
+ String mapClassCode;
+
+ @SuppressWarnings("hiding")
+ public boolean init(String inputDesc, String fieldName,
+ String mapClassCode, Object mapConfigs) {
+ this.inputDesc = inputDesc;
+ this.fieldName = fieldName;
+ this.mapClassCode = mapClassCode;
+ return true;
+ }
+
+ /**
+ * @param value
+ * @return
+ */
+ public Object apply(Map<String, Object> jsonObj, Object value) {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return "mapClass=" + mapClassCode + ", input=" + inputDesc
+ + ", fieldName=" + fieldName;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
new file mode 100644
index 0000000..107e7e4
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Mapper that converts a field value into a {@link Date}, either from epoch
+ * seconds (when "date_pattern" is "epoch") or by parsing it with the
+ * configured {@link SimpleDateFormat} pattern.
+ */
+public class MapperDate extends Mapper {
+  Logger logger = Logger.getLogger(MapperDate.class);
+
+  String dateFormat = null;
+  // NOTE(review): SimpleDateFormat is not thread-safe; confirm that a mapper
+  // instance is only ever applied from a single thread.
+  SimpleDateFormat dateFormatter = null;
+  boolean isEpoch = false;
+
+  /**
+   * Reads "date_pattern" from {@code mapConfigs} and prepares the converter.
+   *
+   * @param mapConfigs expected to be a Map containing "date_pattern"
+   * @return true when the pattern is "epoch" (case-insensitive) or a valid
+   *         SimpleDateFormat pattern; false when mapConfigs is null or not a
+   *         Map, or the pattern is empty or invalid
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // mapConfigs may be null here (null is never an instance of Map), so
+      // it must not be dereferenced just to build the log message.
+      String configType = mapConfigs == null ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+          + configType
+          + ", map="
+          + this.toString());
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    dateFormat = (String) mapObjects.get("date_pattern");
+    if (StringUtils.isEmpty(dateFormat)) {
+      logger.fatal("Date format for map is empty. " + this.toString());
+      return false;
+    }
+    logger.info("Date mapper format is " + dateFormat);
+
+    if (dateFormat.equalsIgnoreCase("epoch")) {
+      isEpoch = true;
+      return true;
+    }
+    try {
+      dateFormatter = new SimpleDateFormat(dateFormat);
+      return true;
+    } catch (IllegalArgumentException ex) {
+      // Invalid pattern: keep the cause in the log so it can be diagnosed.
+      logger.fatal("Error creating date format. format="
+          + dateFormat + ". " + this.toString(), ex);
+      return false;
+    }
+  }
+
+  /**
+   * Converts {@code value} to a Date and stores it in {@code jsonObj} under
+   * this mapper's field name. Conversion errors are logged, not thrown.
+   *
+   * @return the converted Date, or the value as it was when no conversion
+   *         happened (null value, no formatter configured, or a failure)
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (value != null) {
+      try {
+        if (isEpoch) {
+          // Epoch input is in seconds; Date expects milliseconds.
+          long ms = Long.parseLong(value.toString()) * 1000;
+          value = new Date(ms);
+        } else if (dateFormatter != null) {
+          value = dateFormatter.parse(value.toString());
+        } else {
+          return value;
+        }
+        jsonObj.put(fieldName, value);
+      } catch (Throwable t) {
+        LogFeederUtil.logErrorMessageByInterval(this.getClass()
+            .getSimpleName() + ":apply",
+            "Error applying date transformation. isEpoch="
+                + isEpoch + ", dateFormat=" + dateFormat
+                + ", value=" + value + ". " + this.toString(),
+            t, logger, Level.ERROR);
+      }
+    }
+    return value;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
new file mode 100644
index 0000000..99c33ed
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Renames a field: moves the value from the original field name to the key
+ * configured as "new_fieldname".
+ */
+public class MapperFieldName extends Mapper {
+  Logger logger = Logger.getLogger(MapperFieldName.class);
+  String newValue = null;
+
+  /**
+   * Reads "new_fieldname" from {@code mapConfigs}.
+   *
+   * @param mapConfigs expected to be a Map containing "new_fieldname"
+   * @return false when mapConfigs is null or not a Map, or the new field
+   *         name is empty; true otherwise
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // mapConfigs may be null here (null is never an instance of Map), so
+      // it must not be dereferenced just to build the log message.
+      String configType = mapConfigs == null ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+          + configType);
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    newValue = (String) mapObjects.get("new_fieldname");
+    if (StringUtils.isEmpty(newValue)) {
+      logger.fatal("Map field value is empty.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Removes the original field from {@code jsonObj} and stores {@code value}
+   * under the new field name. Logs an error and leaves the entry untouched
+   * when no new name was configured.
+   *
+   * @return {@code value}, unchanged
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (newValue != null) {
+      // Remove the old one
+      jsonObj.remove(fieldName);
+      // Add with new key name
+      jsonObj.put(newValue, value);
+    } else {
+      LogFeederUtil.logErrorMessageByInterval(this.getClass()
+          .getSimpleName() + ":apply",
+          "New fieldName is null, so transformation is not applied. "
+              + this.toString(), null, logger, Level.ERROR);
+    }
+    return value;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
new file mode 100644
index 0000000..9810ceb
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Overrides the value for the field: when the current value equals
+ * "pre_value" (ignoring case) it is replaced by "post_value".
+ */
+public class MapperFieldValue extends Mapper {
+  Logger logger = Logger.getLogger(MapperFieldValue.class);
+  String prevValue = null;
+  String newValue = null;
+
+  /**
+   * Reads "pre_value" and "post_value" from {@code mapConfigs}.
+   *
+   * @param mapConfigs expected to be a Map containing "pre_value" and "post_value"
+   * @return false when mapConfigs is null or not a Map, or "post_value" is
+   *         empty; true otherwise
+   */
+  @SuppressWarnings("hiding")
+  @Override
+  public boolean init(String inputDesc, String fieldName,
+      String mapClassCode, Object mapConfigs) {
+    super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+    if (!(mapConfigs instanceof Map)) {
+      // mapConfigs may be null here (null is never an instance of Map), so
+      // it must not be dereferenced just to build the log message.
+      String configType = mapConfigs == null ? "null" : mapConfigs.getClass().getName();
+      logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
+          + configType);
+      return false;
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
+    prevValue = (String) mapObjects.get("pre_value");
+    newValue = (String) mapObjects.get("post_value");
+    if (StringUtils.isEmpty(newValue)) {
+      logger.fatal("Map field value is empty.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Replaces the field's value in {@code jsonObj} with the configured new
+   * value when the current value matches the configured previous value.
+   *
+   * @return the new value when a replacement happened, otherwise
+   *         {@code value} unchanged (including when it is null)
+   */
+  @Override
+  public Object apply(Map<String, Object> jsonObj, Object value) {
+    if (newValue == null) {
+      LogFeederUtil.logErrorMessageByInterval(
+          this.getClass().getSimpleName() + ":apply",
+          "New value is null, so transformation is not applied. "
+              + this.toString(), null, logger, Level.ERROR);
+      return value;
+    }
+    // A null value can never match prevValue; skip it instead of failing on toString().
+    if (prevValue != null && value != null
+        && prevValue.equalsIgnoreCase(value.toString())) {
+      value = newValue;
+      jsonObj.put(fieldName, value);
+    }
+    return value;
+  }
+
+}