Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/11 18:15:03 UTC
[49/51] [partial] ambari git commit: AMBARI-15679. Initial commit for
LogSearch module (oleewre)
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
new file mode 100644
index 0000000..445c294
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static java.nio.file.StandardWatchEventKinds.*;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputFile;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputMgr {
+ static Logger logger = Logger.getLogger(InputMgr.class);
+
+ List<Input> inputList = new ArrayList<Input>();
+ Set<Input> notReadyList = new HashSet<Input>();
+
+ WatchService folderWatcher = null;
+ Set<File> foldersToMonitor = new HashSet<File>();
+ Map<String, Input> filesToMonitor = new HashMap<String, Input>();
+ boolean isDrain = false;
+ boolean isAnyInputTail = false;
+
+ private String checkPointSubFolderName = "logfeeder_checkpoints";
+ File checkPointFolderFile = null;
+
+ MetricCount filesCountMetric = new MetricCount();
+
+ private String checkPointExtension = ".cp";
+
+ public List<Input> getInputList() {
+ return inputList;
+ }
+
+ public void add(Input input) {
+ inputList.add(input);
+ }
+
+ /**
+ * Removes the given input from the list of managed inputs.
+ *
+ * @param input the input to remove
+ */
+ public void removeInput(Input input) {
+ logger.info("Trying to remove from inputList. "
+ + input.getShortDescription());
+ Iterator<Input> iter = inputList.iterator();
+ while (iter.hasNext()) {
+ Input iterInput = iter.next();
+ if (iterInput.equals(input)) {
+ logger.info("Removing Input from inputList. "
+ + input.getShortDescription());
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ * @return the number of inputs whose files are currently ready and being monitored
+ */
+ public int getActiveFilesCount() {
+ int count = 0;
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public void init() {
+ filesCountMetric.metricsName = "input.files.count";
+ filesCountMetric.isPointInTime = true;
+
+ checkPointExtension = LogFeederUtil.getStringProperty(
+ "logfeeder.checkpoint.extension", checkPointExtension);
+ for (Input input : inputList) {
+ try {
+ input.init();
+ if (input.isTail()) {
+ isAnyInputTail = true;
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Error initializing input. "
+ + input.getShortDescription(), e);
+ }
+ }
+
+ if (isAnyInputTail) {
+ logger.info("Determining valid checkpoint folder");
+ boolean isCheckPointFolderValid = false;
+ // We need to keep track of the files we are reading.
+ String checkPointFolder = LogFeederUtil
+ .getStringProperty("logfeeder.checkpoint.folder");
+ if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
+ checkPointFolderFile = new File(checkPointFolder);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's try home folder
+ String userHome = LogFeederUtil.getStringProperty("user.home");
+ if (userHome != null) {
+ checkPointFolderFile = new File(userHome,
+ checkPointSubFolderName);
+ logger.info("Checking if home folder can be used for checkpoints. Folder="
+ + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's use tmp folder
+ String tmpFolder = LogFeederUtil
+ .getStringProperty("java.io.tmpdir");
+ if (tmpFolder == null) {
+ tmpFolder = "/tmp";
+ }
+ checkPointFolderFile = new File(tmpFolder,
+ checkPointSubFolderName);
+ logger.info("Checking if tmps folder can be used for checkpoints. Folder="
+ + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ if (isCheckPointFolderValid) {
+ logger.warn("Using tmp folder "
+ + checkPointFolderFile
+ + " to store check points. This is not recommended."
+ + "Please set logfeeder.checkpoint.folder property");
+ }
+ }
+
+ if (isCheckPointFolderValid) {
+ logger.warn("Using folder " + checkPointFolderFile
+ + " for storing checkpoints");
+ }
+ }
+
+ }
+
+ public File getCheckPointFolderFile() {
+ return checkPointFolderFile;
+ }
+
+ boolean verifyCheckPointFolder(File folderPathFile) {
+ if (!folderPathFile.exists()) {
+ // Create the folder
+ try {
+ if (!folderPathFile.mkdir()) {
+ logger.warn("Error creating folder for check point. folder="
+ + folderPathFile);
+ }
+ } catch (Throwable t) {
+ logger.warn("Error creating folder for check point. folder="
+ + folderPathFile, t);
+ }
+ }
+
+ if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+ // Let's check whether we can create a file
+ File testFile = new File(folderPathFile, UUID.randomUUID()
+ .toString());
+ try {
+ testFile.createNewFile();
+ return testFile.delete();
+ } catch (IOException e) {
+ logger.warn(
+ "Couldn't create test file in "
+ + folderPathFile.getAbsolutePath()
+ + " for checkPoint", e);
+ }
+ }
+ return false;
+ }
+
+ public void monitor() {
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ input.monitor();
+ } else {
+ if (input.isTail()) {
+ logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
+ + input.getShortDescription());
+ notReadyList.add(input);
+ } else {
+ logger.info("Input is not ready, so going to ignore it "
+ + input.getShortDescription());
+ }
+ }
+ }
+ // Start the monitoring thread if any file is in tail mode
+ if (isAnyInputTail) {
+ Thread monitorThread = new Thread("InputIsReadyMonitor") {
+ @Override
+ public void run() {
+ logger.info("Going to monitor for these missing files: "
+ + notReadyList.toString());
+ while (true) {
+ if (isDrain) {
+ logger.info("Exiting missing file monitor.");
+ break;
+ }
+ try {
+ Iterator<Input> iter = notReadyList.iterator();
+ while (iter.hasNext()) {
+ Input input = iter.next();
+ try {
+ if (input.isReady()) {
+ input.monitor();
+ iter.remove();
+ }
+ } catch (Throwable t) {
+ logger.error("Error while enabling monitoring for input. "
+ + input.getShortDescription());
+ }
+ }
+ Thread.sleep(30 * 1000);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ };
+ monitorThread.start();
+ }
+ }
+
+ public void addToNotReady(Input notReadyInput) {
+ notReadyList.add(notReadyInput);
+ }
+
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ for (Input input : inputList) {
+ input.addMetricsContainers(metricsList);
+ }
+ filesCountMetric.count = getActiveFilesCount();
+ metricsList.add(filesCountMetric);
+ }
+
+ /**
+ * Logs statistics for every input and for the monitored-files count metric.
+ */
+ public void logStats() {
+ for (Input input : inputList) {
+ input.logStat();
+ }
+
+ filesCountMetric.count = getActiveFilesCount();
+ LogFeederUtil.logStatForMetric(filesCountMetric,
+ "Stat: Files Monitored Count", null);
+ }
+
+ public void close() {
+ for (Input input : inputList) {
+ try {
+ input.setDrain(true);
+ } catch (Throwable t) {
+ logger.error(
+ "Error while draining. input="
+ + input.getShortDescription(), t);
+ }
+ }
+ isDrain = true;
+
+ // TODO: Read these values from properties
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ int i = 0;
+ boolean allClosed = true;
+ for (i = 0; i < iterations; i++) {
+ allClosed = true;
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ try {
+ allClosed = false;
+ logger.warn("Waiting for input to close. "
+ + input.getShortDescription() + ", "
+ + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ break;
+ }
+ }
+ if (!allClosed) {
+ logger.warn("Some inputs were not closed. Iterations=" + i);
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ logger.warn("Input not closed. Will ignore it."
+ + input.getShortDescription());
+ }
+ }
+ } else {
+ logger.info("All inputs are closed. Iterations=" + i);
+ }
+
+ }
+
+ public void checkInAll() {
+ for (Input input : inputList) {
+ input.checkIn();
+ }
+ }
+
+ public void cleanCheckPointFiles() {
+
+ if (checkPointFolderFile == null) {
+ logger.info("Will not clean checkPoint files. checkPointFolderFile="
+ + checkPointFolderFile);
+ return;
+ }
+ logger.info("Cleaning checkPoint files. checkPointFolderFile="
+ + checkPointFolderFile.getAbsolutePath());
+ try {
+ // Loop over the checkpoint files and delete the ones whose
+ // corresponding log file no longer exists or has been replaced
+ String searchPath = "*" + checkPointExtension;
+ FileFilter fileFilter = new WildcardFileFilter(searchPath);
+ File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+ if (checkPointFiles == null) {
+ logger.info("No checkpoint files found. checkPointFolderFile="
+ + checkPointFolderFile.getAbsolutePath());
+ return;
+ }
+ int totalCheckFilesDeleted = 0;
+ for (File checkPointFile : checkPointFiles) {
+ RandomAccessFile checkPointReader = null;
+ try {
+ checkPointReader = new RandomAccessFile(checkPointFile, "r");
+
+ int contentSize = checkPointReader.readInt();
+ byte[] b = new byte[contentSize];
+ int readSize = checkPointReader.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+ + contentSize
+ + ", read="
+ + readSize
+ + ", checkPointFile=" + checkPointFile);
+ } else {
+ // Create JSON string
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ Map<String, Object> jsonCheckPoint = LogFeederUtil
+ .toJSONObject(jsonCheckPointStr);
+
+ String logFilePath = (String) jsonCheckPoint
+ .get("file_path");
+ String logFileKey = (String) jsonCheckPoint
+ .get("file_key");
+ if (logFilePath != null && logFileKey != null) {
+ boolean deleteCheckPointFile = false;
+ File logFile = new File(logFilePath);
+ if (logFile.exists()) {
+ Object fileKeyObj = InputFile
+ .getFileKey(logFile);
+ String fileBase64 = Base64
+ .byteArrayToBase64(fileKeyObj
+ .toString().getBytes());
+ if (!logFileKey.equals(fileBase64)) {
+ deleteCheckPointFile = true;
+ logger.info("CheckPoint clean: File key has changed. old="
+ + logFileKey
+ + ", new="
+ + fileBase64
+ + ", filePath="
+ + logFilePath
+ + ", checkPointFile="
+ + checkPointFile.getAbsolutePath());
+ }
+ } else {
+ logger.info("CheckPoint clean: Log file doesn't exist. filePath="
+ + logFilePath
+ + ", checkPointFile="
+ + checkPointFile.getAbsolutePath());
+ deleteCheckPointFile = true;
+ }
+ if (deleteCheckPointFile) {
+ logger.info("Deleting CheckPoint file="
+ + checkPointFile.getAbsolutePath()
+ + ", logFile=" + logFilePath);
+ checkPointFile.delete();
+ totalCheckFilesDeleted++;
+ }
+ }
+ }
+ } catch (EOFException eof) {
+ logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
+ + checkPointFile);
+ } catch (Throwable t) {
+ logger.error("Error while checking checkPoint file. "
+ + checkPointFile, t);
+ } finally {
+ if (checkPointReader != null) {
+ try {
+ checkPointReader.close();
+ } catch (Throwable t) {
+ logger.error("Error closing checkPoint file. "
+ + checkPointFile, t);
+ }
+ }
+ }
+ }
+ logger.info("Deleted " + totalCheckFilesDeleted
+ + " checkPoint file(s). checkPointFolderFile="
+ + checkPointFolderFile.getAbsolutePath());
+
+ } catch (Throwable t) {
+ logger.error("Error while cleaning checkPointFiles", t);
+ }
+ }
+
+ synchronized public void monitorSystemFileChanges(Input inputToMonitor) {
+ try {
+ File fileToMonitor = new File(inputToMonitor.getFilePath());
+ if (filesToMonitor.containsKey(fileToMonitor.getAbsolutePath())) {
+ logger.info("Already monitoring file " + fileToMonitor
+ + ". So ignoring this request");
+ return;
+ }
+
+ // make a new watch service that we can register interest in
+ // directories and files with.
+ if (folderWatcher == null) {
+ folderWatcher = FileSystems.getDefault().newWatchService();
+ // start the file watcher thread below
+ Thread th = new Thread(new FileSystemMonitor(),
+ "FileSystemWatcher");
+ th.setDaemon(true);
+ th.start();
+
+ }
+ File folderToWatch = fileToMonitor.getParentFile();
+ if (folderToWatch != null) {
+ if (foldersToMonitor.contains(folderToWatch)) {
+ logger.info("Already monitoring folder " + folderToWatch
+ + ". So ignoring this request.");
+ } else {
+ logger.info("Configuring to monitor folder "
+ + folderToWatch + " for file " + fileToMonitor);
+ // Resolve the directory we want to watch
+ Path toWatch = Paths.get(folderToWatch.getAbsolutePath());
+
+ toWatch.register(folderWatcher, ENTRY_CREATE);
+ foldersToMonitor.add(folderToWatch);
+ }
+ filesToMonitor.put(fileToMonitor.getAbsolutePath(),
+ inputToMonitor);
+ } else {
+ logger.error("File doesn't have parent folder." + fileToMonitor);
+ }
+ } catch (IOException e) {
+ logger.error("Error while trying to set watcher for file:"
+ + inputToMonitor);
+ }
+
+ }
+
+ class FileSystemMonitor implements Runnable {
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+ try {
+ // get the first event before looping
+ WatchKey key = folderWatcher.take();
+ while (key != null) {
+ Path dir = (Path) key.watchable();
+ // we have a polled event, now we traverse it and
+ // receive all the states from it
+ for (WatchEvent<?> event : key.pollEvents()) {
+ if (!event.kind().equals(ENTRY_CREATE)) {
+ logger.info("Ignoring event.kind=" + event.kind());
+ continue;
+ }
+ logger.info("Received " + event.kind()
+ + " event for file " + event.context());
+
+ File newFile = new File(dir.toFile(), event.context()
+ .toString());
+ Input rolledOverInput = filesToMonitor.get(newFile
+ .getAbsolutePath());
+ if (rolledOverInput == null) {
+ logger.info("Input not found for file " + newFile);
+ } else {
+ rolledOverInput.rollOver();
+ }
+ }
+ if (!key.reset()) {
+ logger.error("Error while key.reset(). Will have to abort watching files. Rollover will not work.");
+ break;
+ }
+ key = folderWatcher.take();
+ }
+ } catch (InterruptedException e) {
+ logger.info("Stop request for thread");
+ }
+ logger.info("Exiting FileSystemMonitor thread.");
+ }
+
+ }
+
+}
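InputMgr above detects log rotation by registering each log file's parent directory with a java.nio.file WatchService and reacting to ENTRY_CREATE events (see FileSystemMonitor). A minimal, self-contained sketch of that pattern follows; the watched directory is a hypothetical placeholder, and real code would loop and re-arm the key the way FileSystemMonitor does:

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;

public class WatchServiceSketch {
    public static void main(String[] args) throws Exception {
        Path dir = Paths.get("/var/log/example"); // hypothetical directory to watch
        WatchService watcher = FileSystems.getDefault().newWatchService();
        dir.register(watcher, ENTRY_CREATE); // only file-creation events, as in InputMgr
        WatchKey key = watcher.take(); // blocks until at least one event arrives
        for (WatchEvent<?> event : key.pollEvents()) {
            // For path events, context() is the created file's name relative to dir
            System.out.println("Created: " + dir.resolve((Path) event.context()));
        }
        key.reset(); // re-arm the key; if reset() returns false, watching must stop
    }
}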
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
new file mode 100644
index 0000000..f4ca51b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.logconfig.LogfeederScheduler;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+import com.google.gson.reflect.TypeToken;
+
+public class LogFeeder {
+ static Logger logger = Logger.getLogger(LogFeeder.class);
+
+ // List<Input> inputList = new ArrayList<Input>();
+ Collection<Output> outputList = new ArrayList<Output>();
+
+ OutputMgr outMgr = new OutputMgr();
+ InputMgr inputMgr = new InputMgr();
+ MetricsMgr metricsMgr = new MetricsMgr();
+
+ Map<String, Object> globalMap = null;
+ String[] inputParams;
+
+ List<Map<String, Object>> globalConfigList = new ArrayList<Map<String, Object>>();
+ List<Map<String, Object>> inputConfigList = new ArrayList<Map<String, Object>>();
+ List<Map<String, Object>> filterConfigList = new ArrayList<Map<String, Object>>();
+ List<Map<String, Object>> outputConfigList = new ArrayList<Map<String, Object>>();
+
+ int checkPointCleanIntervalMS = 24 * 60 * 60 * 1000; // 24 hours
+ long lastCheckPointCleanedMS = 0;
+
+ public LogFeeder(String[] args) {
+ inputParams = args;
+ }
+
+ public void init() throws Throwable {
+
+ // Load properties
+ LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
+
+ // loop the properties and load them
+ // Load the configs
+ String configFiles = LogFeederUtil.getStringProperty("config.files");
+ if (configFiles == null) {
+ configFiles = LogFeederUtil.getStringProperty("config.file",
+ "config.json");
+ }
+ logger.info("config.files=" + configFiles);
+ String[] configFileList = configFiles.split(",");
+ for (String configFileName : configFileList) {
+ logger.info("Going to load config file:" + configFileName);
+ File configFile = new File(configFileName);
+ if (configFile.exists() && configFile.isFile()) {
+ logger.info("Config file exists in path."
+ + configFile.getAbsolutePath());
+ loadConfigsUsingFile(configFile);
+ } else {
+ // Let's try to load it from class loader
+ logger.info("Trying to load config file from classloader: "
+ + configFileName);
+ loadConfigsUsingClassLoader(configFileName);
+ logger.info("Loaded config file from classloader: "
+ + configFileName);
+ }
+ }
+ mergeAllConfigs();
+ outMgr.setOutputList(outputList);
+ for (Output output : outputList) {
+ output.init();
+ }
+ inputMgr.init();
+ metricsMgr.init();
+ //starting timer to fetch config from solr
+ LogfeederScheduler.INSTANCE.start();
+ logger.debug("==============");
+ }
+
+ void loadConfigsUsingClassLoader(String configFileName) throws Exception {
+ InputStream fileInputStream = this.getClass().getClassLoader()
+ .getResourceAsStream(configFileName);
+ if (fileInputStream != null) {
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ fileInputStream));
+ String configData = readFile(br);
+ loadConfigs(configData);
+ } else {
+ throw new Exception("Can't find configFile=" + configFileName);
+ }
+ }
+
+ /**
+ * Loads the configurations from the given file.
+ *
+ * @param configFile the JSON configuration file to load
+ * @throws Exception if the file cannot be opened or parsed
+ */
+ void loadConfigsUsingFile(File configFile) throws Exception {
+ FileInputStream fileInputStream = null;
+ try {
+ fileInputStream = new FileInputStream(configFile);
+ BufferedReader br = new BufferedReader(new InputStreamReader(
+ fileInputStream));
+ String configData = readFile(br);
+ loadConfigs(configData);
+ } catch (Exception e) {
+ logger.error("Error opening config file. configFilePath="
+ + configFile.getAbsolutePath(), e);
+ throw e;
+ } finally {
+ if (fileInputStream != null) {
+ try {
+ fileInputStream.close();
+ } catch (Throwable t) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void loadConfigs(String configData) throws Exception {
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ Map<String, Object> configMap = LogFeederUtil.getGson().fromJson(
+ configData, type);
+
+ // Get the globals
+ for (String key : configMap.keySet()) {
+ if (key.equalsIgnoreCase("global")) {
+ globalConfigList.add((Map<String, Object>) configMap.get(key));
+ } else if (key.equalsIgnoreCase("input")) {
+ List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
+ .get(key);
+ inputConfigList.addAll(mapList);
+ } else if (key.equalsIgnoreCase("filter")) {
+ List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
+ .get(key);
+ filterConfigList.addAll(mapList);
+ } else if (key.equalsIgnoreCase("output")) {
+ List<Map<String, Object>> mapList = (List<Map<String, Object>>) configMap
+ .get(key);
+ outputConfigList.addAll(mapList);
+ }
+ }
+
+ }
+
+ /**
+ * Merges the global configuration into the output, input and filter blocks
+ * and wires the resulting objects together.
+ */
+ private void mergeAllConfigs() {
+ globalMap = mergeConfigs(globalConfigList);
+
+ // Sort the filter blocks
+ sortBlocks(filterConfigList);
+ // First loop for output
+ for (Map<String, Object> map : outputConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalMap, map);
+
+ String value = (String) map.get("destination");
+ Output output;
+ if (value == null || value.isEmpty()) {
+ logger.error("Output block doesn't have destination element");
+ continue;
+ }
+ String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.OUTPUT, ALIAS_PARAM.KLASS);
+ if (classFullName == null || classFullName.isEmpty()) {
+ logger.error("Destination block doesn't have output element");
+ continue;
+ }
+ output = (Output) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.OUTPUT);
+
+ if (output == null) {
+ logger.error("Destination Object is null");
+ continue;
+ }
+
+ output.setDestination(value);
+ output.loadConfig(map);
+
+ // We will only check for is_enabled out here. Down below we will
+ // check whether this output is enabled for the input
+ boolean isEnabled = output.getBooleanValue("is_enabled", true);
+ if (isEnabled) {
+ outputList.add(output);
+ output.logConfgs(Level.INFO);
+ } else {
+ logger.info("Output is disabled. So ignoring it. "
+ + output.getShortDescription());
+ }
+ }
+
+ // Second loop for input
+ for (Map<String, Object> map : inputConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalMap, map);
+
+ String value = (String) map.get("source");
+ Input input;
+ if (value == null || value.isEmpty()) {
+ logger.error("Input block doesn't have source element");
+ continue;
+ }
+ String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.INPUT, ALIAS_PARAM.KLASS);
+ if (classFullName == null || classFullName.isEmpty()) {
+ logger.error("Source block doesn't have source element");
+ continue;
+ }
+ input = (Input) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.INPUT);
+
+ if (input == null) {
+ logger.error("Source Object is null");
+ continue;
+ }
+
+ input.setType(value);
+ input.loadConfig(map);
+
+ if (input.isEnabled()) {
+ input.setOutputMgr(outMgr);
+ input.setInputMgr(inputMgr);
+ inputMgr.add(input);
+ input.logConfgs(Level.INFO);
+ } else {
+ logger.info("Input is disabled. So ignoring it. "
+ + input.getShortDescription());
+ }
+ }
+
+ // Third loop is for filter, but we will have to create a filter
+ // instance for each input, so it can maintain the state per input
+ List<Input> toRemoveInputList = new ArrayList<Input>();
+ for (Input input : inputMgr.getInputList()) {
+ Filter prevFilter = null;
+ for (Map<String, Object> map : filterConfigList) {
+ if (map == null) {
+ continue;
+ }
+ mergeBlocks(globalMap, map);
+
+ String value = (String) map.get("filter");
+ Filter filter;
+ if (value == null || value.isEmpty()) {
+ logger.error("Filter block doesn't have filter element");
+ continue;
+ }
+
+ String classFullName = AliasUtil.getInstance().readAlias(value, ALIAS_TYPE.FILTER, ALIAS_PARAM.KLASS);
+ if (classFullName == null || classFullName.isEmpty()) {
+ logger.error("Filter block doesn't have filter element");
+ continue;
+ }
+ filter = (Filter) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.FILTER);
+
+ if (filter == null) {
+ logger.error("Filter Object is null");
+ continue;
+ }
+ filter.loadConfig(map);
+ filter.setInput(input);
+
+ if (filter.isEnabled()) {
+ filter.setOutputMgr(outMgr);
+ if (prevFilter == null) {
+ input.setFirstFilter(filter);
+ } else {
+ prevFilter.setNextFilter(filter);
+ }
+ prevFilter = filter;
+ filter.logConfgs(Level.INFO);
+ } else {
+ logger.debug("Ignoring filter "
+ + filter.getShortDescription() + " for input "
+ + input.getShortDescription());
+ }
+ }
+ if (input.getFirstFilter() == null) {
+ toRemoveInputList.add(input);
+ }
+ }
+
+ // Fourth loop is for associating valid outputs to input
+ Set<Output> usedOutputSet = new HashSet<Output>();
+ for (Input input : inputMgr.getInputList()) {
+ for (Output output : outputList) {
+ boolean ret = LogFeederUtil.isEnabled(output.getConfigs(),
+ input.getConfigs());
+ if (ret) {
+ usedOutputSet.add(output);
+ input.addOutput(output);
+ }
+ }
+ }
+ outputList = usedOutputSet;
+
+ for (Input toRemoveInput : toRemoveInputList) {
+ logger.warn("There are no filters, we will ignore this input. "
+ + toRemoveInput.getShortDescription());
+ inputMgr.removeInput(toRemoveInput);
+ }
+ }
+
+ /**
+ * Sorts the given configuration blocks in place by their "sort_order" field.
+ *
+ * @param blockList the configuration blocks to sort
+ */
+ private void sortBlocks(List<Map<String, Object>> blockList) {
+
+ Collections.sort(blockList, new Comparator<Map<String, Object>>() {
+
+ @Override
+ public int compare(Map<String, Object> o1, Map<String, Object> o2) {
+ Object o1Sort = o1.get("sort_order");
+ Object o2Sort = o2.get("sort_order");
+ if (o1Sort == null) {
+ return 0;
+ }
+ if (o2Sort == null) {
+ return 0;
+ }
+ int o1Value = 0;
+ if (o1Sort instanceof Number) {
+ o1Value = ((Number) o1Sort).intValue();
+ } else {
+ try {
+ o1Value = (int) Double.parseDouble(o1Sort.toString());
+ } catch (Throwable t) {
+ logger.error("Value is not of type Number. class="
+ + o1Sort.getClass().getName() + ", value="
+ + o1Sort.toString() + ", map=" + o1.toString());
+ }
+ }
+ int o2Value = 0;
+ if (o2Sort instanceof Number) {
+ o2Value = ((Number) o2Sort).intValue();
+ } else {
+ try {
+ o2Value = (int) Double.parseDouble(o2Sort.toString());
+ } catch (Throwable t) {
+ logger.error("Value is not of type Number. class="
+ + o2Sort.getClass().getName() + ", value="
+ + o2Sort.toString() + ", map=" + o2.toString());
+ }
+ }
+ return o1Value - o2Value;
+ }
+ });
+ }
+
+ /**
+ * @param configList the configuration blocks to merge
+ * @return a single map containing the merged configuration
+ */
+ private Map<String, Object> mergeConfigs(
+ List<Map<String, Object>> configList) {
+ Map<String, Object> mergedConfig = new HashMap<String, Object>();
+ for (Map<String, Object> config : configList) {
+ mergeBlocks(config, mergedConfig);
+ }
+ return mergedConfig;
+ }
+
+ private void mergeBlocks(Map<String, Object> fromMap,
+ Map<String, Object> toMap) {
+ // Merge the non-string
+ for (String key : fromMap.keySet()) {
+ Object objValue = fromMap.get(key);
+ if (objValue == null) {
+ continue;
+ }
+ if (objValue instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> globalFields = LogFeederUtil
+ .cloneObject((Map<String, Object>) fromMap.get(key));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> localFields = (Map<String, Object>) toMap
+ .get(key);
+ if (localFields == null) {
+ localFields = new HashMap<String, Object>();
+ toMap.put(key, localFields);
+ }
+
+ if (globalFields != null) {
+ for (String fieldKey : globalFields.keySet()) {
+ if (!localFields.containsKey(fieldKey)) {
+ localFields.put(fieldKey,
+ globalFields.get(fieldKey));
+ }
+ }
+ }
+ }
+ }
+
+ // Let's add the rest of the top level fields if missing
+ for (String key : fromMap.keySet()) {
+ if (!toMap.containsKey(key)) {
+ toMap.put(key, fromMap.get(key));
+ }
+ }
+ }
+
+ private void monitor() throws Exception {
+ inputMgr.monitor();
+ Runtime.getRuntime().addShutdownHook(new JVMShutdownHook());
+
+ Thread statLogger = new Thread("statLogger") {
+
+ @Override
+ public void run() {
+ while (true) {
+ try {
+ Thread.sleep(30 * 1000);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ try {
+ logStats();
+ } catch (Throwable t) {
+ logger.error(
+ "LogStats: Caught exception while logging stats.",
+ t);
+ }
+
+ if (System.currentTimeMillis() > (lastCheckPointCleanedMS + checkPointCleanIntervalMS)) {
+ lastCheckPointCleanedMS = System.currentTimeMillis();
+ inputMgr.cleanCheckPointFiles();
+ }
+ }
+ }
+
+ };
+ statLogger.setDaemon(true);
+ statLogger.start();
+
+ }
+
+ private void logStats() {
+ inputMgr.logStats();
+ outMgr.logStats();
+
+ if (metricsMgr.isMetricsEnabled()) {
+ List<MetricCount> metricsList = new ArrayList<MetricCount>();
+ inputMgr.addMetricsContainers(metricsList);
+ outMgr.addMetricsContainers(metricsList);
+ metricsMgr.useMetrics(metricsList);
+ }
+ }
+
+ /**
+ * @param br the reader whose full contents should be returned
+ * @return the concatenated lines read from the reader
+ * @throws Exception if reading fails
+ */
+ public String readFile(BufferedReader br) throws Exception {
+ try {
+ StringBuilder sb = new StringBuilder();
+ String line = br.readLine();
+ while (line != null) {
+ sb.append(line);
+ line = br.readLine();
+ }
+ return sb.toString();
+ } catch (Exception e) {
+ logger.error("Error reading file contents.", e);
+ throw e;
+ }
+ }
+
+ public Collection<Output> getOutputList() {
+ return outputList;
+ }
+
+ public OutputMgr getOutMgr() {
+ return outMgr;
+ }
+
+ public static void main(String[] args) {
+ new LogFeeder(args).run();
+ }
+
+ public static void run(String[] args) {
+ new LogFeeder(args).run();
+ }
+
+ public void run() {
+ try {
+ Date startTime = new Date();
+ init();
+ Date endTime = new Date();
+ logger.info("Took " + (endTime.getTime() - startTime.getTime())
+ + " ms to initialize");
+ monitor();
+
+ } catch (Throwable t) {
+ logger.fatal("Caught exception in main.", t);
+ System.exit(1);
+ }
+ }
+
+ private class JVMShutdownHook extends Thread {
+
+ public void run() {
+ try {
+ logger.info("Processing is shutting down.");
+
+ inputMgr.close();
+ outMgr.close();
+ inputMgr.checkInAll();
+
+ logStats();
+
+ logger.info("LogSearch is exiting.");
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+
+}
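LogFeeder's loadConfigs() above accepts JSON with a top-level "global" object and "input", "filter" and "output" arrays, which mergeAllConfigs() wires together via the "source", "filter" and "destination" aliases. An illustrative (not authoritative) config.json under those rules; the alias values, log type and path are hypothetical and depend on what AliasUtil registers:

{
  "global": {
    "add_fields": { "cluster": "cl1" }
  },
  "input": [
    { "type": "example_service", "source": "file", "path": "/var/log/example/example.log" }
  ],
  "filter": [
    { "filter": "json", "sort_order": 1,
      "conditions": { "fields": { "type": ["example_service"] } } }
  ],
  "output": [
    { "destination": "solr", "is_enabled": true }
  ]
}

Filter blocks are sorted by "sort_order" and chained per input; an output is attached to an input only when LogFeederUtil.isEnabled() accepts the output's "conditions" against the input's fields.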
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
new file mode 100644
index 0000000..e53a227
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
+ static Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+
+ String collectorHosts = null;
+
+ public LogFeederAMSClient() {
+ collectorHosts = LogFeederUtil
+ .getStringProperty("metrics.collector.hosts");
+ if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+ collectorHosts = null;
+ }
+ if (collectorHosts != null) {
+ collectorHosts = collectorHosts.trim();
+ }
+ logger.info("AMS collector URL=" + collectorHosts);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
+ * getCollectorUri()
+ */
+ @Override
+ public String getCollectorUri() {
+
+ return collectorHosts;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see
+ * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
+ * getTimeoutSeconds()
+ */
+ @Override
+ protected int getTimeoutSeconds() {
+ // TODO: Hard coded timeout
+ return 10;
+ }
+
+ @Override
+ protected void emitMetrics(TimelineMetrics metrics) {
+ super.emitMetrics(metrics);
+ }
+
+}
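The client is configured from the single metrics.collector.hosts property read in the constructor; when it is missing or blank, getCollectorUri() returns null and MetricsMgr (below) disables publishing. An illustrative logfeeder.properties entry, with a placeholder host and the usual AMS timeline endpoint (verify against your AMS deployment):

metrics.collector.hosts=http://ams-collector.example.com:6188/ws/v1/timeline/metrics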
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
new file mode 100644
index 0000000..7303694
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.io.InputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.URL;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.reflect.TypeToken;
+
+/**
+ * This class contains utility methods used by LogFeeder
+ */
+public class LogFeederUtil {
+ static Logger logger = Logger.getLogger(LogFeederUtil.class);
+
+ final static int HASH_SEED = 31174077;
+ public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
+
+ static Properties props;
+
+ private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+ private static int logInterval = 30000; // 30 seconds
+
+ public static Gson getGson() {
+ return gson;
+ }
+
+ /**
+ * Reads properties from the System properties, then from the properties
+ * file and finally from the name=value argument list.
+ *
+ * @param propFile the properties file to load
+ * @param propNVList additional "-name=value" overrides from the command line
+ * @throws Exception if no properties could be loaded
+ */
+ static public void loadProperties(String propFile, String[] propNVList)
+ throws Exception {
+ logger.info("Loading properties. propFile=" + propFile);
+ props = new Properties(System.getProperties());
+ boolean propLoaded = false;
+
+ // First get properties file path from environment value
+ String propertiesFilePath = System.getProperty("properties");
+ if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+ File propertiesFile = new File(propertiesFilePath);
+ if (propertiesFile.exists() && propertiesFile.isFile()) {
+ logger.info("Properties file path set in environment. Loading properties file="
+ + propertiesFilePath);
+ FileInputStream fileInputStream = null;
+ try {
+ fileInputStream = new FileInputStream(propertiesFile);
+ props.load(fileInputStream);
+ propLoaded = true;
+ } catch (Throwable t) {
+ logger.error("Error loading properties file. properties file="
+ + propertiesFile.getAbsolutePath());
+ } finally {
+ if (fileInputStream != null) {
+ try {
+ fileInputStream.close();
+ } catch (Throwable t) {
+ // Ignore error
+ }
+ }
+ }
+ } else {
+ logger.error("Properties file path set in environment, but file not found. properties file="
+ + propertiesFilePath);
+ }
+ }
+
+ if (!propLoaded) {
+ // Properties not yet loaded, let's try from class loader
+ InputStream fileInputStream = LogFeeder.class
+ .getClassLoader().getResourceAsStream(propFile);
+ if (fileInputStream != null) {
+ logger.info("Loading properties file " + propFile
+ + " from classpath");
+ props.load(fileInputStream);
+ propLoaded = true;
+ } else {
+ logger.fatal("Properties file not found in classpath. properties file name= "
+ + propFile);
+ }
+ }
+
+ if (!propLoaded) {
+ logger.fatal("Properties file is not loaded.");
+ throw new Exception("Properties not loaded");
+ } else {
+ // Let's load properties from argument list
+ updatePropertiesFromMap(propNVList);
+ }
+ }
+
+ /**
+ * @param nvList "-name=value" arguments to merge into the loaded properties
+ */
+ private static void updatePropertiesFromMap(String[] nvList) {
+ if (nvList == null) {
+ return;
+ }
+ logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
+ + nvList.length);
+ if (nvList != null && nvList.length > 0) {
+ for (String nv : nvList) {
+ logger.info("Passed nv=" + nv);
+ if (nv.startsWith("-") && nv.length() > 1) {
+ nv = nv.substring(1);
+ logger.info("Stripped nv=" + nv);
+ int i = nv.indexOf("=");
+ if (i > 0) {
+ logger.info("Candidate nv=" + nv);
+ String name = nv.substring(0, i);
+ String value = nv.substring(i + 1);
+ logger.info("Adding property from argument to properties. name="
+ + name + ", value=" + value);
+ props.put(name, value);
+ }
+ }
+ }
+ }
+ }
+
+ static public String getStringProperty(String key) {
+ if (props != null) {
+ return props.getProperty(key);
+ }
+ return null;
+ }
+
+ static public String getStringProperty(String key, String defaultValue) {
+ if (props != null) {
+ return props.getProperty(key, defaultValue);
+ }
+ return defaultValue;
+ }
+
+ static public boolean getBooleanProperty(String key, boolean defaultValue) {
+ String strValue = getStringProperty(key);
+ return toBoolean(strValue, defaultValue);
+ }
+
+ private static boolean toBoolean(String strValue, boolean defaultValue) {
+ boolean retValue = defaultValue;
+ if (!StringUtils.isEmpty(strValue)) {
+ if (strValue.equalsIgnoreCase("true")
+ || strValue.equalsIgnoreCase("yes")) {
+ retValue = true;
+ } else {
+ retValue = false;
+ }
+ }
+ return retValue;
+ }
+
+ static public int getIntProperty(String key, int defaultValue) {
+ String strValue = getStringProperty(key);
+ int retValue = defaultValue;
+ retValue = objectToInt(strValue, retValue, ", key=" + key);
+ return retValue;
+ }
+
+ public static int objectToInt(Object objValue, int retValue,
+ String errMessage) {
+ if (objValue == null) {
+ return retValue;
+ }
+ String strValue = objValue.toString();
+ if (!StringUtils.isEmpty(strValue)) {
+ try {
+ retValue = Integer.parseInt(strValue);
+ } catch (Throwable t) {
+ logger.error("Error parsing integer value. str=" + strValue
+ + ", " + errMessage);
+ }
+ }
+ return retValue;
+ }
+
+ static public boolean isEnabled(Map<String, Object> configs) {
+ return isEnabled(configs, configs);
+ }
+
+ static public boolean isEnabled(Map<String, Object> conditionConfigs,
+ Map<String, Object> valueConfigs) {
+ boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
+ .get("conditions");
+ if (conditions != null && conditions.size() > 0) {
+ allow = false;
+ for (String conditionType : conditions.keySet()) {
+ if (conditionType.equalsIgnoreCase("fields")) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> fields = (Map<String, Object>) conditions
+ .get("fields");
+ for (String fieldName : fields.keySet()) {
+ Object values = fields.get(fieldName);
+ if (values instanceof String) {
+ allow = isFieldConditionMatch(valueConfigs,
+ fieldName, (String) values);
+ } else {
+ @SuppressWarnings("unchecked")
+ List<String> listValues = (List<String>) values;
+ for (String stringValue : listValues) {
+ allow = isFieldConditionMatch(valueConfigs,
+ fieldName, stringValue);
+ if (allow) {
+ break;
+ }
+ }
+ }
+ if (allow) {
+ break;
+ }
+ }
+ }
+ if (allow) {
+ break;
+ }
+ }
+ }
+ return allow;
+ }
+
+ static public boolean isFieldConditionMatch(Map<String, Object> configs,
+ String fieldName, String stringValue) {
+ boolean allow = false;
+ String fieldValue = (String) configs.get(fieldName);
+ if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
+ allow = true;
+ } else {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> addFields = (Map<String, Object>) configs
+ .get("add_fields");
+ if (addFields != null && addFields.get(fieldName) != null) {
+ String addFieldValue = (String) addFields.get(fieldName);
+ if (stringValue.equalsIgnoreCase(addFieldValue)) {
+ allow = true;
+ }
+ }
+
+ }
+ return allow;
+ }
+
+ static public void logStatForMetric(MetricCount metric, String prefixStr,
+ String postFix) {
+ long currStat = metric.count;
+ long currMS = System.currentTimeMillis();
+ if (currStat > metric.prevLogCount) {
+ if (postFix == null) {
+ postFix = "";
+ }
+ logger.info(prefixStr + ": total_count=" + metric.count
+ + ", duration=" + (currMS - metric.prevLogMS) / 1000
+ + " secs, count=" + (currStat - metric.prevLogCount)
+ + postFix);
+ }
+ metric.prevLogCount = currStat;
+ metric.prevLogMS = currMS;
+ }
+
+ static public void logCountForMetric(MetricCount metric, String prefixStr,
+ String postFix) {
+ logger.info(prefixStr + ": count=" + metric.count + postFix);
+ }
+
+ public static Map<String, Object> cloneObject(Map<String, Object> map) {
+ if (map == null) {
+ return null;
+ }
+ // To clone the map, serialize it to a JSON string and parse it back
+ String jsonStr = gson.toJson(map);
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ return gson.fromJson(jsonStr, type);
+ }
+
+ public static Map<String, Object> toJSONObject(String jsonStr) {
+ Type type = new TypeToken<Map<String, Object>>() {
+ }.getType();
+ return gson.fromJson(jsonStr, type);
+ }
+
+ static public boolean logErrorMessageByInterval(String key, String message,
+ Throwable e, Logger callerLogger, Level level) {
+
+ LogHistory log = logHistoryList.get(key);
+ if (log == null) {
+ log = new LogHistory();
+ logHistoryList.put(key, log);
+ }
+ if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
+ log.lastLogTime = System.currentTimeMillis();
+ int counter = log.counter;
+ log.counter = 0;
+ if (counter > 0) {
+ message += ". Messages suppressed before: " + counter;
+ }
+ if (e == null) {
+ callerLogger.log(level, message);
+ } else {
+ callerLogger.log(level, message, e);
+ }
+
+ return true;
+ } else {
+ log.counter++;
+ }
+ return false;
+
+ }
+
+ static public String subString(String str, int maxLength) {
+ if (str == null || str.length() == 0) {
+ return "";
+ }
+ maxLength = str.length() < maxLength ? str.length() : maxLength;
+ return str.substring(0, maxLength);
+ }
+
+ static public long genHash(String value) {
+ if (value == null) {
+ value = "null";
+ }
+ return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
+ }
+
+ static class LogHistory {
+ long lastLogTime = 0;
+ int counter = 0;
+ }
+
+ public static String getDate(String timeStampStr) {
+ try {
+ DateFormat sdf = new SimpleDateFormat(DATE_FORMAT);
+ Date netDate = (new Date(Long.parseLong(timeStampStr)));
+ return sdf.format(netDate);
+ } catch (Exception ex) {
+ return null;
+ }
+ }
+
+ public static File getFileFromClasspath(String filename) {
+ URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
+ .getResource(filename);
+ logger.debug("File Complete URI :" + fileCompleteUrl);
+ File file = null;
+ try {
+ file = new File(fileCompleteUrl.toURI());
+ } catch (Exception exception) {
+ logger.debug(exception.getMessage(), exception.getCause());
+ }
+ return file;
+ }
+
+ public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
+ Object instance = null;
+ try {
+ instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+ } catch (Exception exception) {
+ logger.error("Unsupported class =" + classFullName, exception.getCause());
+ }
+ // check that the instance class matches the aliasType
+ if (instance != null) {
+ boolean isValid = false;
+ switch (aliasType) {
+ case FILTER:
+ isValid = Filter.class.isAssignableFrom(instance.getClass());
+ break;
+ case INPUT:
+ isValid = Input.class.isAssignableFrom(instance.getClass());
+ break;
+ case OUTPUT:
+ isValid = Output.class.isAssignableFrom(instance.getClass());
+ break;
+ case MAPPER:
+ isValid = Mapper.class.isAssignableFrom(instance.getClass());
+ break;
+ default:
+ // by default consider all are valid class
+ isValid = true;
+ }
+ if (!isValid) {
+ logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+ }
+ }
+ return instance;
+ }
+
+ /**
+ * @param jsonFile the JSON file to parse
+ * @return the parsed map, or an empty map if parsing fails
+ */
+ public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
+ });
+ return jsonmap;
+ } catch (JsonParseException e) {
+ logger.error("Error parsing JSON file. file=" + jsonFile, e);
+ } catch (JsonMappingException e) {
+ logger.error("Error mapping JSON file. file=" + jsonFile, e);
+ } catch (IOException e) {
+ logger.error("Error reading JSON file. file=" + jsonFile, e);
+ }
+ return new HashMap<String, Object>();
+ }
+
+ public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+ if (list != null) {
+ for (String value : list) {
+ if (value != null) {
+ if (caseSensitive) {
+ if (value.equals(str)) {
+ return true;
+ }
+ } else {
+ if (value.equalsIgnoreCase(str)) {
+ return true;
+ }
+ }
+ if (value.equalsIgnoreCase("ALL")) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+}
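The conditions handling in isEnabled() above is easiest to follow with concrete maps. A small sketch, under the assumption that the maps mirror parsed JSON config blocks (the field names and values are illustrative):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.ambari.logfeeder.LogFeederUtil;

public class IsEnabledSketch {
    public static void main(String[] args) {
        // Output block: only applies when the input's "type" is "example_service"
        Map<String, Object> fields = new HashMap<String, Object>();
        fields.put("type", Arrays.asList("example_service"));
        Map<String, Object> conditions = new HashMap<String, Object>();
        conditions.put("fields", fields);
        Map<String, Object> outputConfig = new HashMap<String, Object>();
        outputConfig.put("conditions", conditions);

        // Input block carrying the matching field value
        Map<String, Object> inputConfig = new HashMap<String, Object>();
        inputConfig.put("type", "example_service");

        // Prints true: the list of allowed values contains the input's "type"
        System.out.println(LogFeederUtil.isEnabled(outputConfig, inputConfig));
    }
}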
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
new file mode 100644
index 0000000..c715881
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+public class MetricCount {
+ public String metricsName = null;
+ public boolean isPointInTime = false;
+
+ public long count = 0;
+ public long prevLogCount = 0;
+ public long prevLogMS = System.currentTimeMillis();
+ public long prevPublishCount = 0;
+ public long prevPublishMS = 0; // We will try to publish one immediately
+ public int publishCount = 0; // Count of published metrics. Used for first
+ // time sending metrics
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
new file mode 100644
index 0000000..2152d14
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class MetricsMgr {
+ static Logger logger = Logger.getLogger(MetricsMgr.class);
+
+ boolean isMetricsEnabled = false;
+ String nodeHostName = null;
+ String appId = "logfeeder";
+
+ long lastPublishTimeMS = 0; // Let's do the first publish immediately
+ long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the
+ // clock
+
+ int publishIntervalMS = 60 * 1000;
+ int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
+ // the metrics in memory forever
+ HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+ LogFeederAMSClient amsClient = null;
+
+ public void init() {
+ logger.info("Initializing MetricsMgr()");
+ amsClient = new LogFeederAMSClient();
+
+ if (amsClient.getCollectorUri() != null) {
+ nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+ if (nodeHostName == null) {
+ try {
+ nodeHostName = InetAddress.getLocalHost().getHostName();
+ } catch (Throwable e) {
+ logger.warn(
+ "Error getting hostname using InetAddress.getLocalHost().getHostName()",
+ e);
+ }
+ if (nodeHostName == null) {
+ try {
+ nodeHostName = InetAddress.getLocalHost()
+ .getCanonicalHostName();
+ } catch (Throwable e) {
+ logger.warn(
+ "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
+ e);
+ }
+ }
+ }
+ if (nodeHostName == null) {
+ isMetricsEnabled = false;
+ logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+ } else {
+ isMetricsEnabled = true;
+ logger.info("LogFeeder Metrics is enabled. Metrics host="
+ + amsClient.getCollectorUri());
+ }
+ } else {
+ logger.info("LogFeeder Metrics publish is disabled");
+ }
+ }
+
+ /**
+ * @return true if publishing LogFeeder metrics to AMS is enabled
+ */
+ public boolean isMetricsEnabled() {
+ return isMetricsEnabled;
+ }
+
+ /**
+ * Buffers the given metrics and publishes them to AMS once the publish
+ * interval has elapsed.
+ *
+ * @param metricsList the metrics to buffer and publish
+ */
+ synchronized public void useMetrics(List<MetricCount> metricsList) {
+ if (!isMetricsEnabled) {
+ return;
+ }
+ logger.info("useMetrics() metrics.size=" + metricsList.size());
+ long currMS = System.currentTimeMillis();
+ Long currMSLong = new Long(currMS);
+ for (MetricCount metric : metricsList) {
+ if (metric.metricsName == null) {
+ logger.debug("metric.metricsName is null");
+ // Metrics is not meant to be published
+ continue;
+ }
+ long currCount = metric.count;
+ if (!metric.isPointInTime && metric.publishCount > 0
+ && currCount <= metric.prevPublishCount) {
+ // No new data added, so let's ignore it
+ logger.debug("Nothing changed. " + metric.metricsName
+ + ", currCount=" + currCount + ", prevPublishCount="
+ + metric.prevPublishCount);
+ continue;
+ }
+ metric.publishCount++;
+
+ TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+ if (timelineMetric == null) {
+ logger.debug("Creating new metric obbject for "
+ + metric.metricsName);
+ // First time for this metric
+ timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metric.metricsName);
+ timelineMetric.setHostName(nodeHostName);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setStartTime(currMS);
+ timelineMetric.setType("Long");
+ timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+
+ metricsMap.put(metric.metricsName, timelineMetric);
+ }
+ logger.debug("Adding metrics=" + metric.metricsName);
+ if (metric.isPointInTime) {
+ timelineMetric.getMetricValues().put(currMSLong,
+ new Double(currCount));
+ } else {
+ Double value = timelineMetric.getMetricValues().get(currMSLong);
+ if (value == null) {
+ value = new Double(0);
+ }
+ value += (currCount - metric.prevPublishCount);
+ timelineMetric.getMetricValues().put(currMSLong, value);
+ metric.prevPublishCount = currCount;
+ metric.prevPublishMS = currMS;
+ }
+ }
+
+ if (metricsMap.size() > 0
+ && currMS - lastPublishTimeMS > publishIntervalMS) {
+ try {
+ // Time to publish
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
+ timeLineMetricList.addAll(metricsMap.values());
+ timelineMetrics.setMetrics(timeLineMetricList);
+ amsClient.emitMetrics(timelineMetrics);
+ logger.info("Published " + timeLineMetricList.size()
+ + " metrics to AMS");
+ metricsMap.clear();
+ timeLineMetricList.clear();
+ lastPublishTimeMS = currMS;
+ } catch (Throwable t) {
+ logger.warn("Error sending metrics to AMS.", t);
+ if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+ logger.error("AMS was not sent for last "
+ + maxMetricsBuffer
+ / 1000
+ + " seconds. Purging it and will start rebuilding it again");
+ metricsMap.clear();
+ lastFailedPublishTimeMS = currMS;
+ }
+ }
+ } else {
+ logger.info("Not publishing metrics. metrics.size()="
+ + metricsMap.size() + ", lastPublished="
+ + (currMS - lastPublishTimeMS) / 1000
+ + " seconds ago, intervalConfigured=" + publishIntervalMS
+ / 1000);
+ }
+ }
+}
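A minimal sketch, not part of this commit, of how a feeder loop could drive
MetricsMgr. The metric name, the counter increments and the 30-second cadence
are illustrative, and the class is assumed to live in the same
org.apache.ambari.logfeeder package so the package-visible MetricCount fields
are reachable:

    package org.apache.ambari.logfeeder;

    import java.util.ArrayList;
    import java.util.List;

    public class MetricsMgrSketch {
        public static void main(String[] args) throws InterruptedException {
            MetricsMgr metricsMgr = new MetricsMgr();
            metricsMgr.init(); // resolves the hostname and the AMS collector URI

            // A cumulative counter; useMetrics() publishes only the delta
            // accumulated since the previous publish (prevPublishCount).
            MetricCount linesRead = new MetricCount();
            linesRead.metricsName = "logfeeder.files.read_lines"; // hypothetical name

            List<MetricCount> metrics = new ArrayList<MetricCount>();
            metrics.add(linesRead);

            for (int i = 0; i < 10; i++) {
                linesRead.count += 100;         // pretend 100 lines were read
                metricsMgr.useMetrics(metrics); // sends at most once per publishIntervalMS
                Thread.sleep(30 * 1000);
            }
        }
    }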
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
new file mode 100644
index 0000000..2a54f28
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MurmurHash.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import com.google.common.primitives.Ints;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup. See http://murmurhash.googlepages.com/ for more details.
+ * <p/>
+ * <p>The C version of MurmurHash 2.0 found at that site was ported
+ * to Java by Andrzej Bialecki (ab at getopt org).</p>
+ */
+public final class MurmurHash {
+
+ private MurmurHash() {
+ }
+
+ /**
+ * Hashes an int.
+ *
+ * @param data The int to hash.
+ * @param seed The seed for the hash.
+ * @return The 32 bit hash of the bytes in question.
+ */
+ public static int hash(int data, int seed) {
+ return hash(ByteBuffer.wrap(Ints.toByteArray(data)), seed);
+ }
+
+ /**
+ * Hashes bytes in an array.
+ *
+ * @param data The bytes to hash.
+ * @param seed The seed for the hash.
+ * @return The 32 bit hash of the bytes in question.
+ */
+ public static int hash(byte[] data, int seed) {
+ return hash(ByteBuffer.wrap(data), seed);
+ }
+
+ /**
+ * Hashes bytes in part of an array.
+ *
+ * @param data The data to hash.
+ * @param offset Where to start munging.
+ * @param length How many bytes to process.
+ * @param seed The seed to start with.
+ * @return The 32-bit hash of the data in question.
+ */
+ public static int hash(byte[] data, int offset, int length, int seed) {
+ return hash(ByteBuffer.wrap(data, offset, length), seed);
+ }
+
+ /**
+ * Hashes the bytes in a buffer from the current position to the limit.
+ *
+ * @param buf The bytes to hash.
+ * @param seed The seed for the hash.
+ * @return The 32 bit murmur hash of the bytes in the buffer.
+ */
+ public static int hash(ByteBuffer buf, int seed) {
+ // save byte order for later restoration
+ ByteOrder byteOrder = buf.order();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+
+ int m = 0x5bd1e995;
+ int r = 24;
+
+ int h = seed ^ buf.remaining();
+
+ while (buf.remaining() >= 4) {
+ int k = buf.getInt();
+
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+
+ h *= m;
+ h ^= k;
+ }
+
+ if (buf.remaining() > 0) {
+ ByteBuffer finish = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+ // for big-endian version, use this first:
+ // finish.position(4-buf.remaining());
+ finish.put(buf).rewind();
+ h ^= finish.getInt();
+ h *= m;
+ }
+
+ h ^= h >>> 13;
+ h *= m;
+ h ^= h >>> 15;
+
+ buf.order(byteOrder);
+ return h;
+ }
+
+
+ public static long hash64A(byte[] data, int seed) {
+ return hash64A(ByteBuffer.wrap(data), seed);
+ }
+
+ public static long hash64A(byte[] data, int offset, int length, int seed) {
+ return hash64A(ByteBuffer.wrap(data, offset, length), seed);
+ }
+
+ public static long hash64A(ByteBuffer buf, int seed) {
+ ByteOrder byteOrder = buf.order();
+ buf.order(ByteOrder.LITTLE_ENDIAN);
+
+ long m = 0xc6a4a7935bd1e995L;
+ int r = 47;
+
+ long h = seed ^ (buf.remaining() * m);
+
+ while (buf.remaining() >= 8) {
+ long k = buf.getLong();
+
+ k *= m;
+ k ^= k >>> r;
+ k *= m;
+
+ h ^= k;
+ h *= m;
+ }
+
+ if (buf.remaining() > 0) {
+ ByteBuffer finish = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+ // for big-endian version, do this first:
+ // finish.position(8-buf.remaining());
+ finish.put(buf).rewind();
+ h ^= finish.getLong();
+ h *= m;
+ }
+
+ h ^= h >>> r;
+ h *= m;
+ h ^= h >>> r;
+
+ buf.order(byteOrder);
+ return h;
+ }
+
+}
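For reference, a small illustrative use of the two entry points above. The
seed value is arbitrary; the same input and seed always produce the same
hash, which is what makes this class usable for stable event identifiers:

    import org.apache.ambari.logfeeder.MurmurHash;

    public class MurmurHashSketch {
        public static void main(String[] args) {
            byte[] key = "hdfs_namenode.log".getBytes();

            int h32 = MurmurHash.hash(key, 31);     // 32-bit variant
            long h64 = MurmurHash.hash64A(key, 31); // 64-bit variant

            System.out.println("32-bit=" + h32 + ", 64-bit=" + h64);
        }
    }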
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
new file mode 100644
index 0000000..5b70bca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputMgr {
+ static Logger logger = Logger.getLogger(OutputMgr.class);
+
+ Collection<Output> outputList = new ArrayList<Output>();
+
+ String hostName = null;
+ String ipAddress = null;
+ boolean addMessageMD5 = true;
+
+ private int MAX_OUTPUT_SIZE = 32765; // 32766 - 1 (Lucene's max indexed term size is 32766 bytes)
+ static long doc_counter = 0;
+ public MetricCount messageTruncateMetric = new MetricCount();
+
+ public OutputMgr() {
+ // Set the host for this server
+ try {
+ InetAddress ip = InetAddress.getLocalHost();
+ ipAddress = ip.getHostAddress();
+ hostName = ip.getHostName();
+ } catch (UnknownHostException e) {
+ logger.error("Error getting hostname.", e);
+ }
+ }
+
+ public Collection<Output> getOutputList() {
+ return outputList;
+ }
+
+ public void setOutputList(Collection<Output> outputList) {
+ this.outputList = outputList;
+ }
+
+ /**
+ * @param jsonObj the parsed log event to enrich and write
+ * @param inputMarker marker identifying the input this event came from
+ */
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+
+ // Update the block with the context fields
+ for (Map.Entry<String, String> entry : input.getContextFields()
+ .entrySet()) {
+ if (jsonObj.get(entry.getKey()) == null) {
+ jsonObj.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // TODO: Ideally most of the overrides should be configurable
+
+ // Add the input type
+ if (jsonObj.get("type") == null) {
+ jsonObj.put("type", input.getStringValue("type"));
+ }
+ if (jsonObj.get("path") == null && input.getFilePath() != null) {
+ jsonObj.put("path", input.getFilePath());
+ }
+ if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+ jsonObj.put("path", input.getStringValue("path"));
+ }
+
+ // Add host if required
+ if (jsonObj.get("host") == null && hostName != null) {
+ jsonObj.put("host", hostName);
+ }
+ // Add IP if required
+ if (jsonObj.get("ip") == null && ipAddress != null) {
+ jsonObj.put("ip", ipAddress);
+ }
+
+ if (input.isUseEventMD5() || input.isGenEventMD5()) {
+ String prefix = "";
+ Object logtimeObj = jsonObj.get("logtime");
+ if (logtimeObj != null) {
+ if (logtimeObj instanceof Date) {
+ prefix = "" + ((Date) logtimeObj).getTime();
+ } else {
+ prefix = logtimeObj.toString();
+ }
+ }
+ Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
+ .toJson(jsonObj));
+ if (input.isGenEventMD5()) {
+ jsonObj.put("event_md5", prefix + eventMD5.toString());
+ }
+ if (input.isUseEventMD5()) {
+ jsonObj.put("id", prefix + eventMD5.toString());
+ }
+ }
+
+ // jsonObj.put("@timestamp", new Date());
+ jsonObj.put("seq_num", new Long(doc_counter++));
+ if (jsonObj.get("id") == null) {
+ jsonObj.put("id", UUID.randomUUID().toString());
+ }
+ if (jsonObj.get("event_count") == null) {
+ jsonObj.put("event_count", new Integer(1));
+ }
+ if (inputMarker.lineNumber > 0) {
+ jsonObj.put("logfile_line_number", new Integer(
+ inputMarker.lineNumber));
+ }
+ if (jsonObj.containsKey("log_message")) {
+ // TODO: Let's check size only for log_message for now
+ String logMessage = (String) jsonObj.get("log_message");
+ if (logMessage != null
+ && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+ messageTruncateMetric.count++;
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_MESSAGESIZE";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Message is too big. size="
+ + logMessage.getBytes().length + ", input="
+ + input.getShortDescription()
+ + ". Truncating to " + MAX_OUTPUT_SIZE
+ + ", first upto 100 characters="
+ + LogFeederUtil.subString(logMessage, 100),
+ null, logger, Level.WARN);
+ logMessage = new String(logMessage.getBytes(), 0,
+ MAX_OUTPUT_SIZE);
+ jsonObj.put("log_message", logMessage);
+ // Add error tags
+ @SuppressWarnings("unchecked")
+ List<String> tagsList = (List<String>) jsonObj.get("tags");
+ if (tagsList == null) {
+ tagsList = new ArrayList<String>();
+ jsonObj.put("tags", tagsList);
+ }
+ tagsList.add("error_message_truncated");
+
+ }
+ if (addMessageMD5) {
+ jsonObj.put("message_md5",
+ "" + LogFeederUtil.genHash(logMessage));
+ }
+ }
+ // Check whether this log entry is allowed to be sent to the outputs
+ if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+ for (Output output : input.getOutputList()) {
+ try {
+ output.write(jsonObj, inputMarker);
+ } catch (Exception e) {
+ logger.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ public void write(String jsonBlock, InputMarker inputMarker) {
+ // Check whether this log entry is allowed to be sent to the outputs
+ if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+ for (Output output : inputMarker.input.getOutputList()) {
+ try {
+ output.write(jsonBlock, inputMarker);
+ } catch (Exception e) {
+ logger.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Close all the outputs
+ */
+ public void close() {
+ logger.info("Close called for outputs ...");
+ for (Output output : outputList) {
+ try {
+ output.setDrain(true);
+ output.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ // TODO: Get these values from the properties
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ int i;
+ boolean allClosed = true;
+ for (i = 0; i < iterations; i++) {
+ allClosed = true;
+ for (Output output : outputList) {
+ if (!output.isClosed()) {
+ try {
+ allClosed = false;
+ logger.warn("Waiting for output to close. "
+ + output.getShortDescription() + ", "
+ + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ break;
+ }
+ }
+
+ if (!allClosed) {
+ logger.warn("Some outpus were not closed. Iterations=" + i);
+ for (Output output : outputList) {
+ if (!output.isClosed()) {
+ logger.warn("Output not closed. Will ignore it."
+ + output.getShortDescription() + ", pendingCound="
+ + output.getPendingCount());
+ }
+ }
+ } else {
+ logger.info("All outputs are closed. Iterations=" + i);
+ }
+ }
+
+ /**
+ * Log the statistics for each output and for message truncation
+ */
+ public void logStats() {
+ for (Output output : outputList) {
+ output.logStat();
+ }
+ LogFeederUtil.logStatForMetric(messageTruncateMetric,
+ "Stat: Messages Truncated", null);
+ }
+
+ /**
+ * @param metricsList the list to which each output's metric containers are added
+ */
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ metricsList.add(messageTruncateMetric);
+ for (Output output : outputList) {
+ output.addMetricsContainers(metricsList);
+ }
+ }
+
+}
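A minimal sketch, under the assumption that concrete Output implementations
exist elsewhere in the module, of how OutputMgr is wired and shut down; the
write() path is driven by the filters, so it is only hinted at in comments:

    import java.util.ArrayList;
    import java.util.Collection;

    import org.apache.ambari.logfeeder.OutputMgr;
    import org.apache.ambari.logfeeder.output.Output;

    public class OutputMgrSketch {
        public static void main(String[] args) {
            OutputMgr outputMgr = new OutputMgr();

            Collection<Output> outputs = new ArrayList<Output>();
            // outputs.add(...); // e.g. a Solr or file output configured elsewhere
            outputMgr.setOutputList(outputs);

            // Filters call outputMgr.write(jsonObj, inputMarker); write() fills in
            // "host", "ip", "seq_num", "id" and "event_count" when absent, and
            // truncates log_message beyond MAX_OUTPUT_SIZE bytes.

            outputMgr.logStats(); // per-output stats plus the truncation metric
            outputMgr.close();    // drains each output, waiting up to ~30 seconds
        }
    }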
http://git-wip-us.apache.org/repos/asf/ambari/blob/39c85bb8/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
new file mode 100644
index 0000000..aa1edea
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.AliasUtil;
+import org.apache.ambari.logfeeder.ConfigBlock;
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Priority;
+
+public abstract class Filter extends ConfigBlock {
+ static private Logger logger = Logger.getLogger(Filter.class);
+
+ OutputMgr outputMgr;
+ Input input;
+ Filter nextFilter = null;
+
+ Map<String, List<Mapper>> postFieldValueMappers = new HashMap<String, List<Mapper>>();
+
+ @Override
+ public void init() throws Exception {
+ super.init();
+
+ initializePostMapValues();
+ if (nextFilter != null) {
+ nextFilter.init();
+ }
+ }
+
+ /**
+ * Build the post_map_values mappers from the filter configuration
+ */
+ @SuppressWarnings("unchecked")
+ protected void initializePostMapValues() {
+ // Initialize map values
+ Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
+ if (postMapValues == null) {
+ return;
+ }
+ for (String fieldName : postMapValues.keySet()) {
+ List<Map<String, Object>> mapList = null;
+ Object values = postMapValues.get(fieldName);
+ if (values instanceof List<?>) {
+ mapList = (List<Map<String, Object>>) values;
+ } else {
+ mapList = new ArrayList<Map<String, Object>>();
+ mapList.add((Map<String, Object>) values);
+ }
+ for (Map<String, Object> mapObject : mapList) {
+ for (String mapClassCode : mapObject.keySet()) {
+ Mapper mapper = getMapper(mapClassCode);
+ if (mapper == null) {
+ break;
+ }
+ if (mapper.init(getInput().getShortDescription(),
+ fieldName, mapClassCode,
+ mapObject.get(mapClassCode))) {
+ List<Mapper> fieldMapList = postFieldValueMappers
+ .get(fieldName);
+ if (fieldMapList == null) {
+ fieldMapList = new ArrayList<Mapper>();
+ postFieldValueMappers.put(fieldName, fieldMapList);
+ }
+ fieldMapList.add(mapper);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * @param mapClassCode the mapper alias from the configuration
+ * @return the mapper instance, or null if the alias cannot be resolved
+ */
+ protected Mapper getMapper(String mapClassCode) {
+ String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS);
+ if (classFullName != null && !classFullName.isEmpty()) {
+ Mapper mapper = (Mapper) LogFeederUtil.getClassInstance(classFullName, ALIAS_TYPE.MAPPER);
+ return mapper;
+ }
+ return null;
+ }
+
+ public void setOutputMgr(OutputMgr outputMgr) {
+ this.outputMgr = outputMgr;
+ }
+
+ public Filter getNextFilter() {
+ return nextFilter;
+ }
+
+ public void setNextFilter(Filter nextFilter) {
+ this.nextFilter = nextFilter;
+ }
+
+ public Input getInput() {
+ return input;
+ }
+
+ public void setInput(Input input) {
+ this.input = input;
+ }
+
+ /**
+ * Derived classes should implement this at the minimum
+ *
+ * @param inputStr the raw log line
+ * @param inputMarker marker identifying the input and its read position
+ */
+ public void apply(String inputStr, InputMarker inputMarker) {
+ // TODO: There is no transformation for string types.
+ if (nextFilter != null) {
+ nextFilter.apply(inputStr, inputMarker);
+ } else {
+ outputMgr.write(inputStr, inputMarker);
+ }
+ }
+
+ public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
+ if (postFieldValueMappers.size() > 0) {
+ for (String fieldName : postFieldValueMappers.keySet()) {
+ Object value = jsonObj.get(fieldName);
+ if (value != null) {
+ for (Mapper mapper : postFieldValueMappers.get(fieldName)) {
+ value = mapper.apply(jsonObj, value);
+ }
+ }
+ }
+ }
+ if (nextFilter != null) {
+ nextFilter.apply(jsonObj, inputMarker);
+ } else {
+ outputMgr.write(jsonObj, inputMarker);
+ }
+ }
+
+ /**
+ * Close this filter and the rest of the filter chain
+ */
+ public void close() {
+ if (nextFilter != null) {
+ nextFilter.close();
+ }
+ }
+
+ public void flush() {
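+ // No-op by default; subclasses that buffer partial events can override.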
+
+ }
+
+ @Override
+ public void logStat() {
+ super.logStat();
+ if (nextFilter != null) {
+ nextFilter.logStat();
+ }
+ }
+
+ @Override
+ public boolean isFieldConditionMatch(String fieldName, String stringValue) {
+ if (!super.isFieldConditionMatch(fieldName, stringValue)) {
+ // Let's try input
+ if (input != null) {
+ return input.isFieldConditionMatch(fieldName, stringValue);
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public String getShortDescription() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public boolean logConfgs(Priority level) {
+ if (!super.logConfgs(level)) {
+ return false;
+ }
+ logger.log(level, "input=" + input.getShortDescription());
+ return true;
+ }
+
+ @Override
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ super.addMetricsContainers(metricsList);
+ if (nextFilter != null) {
+ nextFilter.addMetricsContainers(metricsList);
+ }
+ }
+
+}
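As a sketch of the extension point, a hypothetical subclass (not part of this
commit) that lower-cases the "level" field and hands the event on; only
apply() needs overriding, and super.apply() runs the post_map_values mappers
and forwards to the next filter or to the OutputMgr:

    package org.apache.ambari.logfeeder.filter;

    import java.util.Map;

    import org.apache.ambari.logfeeder.input.InputMarker;

    public class FilterLowercaseLevel extends Filter {

        @Override
        public void apply(Map<String, Object> jsonObj, InputMarker inputMarker) {
            Object level = jsonObj.get("level");
            if (level instanceof String) {
                jsonObj.put("level", ((String) level).toLowerCase());
            }
            super.apply(jsonObj, inputMarker);
        }

        @Override
        public String getShortDescription() {
            return "filter:filter=lowercase_level";
        }
    }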