You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:29 UTC
[02/52] [abbrv] ambari git commit: AMBARI-18236. Fix package
structure in Logfeeder (Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
deleted file mode 100644
index c22b512..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/exception/LogfeederException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.exception;
-
-public class LogfeederException extends Exception {
-
- public LogfeederException(String message, Throwable throwable) {
- super(message, throwable);
- }
-
- public LogfeederException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index 01d4f79..ab371f1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -24,17 +24,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.AliasUtil;
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_PARAM;
-import org.apache.ambari.logfeeder.AliasUtil.ALIAS_TYPE;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.util.AliasUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_PARAM;
+import org.apache.ambari.logfeeder.util.AliasUtil.ALIAS_TYPE;
import org.apache.log4j.Logger;
import org.apache.log4j.Priority;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 7aa649d..372c208 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -34,10 +34,10 @@ import java.util.regex.Pattern;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.exception.GrokException;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
index f375374..2954106 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterJSON.java
@@ -20,9 +20,9 @@ package org.apache.ambari.logfeeder.filter;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public class FilterJSON extends Filter {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index 1b8b3a3..7adb468 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -23,10 +23,10 @@ import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 76af16c..5feb9c4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -26,13 +26,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.InputMgr;
-import org.apache.ambari.logfeeder.MetricCount;
-import org.apache.ambari.logfeeder.OutputMgr;
-import org.apache.ambari.logfeeder.exception.LogfeederException;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.output.OutputMgr;
import org.apache.log4j.Logger;
public abstract class Input extends ConfigBlock implements Runnable {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 9d3545e..c9f5ded 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -33,8 +33,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.reader.LogsearchReaderFactory;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
new file mode 100644
index 0000000..b18c9b0
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMgr.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.io.filefilter.WildcardFileFilter;
+import org.apache.log4j.Logger;
+import org.apache.solr.common.util.Base64;
+
+public class InputMgr {
+ private static final Logger logger = Logger.getLogger(InputMgr.class);
+
+ private List<Input> inputList = new ArrayList<Input>();
+ private Set<Input> notReadyList = new HashSet<Input>();
+
+ private boolean isDrain = false;
+ private boolean isAnyInputTail = false;
+
+ private String checkPointSubFolderName = "logfeeder_checkpoints";
+ private File checkPointFolderFile = null;
+
+ private MetricCount filesCountMetric = new MetricCount();
+
+ private String checkPointExtension = ".cp";
+
+ private Thread inputIsReadyMonitor = null;
+
+ public List<Input> getInputList() {
+ return inputList;
+ }
+
+ public void add(Input input) {
+ inputList.add(input);
+ }
+
+ public void removeInput(Input input) {
+ logger.info("Trying to remove from inputList. "
+ + input.getShortDescription());
+ Iterator<Input> iter = inputList.iterator();
+ while (iter.hasNext()) {
+ Input iterInput = iter.next();
+ if (iterInput.equals(input)) {
+ logger.info("Removing Input from inputList. "
+ + input.getShortDescription());
+ iter.remove();
+ }
+ }
+ }
+
+ public int getActiveFilesCount() {
+ int count = 0;
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public void init() {
+ filesCountMetric.metricsName = "input.files.count";
+ filesCountMetric.isPointInTime = true;
+
+ checkPointExtension = LogFeederUtil.getStringProperty(
+ "logfeeder.checkpoint.extension", checkPointExtension);
+ for (Input input : inputList) {
+ try {
+ input.init();
+ if (input.isTail()) {
+ isAnyInputTail = true;
+ }
+ } catch (Exception e) {
+ logger.error(
+ "Error initializing input. "
+ + input.getShortDescription(), e);
+ }
+ }
+
+ if (isAnyInputTail) {
+ logger.info("Determining valid checkpoint folder");
+ boolean isCheckPointFolderValid = false;
+ // We need to keep track of the files we are reading.
+ String checkPointFolder = LogFeederUtil
+ .getStringProperty("logfeeder.checkpoint.folder");
+ if (checkPointFolder != null && !checkPointFolder.isEmpty()) {
+ checkPointFolderFile = new File(checkPointFolder);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's try home folder
+ String userHome = LogFeederUtil.getStringProperty("user.home");
+ if (userHome != null) {
+ checkPointFolderFile = new File(userHome,
+ checkPointSubFolderName);
+ logger.info("Checking if home folder can be used for checkpoints. Folder="
+ + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ }
+ }
+ if (!isCheckPointFolderValid) {
+ // Let's use tmp folder
+ String tmpFolder = LogFeederUtil
+ .getStringProperty("java.io.tmpdir");
+ if (tmpFolder == null) {
+ tmpFolder = "/tmp";
+ }
+ checkPointFolderFile = new File(tmpFolder,
+ checkPointSubFolderName);
+ logger.info("Checking if tmps folder can be used for checkpoints. Folder="
+ + checkPointFolderFile);
+ isCheckPointFolderValid = verifyCheckPointFolder(checkPointFolderFile);
+ if (isCheckPointFolderValid) {
+ logger.warn("Using tmp folder "
+ + checkPointFolderFile
+ + " to store check points. This is not recommended."
+ + "Please set logfeeder.checkpoint.folder property");
+ }
+ }
+
+ if (isCheckPointFolderValid) {
+ logger.info("Using folder " + checkPointFolderFile
+ + " for storing checkpoints");
+ }
+ }
+
+ }
+
+ public File getCheckPointFolderFile() {
+ return checkPointFolderFile;
+ }
+
+ private boolean verifyCheckPointFolder(File folderPathFile) {
+ if (!folderPathFile.exists()) {
+ // Create the folder
+ try {
+ if (!folderPathFile.mkdir()) {
+ logger.warn("Error creating folder for check point. folder="
+ + folderPathFile);
+ }
+ } catch (Throwable t) {
+ logger.warn("Error creating folder for check point. folder="
+ + folderPathFile, t);
+ }
+ }
+
+ if (folderPathFile.exists() && folderPathFile.isDirectory()) {
+ // Let's check whether we can create a file
+ File testFile = new File(folderPathFile, UUID.randomUUID()
+ .toString());
+ try {
+ testFile.createNewFile();
+ return testFile.delete();
+ } catch (IOException e) {
+ logger.warn(
+ "Couldn't create test file in "
+ + folderPathFile.getAbsolutePath()
+ + " for checkPoint", e);
+ }
+ }
+ return false;
+ }
+
+ public void monitor() {
+ for (Input input : inputList) {
+ if (input.isReady()) {
+ input.monitor();
+ } else {
+ if (input.isTail()) {
+ logger.info("Adding input to not ready list. Note, it is possible this component is not run on this host. So it might not be an issue. "
+ + input.getShortDescription());
+ notReadyList.add(input);
+ } else {
+ logger.info("Input is not ready, so going to ignore it "
+ + input.getShortDescription());
+ }
+ }
+ }
+ // Start the monitoring thread if any file is in tail mode
+ if (isAnyInputTail) {
+ inputIsReadyMonitor = new Thread("InputIsReadyMonitor") {
+ @Override
+ public void run() {
+ logger.info("Going to monitor for these missing files: "
+ + notReadyList.toString());
+ while (true) {
+ if (isDrain) {
+ logger.info("Exiting missing file monitor.");
+ break;
+ }
+ try {
+ Iterator<Input> iter = notReadyList.iterator();
+ while (iter.hasNext()) {
+ Input input = iter.next();
+ try {
+ if (input.isReady()) {
+ input.monitor();
+ iter.remove();
+ }
+ } catch (Throwable t) {
+ logger.error("Error while enabling monitoring for input. "
+ + input.getShortDescription());
+ }
+ }
+ Thread.sleep(30 * 1000);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ };
+ inputIsReadyMonitor.start();
+ }
+ }
+
+ public void addToNotReady(Input notReadyInput) {
+ notReadyList.add(notReadyInput);
+ }
+
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ for (Input input : inputList) {
+ input.addMetricsContainers(metricsList);
+ }
+ filesCountMetric.count = getActiveFilesCount();
+ metricsList.add(filesCountMetric);
+ }
+
+ public void logStats() {
+ for (Input input : inputList) {
+ input.logStat();
+ }
+
+ filesCountMetric.count = getActiveFilesCount();
+ LogFeederUtil.logStatForMetric(filesCountMetric,
+ "Stat: Files Monitored Count", null);
+ }
+
+ public void close() {
+ for (Input input : inputList) {
+ try {
+ input.setDrain(true);
+ } catch (Throwable t) {
+ logger.error(
+ "Error while draining. input="
+ + input.getShortDescription(), t);
+ }
+ }
+ isDrain = true;
+
+ // Need to get this value from property
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ int i = 0;
+ boolean allClosed = true;
+ for (i = 0; i < iterations; i++) {
+ allClosed = true;
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ try {
+ allClosed = false;
+ logger.warn("Waiting for input to close. "
+ + input.getShortDescription() + ", "
+ + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ break;
+ }
+ }
+ if (!allClosed) {
+ logger.warn("Some inputs were not closed. Iterations=" + i);
+ for (Input input : inputList) {
+ if (!input.isClosed()) {
+ logger.warn("Input not closed. Will ignore it."
+ + input.getShortDescription());
+ }
+ }
+ } else {
+ logger.info("All inputs are closed. Iterations=" + i);
+ }
+
+ }
+
+ public void checkInAll() {
+ for (Input input : inputList) {
+ input.checkIn();
+ }
+ }
+
+ public void cleanCheckPointFiles() {
+
+ if (checkPointFolderFile == null) {
+ logger.info("Will not clean checkPoint files. checkPointFolderFile="
+ + checkPointFolderFile);
+ return;
+ }
+ logger.info("Cleaning checkPoint files. checkPointFolderFile="
+ + checkPointFolderFile.getAbsolutePath());
+ try {
+ // Loop over the check point files and if filePath is not present, then move to closed
+ String searchPath = "*" + checkPointExtension;
+ FileFilter fileFilter = new WildcardFileFilter(searchPath);
+ File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
+ int totalCheckFilesDeleted = 0;
+ for (File checkPointFile : checkPointFiles) {
+ RandomAccessFile checkPointReader = null;
+ try {
+ checkPointReader = new RandomAccessFile(checkPointFile, "r");
+
+ int contentSize = checkPointReader.readInt();
+ byte b[] = new byte[contentSize];
+ int readSize = checkPointReader.read(b, 0, contentSize);
+ if (readSize != contentSize) {
+ logger.error("Couldn't read expected number of bytes from checkpoint file. expected="
+ + contentSize
+ + ", read="
+ + readSize
+ + ", checkPointFile=" + checkPointFile);
+ } else {
+ // Create JSON string
+ String jsonCheckPointStr = new String(b, 0, readSize);
+ Map<String, Object> jsonCheckPoint = LogFeederUtil
+ .toJSONObject(jsonCheckPointStr);
+
+ String logFilePath = (String) jsonCheckPoint
+ .get("file_path");
+ String logFileKey = (String) jsonCheckPoint
+ .get("file_key");
+ if (logFilePath != null && logFileKey != null) {
+ boolean deleteCheckPointFile = false;
+ File logFile = new File(logFilePath);
+ if (logFile.exists()) {
+ Object fileKeyObj = InputFile
+ .getFileKey(logFile);
+ String fileBase64 = Base64
+ .byteArrayToBase64(fileKeyObj
+ .toString().getBytes());
+ if (!logFileKey.equals(fileBase64)) {
+ deleteCheckPointFile = true;
+ logger.info("CheckPoint clean: File key has changed. old="
+ + logFileKey
+ + ", new="
+ + fileBase64
+ + ", filePath="
+ + logFilePath
+ + ", checkPointFile="
+ + checkPointFile.getAbsolutePath());
+ }
+ } else {
+ logger.info("CheckPoint clean: Log file doesn't exist. filePath="
+ + logFilePath
+ + ", checkPointFile="
+ + checkPointFile.getAbsolutePath());
+ deleteCheckPointFile = true;
+ }
+ if (deleteCheckPointFile) {
+ logger.info("Deleting CheckPoint file="
+ + checkPointFile.getAbsolutePath()
+ + ", logFile=" + logFilePath);
+ checkPointFile.delete();
+ totalCheckFilesDeleted++;
+ }
+ }
+ }
+ } catch (EOFException eof) {
+ logger.warn("Caught EOFException. Ignoring reading existing checkPoint file. "
+ + checkPointFile);
+ } catch (Throwable t) {
+ logger.error("Error while checking checkPoint file. "
+ + checkPointFile, t);
+ } finally {
+ if (checkPointReader != null) {
+ try {
+ checkPointReader.close();
+ } catch (Throwable t) {
+ logger.error("Error closing checkPoint file. "
+ + checkPointFile, t);
+ }
+ }
+ }
+ }
+ logger.info("Deleted " + totalCheckFilesDeleted
+ + " checkPoint file(s). checkPointFolderFile="
+ + checkPointFolderFile.getAbsolutePath());
+
+ } catch (Throwable t) {
+ logger.error("Error while cleaning checkPointFiles", t);
+ }
+ }
+
+ public void waitOnAllInputs() {
+ //wait on inputs
+ if (inputList != null) {
+ for (Input input : inputList) {
+ if (input != null) {
+ Thread inputThread = input.getThread();
+ if (inputThread != null) {
+ try {
+ inputThread.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+ // wait on monitor
+ if (inputIsReadyMonitor != null) {
+ try {
+ this.close();
+ inputIsReadyMonitor.join();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index 12a512f..c9d28bd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -28,8 +28,8 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
index 48ad7ac..5ba56a5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputSimulate.java
@@ -27,9 +27,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.filter.FilterJSON;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.solr.common.util.Base64;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index 872460b..ae0cfc0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -26,7 +26,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.TimeZone;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.SolrUtil;
import org.apache.ambari.logfeeder.view.VLogfeederFilter;
import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
index 128c5c4..bc807193 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
@@ -22,7 +22,7 @@ package org.apache.ambari.logfeeder.logconfig;
import java.util.ArrayList;
import java.util.List;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public enum LogfeederScheduler {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
index 8691a19..b5e4eb3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
@@ -22,9 +22,9 @@ package org.apache.ambari.logfeeder.logconfig.filter;
import java.util.List;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.view.VLogfeederFilter;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
index bf33f93..3a8eae9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
@@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.logconfig.filter;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
/**
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 45ccc70..9aa0b23 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -23,7 +23,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index e1f8f97..c692a9d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 7e530f5..e618261 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -21,7 +21,7 @@ package org.apache.ambari.logfeeder.mapper;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
new file mode 100644
index 0000000..c99a091
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
+ private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+
+ private String collectorHosts = null;
+
+ public LogFeederAMSClient() {
+ collectorHosts = LogFeederUtil
+ .getStringProperty("logfeeder.metrics.collector.hosts");
+ if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+ collectorHosts = null;
+ }
+ if (collectorHosts != null) {
+ collectorHosts = collectorHosts.trim();
+ }
+ logger.info("AMS collector URL=" + collectorHosts);
+ }
+
+ @Override
+ public String getCollectorUri() {
+ return collectorHosts;
+ }
+
+ @Override
+ protected int getTimeoutSeconds() {
+ // TODO: Hard coded timeout
+ return 10;
+ }
+
+ @Override
+ protected boolean emitMetrics(TimelineMetrics metrics) {
+ return super.emitMetrics(metrics);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
new file mode 100644
index 0000000..abb84c7
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+public class MetricCount {
+ public String metricsName = null;
+ public boolean isPointInTime = false;
+
+ public long count = 0;
+ public long prevLogCount = 0;
+ public long prevLogMS = System.currentTimeMillis();
+ public long prevPublishCount = 0;
+ public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
new file mode 100644
index 0000000..eff9d0d
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+public class MetricsMgr {
+ private static final Logger logger = Logger.getLogger(MetricsMgr.class);
+
+ private boolean isMetricsEnabled = false;
+ private String nodeHostName = null;
+ private String appId = "logfeeder";
+
+ private long lastPublishTimeMS = 0; // Let's do the first publish immediately
+ private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
+
+ private int publishIntervalMS = 60 * 1000;
+ private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
+ // the metrics in memory forever
+ private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+ private LogFeederAMSClient amsClient = null;
+
+ public void init() {
+ logger.info("Initializing MetricsMgr()");
+ amsClient = new LogFeederAMSClient();
+
+ if (amsClient.getCollectorUri() != null) {
+ nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+ if (nodeHostName == null) {
+ try {
+ nodeHostName = InetAddress.getLocalHost().getHostName();
+ } catch (Throwable e) {
+ logger.warn(
+ "Error getting hostname using InetAddress.getLocalHost().getHostName()",
+ e);
+ }
+ if (nodeHostName == null) {
+ try {
+ nodeHostName = InetAddress.getLocalHost()
+ .getCanonicalHostName();
+ } catch (Throwable e) {
+ logger.warn(
+ "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
+ e);
+ }
+ }
+ }
+ if (nodeHostName == null) {
+ isMetricsEnabled = false;
+ logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+ } else {
+ isMetricsEnabled = true;
+ logger.info("LogFeeder Metrics is enabled. Metrics host="
+ + amsClient.getCollectorUri());
+ }
+ } else {
+ logger.info("LogFeeder Metrics publish is disabled");
+ }
+ }
+
+ public boolean isMetricsEnabled() {
+ return isMetricsEnabled;
+ }
+
+ synchronized public void useMetrics(List<MetricCount> metricsList) {
+ if (!isMetricsEnabled) {
+ return;
+ }
+ logger.info("useMetrics() metrics.size=" + metricsList.size());
+ long currMS = System.currentTimeMillis();
+ Long currMSLong = new Long(currMS);
+ for (MetricCount metric : metricsList) {
+ if (metric.metricsName == null) {
+ logger.debug("metric.metricsName is null");
+ // Metrics is not meant to be published
+ continue;
+ }
+ long currCount = metric.count;
+ if (!metric.isPointInTime && metric.publishCount > 0
+ && currCount <= metric.prevPublishCount) {
+ // No new data added, so let's ignore it
+ logger.debug("Nothing changed. " + metric.metricsName
+ + ", currCount=" + currCount + ", prevPublishCount="
+ + metric.prevPublishCount);
+ continue;
+ }
+ metric.publishCount++;
+
+ TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+ if (timelineMetric == null) {
+ logger.debug("Creating new metric obbject for "
+ + metric.metricsName);
+ // First time for this metric
+ timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metric.metricsName);
+ timelineMetric.setHostName(nodeHostName);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setStartTime(currMS);
+ timelineMetric.setType("Long");
+ timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+
+ metricsMap.put(metric.metricsName, timelineMetric);
+ }
+ logger.debug("Adding metrics=" + metric.metricsName);
+ if (metric.isPointInTime) {
+ timelineMetric.getMetricValues().put(currMSLong,
+ new Double(currCount));
+ } else {
+ Double value = timelineMetric.getMetricValues().get(currMSLong);
+ if (value == null) {
+ value = new Double(0);
+ }
+ value += (currCount - metric.prevPublishCount);
+ timelineMetric.getMetricValues().put(currMSLong, value);
+ metric.prevPublishCount = currCount;
+ }
+ }
+
+ if (metricsMap.size() > 0
+ && currMS - lastPublishTimeMS > publishIntervalMS) {
+ try {
+ // Time to publish
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
+ timeLineMetricList.addAll(metricsMap.values());
+ timelineMetrics.setMetrics(timeLineMetricList);
+ amsClient.emitMetrics(timelineMetrics);
+ logger.info("Published " + timeLineMetricList.size()
+ + " metrics to AMS");
+ metricsMap.clear();
+ timeLineMetricList.clear();
+ lastPublishTimeMS = currMS;
+ } catch (Throwable t) {
+ logger.warn("Error sending metrics to AMS.", t);
+ if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+ logger.error("AMS was not sent for last "
+ + maxMetricsBuffer
+ / 1000
+ + " seconds. Purging it and will start rebuilding it again");
+ metricsMap.clear();
+ lastFailedPublishTimeMS = currMS;
+ }
+ }
+ } else {
+ logger.info("Not publishing metrics. metrics.size()="
+ + metricsMap.size() + ", lastPublished="
+ + (currMS - lastPublishTimeMS) / 1000
+ + " seconds ago, intervalConfigured=" + publishIntervalMS
+ / 1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index a4e0eda..6f84251 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -24,10 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.ambari.logfeeder.ConfigBlock;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.MetricCount;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public abstract class Output extends ConfigBlock {
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index aef8dc5..18a5a54 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -26,8 +26,8 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index f711a5f..a360215 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -19,12 +19,12 @@
package org.apache.ambari.logfeeder.output;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.LogfeederHDFSUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
import org.apache.commons.lang3.StringUtils;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index a7f2321..2595d87 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -25,8 +25,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
new file mode 100644
index 0000000..0a6b7fa
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
+import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
+import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputMgr {
+ private static final Logger logger = Logger.getLogger(OutputMgr.class);
+
+ private Collection<Output> outputList = new ArrayList<Output>();
+
+ private boolean addMessageMD5 = true;
+
+ private int MAX_OUTPUT_SIZE = 32765; // 32766-1
+ private static long doc_counter = 0;
+ private MetricCount messageTruncateMetric = new MetricCount();
+
+
+ public Collection<Output> getOutputList() {
+ return outputList;
+ }
+
+ public void setOutputList(Collection<Output> outputList) {
+ this.outputList = outputList;
+ }
+
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+
+ // Update the block with the context fields
+ for (Map.Entry<String, String> entry : input.getContextFields()
+ .entrySet()) {
+ if (jsonObj.get(entry.getKey()) == null) {
+ jsonObj.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // TODO: Ideally most of the overrides should be configurable
+
+ // Add the input type
+ if (jsonObj.get("type") == null) {
+ jsonObj.put("type", input.getStringValue("type"));
+ }
+ if (jsonObj.get("path") == null && input.getFilePath() != null) {
+ jsonObj.put("path", input.getFilePath());
+ }
+ if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+ jsonObj.put("path", input.getStringValue("path"));
+ }
+
+ // Add host if required
+ if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+ jsonObj.put("host", LogFeederUtil.hostName);
+ }
+ // Add IP if required
+ if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+ jsonObj.put("ip", LogFeederUtil.ipAddress);
+ }
+
+ //Add level
+ if (jsonObj.get("level") == null) {
+ jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+ }
+ if (input.isUseEventMD5() || input.isGenEventMD5()) {
+ String prefix = "";
+ Object logtimeObj = jsonObj.get("logtime");
+ if (logtimeObj != null) {
+ if (logtimeObj instanceof Date) {
+ prefix = "" + ((Date) logtimeObj).getTime();
+ } else {
+ prefix = logtimeObj.toString();
+ }
+ }
+ Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
+ .toJson(jsonObj));
+ if (input.isGenEventMD5()) {
+ jsonObj.put("event_md5", prefix + eventMD5.toString());
+ }
+ if (input.isUseEventMD5()) {
+ jsonObj.put("id", prefix + eventMD5.toString());
+ }
+ }
+
+ // jsonObj.put("@timestamp", new Date());
+ jsonObj.put("seq_num", new Long(doc_counter++));
+ if (jsonObj.get("id") == null) {
+ jsonObj.put("id", UUID.randomUUID().toString());
+ }
+ if (jsonObj.get("event_count") == null) {
+ jsonObj.put("event_count", new Integer(1));
+ }
+ if (inputMarker.lineNumber > 0) {
+ jsonObj.put("logfile_line_number", new Integer(
+ inputMarker.lineNumber));
+ }
+ if (jsonObj.containsKey("log_message")) {
+ // TODO: Let's check size only for log_message for now
+ String logMessage = (String) jsonObj.get("log_message");
+ if (logMessage != null
+ && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+ messageTruncateMetric.count++;
+ final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
+ + "_MESSAGESIZE";
+ LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
+ "Message is too big. size="
+ + logMessage.getBytes().length + ", input="
+ + input.getShortDescription()
+ + ". Truncating to " + MAX_OUTPUT_SIZE
+ + ", first upto 100 characters="
+ + LogFeederUtil.subString(logMessage, 100),
+ null, logger, Level.WARN);
+ logMessage = new String(logMessage.getBytes(), 0,
+ MAX_OUTPUT_SIZE);
+ jsonObj.put("log_message", logMessage);
+ // Add error tags
+ @SuppressWarnings("unchecked")
+ List<String> tagsList = (List<String>) jsonObj.get("tags");
+ if (tagsList == null) {
+ tagsList = new ArrayList<String>();
+ jsonObj.put("tags", tagsList);
+ }
+ tagsList.add("error_message_truncated");
+
+ }
+ if (addMessageMD5) {
+ jsonObj.put("message_md5",
+ "" + LogFeederUtil.genHash(logMessage));
+ }
+ }
+ //check log is allowed to send output
+ if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+ for (Output output : input.getOutputList()) {
+ try {
+ output.write(jsonObj, inputMarker);
+ } catch (Exception e) {
+ logger.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ public void write(String jsonBlock, InputMarker inputMarker) {
+ //check log is allowed to send output
+ if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+ for (Output output : inputMarker.input.getOutputList()) {
+ try {
+ output.write(jsonBlock, inputMarker);
+ } catch (Exception e) {
+ logger.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ public void close() {
+ logger.info("Close called for outputs ...");
+ for (Output output : outputList) {
+ try {
+ output.setDrain(true);
+ output.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+ // Need to get this value from property
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ int i;
+ boolean allClosed = true;
+ for (i = 0; i < iterations; i++) {
+ allClosed = true;
+ for (Output output : outputList) {
+ if (!output.isClosed()) {
+ try {
+ allClosed = false;
+ logger.warn("Waiting for output to close. "
+ + output.getShortDescription() + ", "
+ + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ break;
+ }
+ }
+
+ if (!allClosed) {
+ logger.warn("Some outpus were not closed. Iterations=" + i);
+ for (Output output : outputList) {
+ if (!output.isClosed()) {
+ logger.warn("Output not closed. Will ignore it."
+ + output.getShortDescription() + ", pendingCound="
+ + output.getPendingCount());
+ }
+ }
+ } else {
+ logger.info("All outputs are closed. Iterations=" + i);
+ }
+ }
+
+ public void logStats() {
+ for (Output output : outputList) {
+ output.logStat();
+ }
+ LogFeederUtil.logStatForMetric(messageTruncateMetric,
+ "Stat: Messages Truncated", null);
+ }
+
+ public void addMetricsContainers(List<MetricCount> metricsList) {
+ metricsList.add(messageTruncateMetric);
+ for (Output output : outputList) {
+ output.addMetricsContainers(metricsList);
+ }
+ }
+
+
+ public void copyFile(File inputFile, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+ for (Output output : input.getOutputList()) {
+ try {
+ output.copyFile(inputFile, inputMarker);
+ }catch (Exception e) {
+ logger.error("Error coyping file . to " + output.getShortDescription(),
+ e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index cbc1045..e95f8df 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -22,14 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
import org.apache.ambari.logfeeder.output.spool.LogSpoolerContext;
import org.apache.ambari.logfeeder.output.spool.RolloverCondition;
import org.apache.ambari.logfeeder.output.spool.RolloverHandler;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.log4j.Logger;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index b4dac72..cd9ce4d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -33,9 +33,9 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
index 1bbf33e..58282e0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -18,9 +18,9 @@
package org.apache.ambari.logfeeder.output;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index fb597d3..485b0d4 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -18,11 +18,11 @@
package org.apache.ambari.logfeeder.output;
-import org.apache.ambari.logfeeder.ConfigBlock;
-
import java.util.HashMap;
import java.util.Map;
+import org.apache.ambari.logfeeder.common.ConfigBlock;
+
/**
* Holds all configuration relevant for S3 upload.
*/
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index dec685f..fd59c51 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -19,9 +19,9 @@
package org.apache.ambari.logfeeder.output;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.s3.S3Util;
+
import org.apache.ambari.logfeeder.util.CompressionUtil;
+import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.log4j.Logger;
import java.io.File;
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
deleted file mode 100644
index d0fbb6c..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import org.apache.log4j.Logger;
-
-import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
-
-public enum AWSUtil {
- INSTANCE;
- private static final Logger LOG = Logger.getLogger(AWSUtil.class);
-
- public String getAwsUserName(String accessKey, String secretKey) {
- String username = null;
- AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
- AmazonIdentityManagementClient amazonIdentityManagementClient;
- if (awsCredentials != null) {
- amazonIdentityManagementClient = new AmazonIdentityManagementClient(
- awsCredentials);
- } else {
- // create default client
- amazonIdentityManagementClient = new AmazonIdentityManagementClient();
- }
- try {
- username = amazonIdentityManagementClient.getUser().getUser()
- .getUserName();
- } catch (AmazonServiceException e) {
- if (e.getErrorCode().compareTo("AccessDenied") == 0) {
- String arn = null;
- String msg = e.getMessage();
- int arnIdx = msg.indexOf("arn:aws");
- if (arnIdx != -1) {
- int arnSpace = msg.indexOf(" ", arnIdx);
- // should be similar to "arn:aws:iam::111111111111:user/username"
- arn = msg.substring(arnIdx, arnSpace);
- }
- if (arn != null) {
- String[] arnParts = arn.split(":");
- if (arnParts != null && arnParts.length > 5) {
- username = arnParts[5];
- if (username != null) {
- username = username.replace("user/", "");
- }
- }
- }
- }
- } catch (Exception exception) {
- LOG.error(
- "Error in getting username :" + exception.getLocalizedMessage(),
- exception.getCause());
- }
- return username;
- }
-
- public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
- if (accessKey != null && secretKey != null) {
- LOG.debug("Creating aws client as per new accesskey and secretkey");
- AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
- secretKey);
- return awsCredentials;
- } else {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
deleted file mode 100644
index db187be..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.s3;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.zip.GZIPInputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
-
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.AmazonS3Client;
-import com.amazonaws.services.s3.model.GetObjectRequest;
-import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
-
-/**
- * Utility to connect to s3
- */
-public class S3Util {
- public static final S3Util INSTANCE = new S3Util();
-
- private static final Logger LOG = Logger.getLogger(S3Util.class);
-
- public static final String S3_PATH_START_WITH = "s3://";
- public static final String S3_PATH_SEPARATOR = "/";
-
- public AmazonS3 getS3Client(String accessKey, String secretKey) {
- AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
- accessKey, secretKey);
- AmazonS3 s3client;
- if (awsCredentials != null) {
- s3client = new AmazonS3Client(awsCredentials);
- } else {
- s3client = new AmazonS3Client();
- }
- return s3client;
- }
-
- public TransferManager getTransferManager(String accessKey, String secretKey) {
- AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
- accessKey, secretKey);
- TransferManager transferManager;
- if (awsCredentials != null) {
- transferManager = new TransferManager(awsCredentials);
- } else {
- transferManager = new TransferManager();
- }
- return transferManager;
- }
-
- public void shutdownTransferManager(TransferManager transferManager) {
- if (transferManager != null) {
- transferManager.shutdownNow();
- }
- }
-
- public String getBucketName(String s3Path) {
- String bucketName = null;
- // s3path
- if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
- S3_PATH_SEPARATOR);
- bucketName = s3PathParts[0];
- }
- return bucketName;
- }
-
- public String getS3Key(String s3Path) {
- StringBuilder s3Key = new StringBuilder();
- // s3path
- if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
- S3_PATH_SEPARATOR);
- ArrayList<String> s3PathList = new ArrayList<String>(
- Arrays.asList(s3PathParts));
- s3PathList.remove(0);// remove bucketName
- for (int index = 0; index < s3PathList.size(); index++) {
- if (index > 0) {
- s3Key.append(S3_PATH_SEPARATOR);
- }
- s3Key.append(s3PathList.get(index));
- }
- }
- return s3Key.toString();
- }
-
- public void uploadFileTos3(String bucketName, String s3Key, File localFile,
- String accessKey, String secretKey) {
- TransferManager transferManager = getTransferManager(accessKey, secretKey);
- try {
- Upload upload = transferManager.upload(bucketName, s3Key, localFile);
- upload.waitForUploadResult();
- } catch (AmazonClientException | InterruptedException e) {
- LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
- e);
- } finally {
- shutdownTransferManager(transferManager);
- }
- }
-
- /**
- * Get the buffer reader to read s3 file as a stream
- */
- public BufferedReader getReader(String s3Path, String accessKey,
- String secretKey) throws IOException {
- // TODO error handling
- // Compression support
- // read header and decide the compression(auto detection)
- // For now hard-code GZIP compression
- String s3Bucket = getBucketName(s3Path);
- String s3Key = getS3Key(s3Path);
- S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
- new GetObjectRequest(s3Bucket, s3Key));
- GZIPInputStream objectInputStream;
- try {
- objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
- objectInputStream));
- return bufferedReader;
- } catch (IOException e) {
- LOG.error("Error in creating stream reader for s3 file :" + s3Path,
- e.getCause());
- throw e;
- }
- }
-
- public void writeIntoS3File(String data, String bucketName, String s3Key,
- String accessKey, String secretKey) {
- InputStream in = null;
- try {
- in = IOUtils.toInputStream(data, "UTF-8");
- } catch (IOException e) {
- LOG.error(e);
- }
- if (in != null) {
- TransferManager transferManager = getTransferManager(accessKey, secretKey);
- try {
- if (transferManager != null) {
- transferManager.upload(
- new PutObjectRequest(bucketName, s3Key, in,
- new ObjectMetadata())).waitForUploadResult();
- LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
- + bucketName);
- }
- } catch (AmazonClientException | InterruptedException e) {
- LOG.error(e);
- } finally {
- try {
- shutdownTransferManager(transferManager);
- in.close();
- } catch (IOException e) {
- // ignore
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
new file mode 100644
index 0000000..15f7594
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import org.apache.log4j.Logger;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
+
+public enum AWSUtil {
+ INSTANCE;
+ private static final Logger LOG = Logger.getLogger(AWSUtil.class);
+
+ public String getAwsUserName(String accessKey, String secretKey) {
+ String username = null;
+ AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
+ AmazonIdentityManagementClient amazonIdentityManagementClient;
+ if (awsCredentials != null) {
+ amazonIdentityManagementClient = new AmazonIdentityManagementClient(
+ awsCredentials);
+ } else {
+ // create default client
+ amazonIdentityManagementClient = new AmazonIdentityManagementClient();
+ }
+ try {
+ username = amazonIdentityManagementClient.getUser().getUser()
+ .getUserName();
+ } catch (AmazonServiceException e) {
+ if (e.getErrorCode().compareTo("AccessDenied") == 0) {
+ String arn = null;
+ String msg = e.getMessage();
+ int arnIdx = msg.indexOf("arn:aws");
+ if (arnIdx != -1) {
+ int arnSpace = msg.indexOf(" ", arnIdx);
+ // should be similar to "arn:aws:iam::111111111111:user/username"
+ arn = msg.substring(arnIdx, arnSpace);
+ }
+ if (arn != null) {
+ String[] arnParts = arn.split(":");
+ if (arnParts != null && arnParts.length > 5) {
+ username = arnParts[5];
+ if (username != null) {
+ username = username.replace("user/", "");
+ }
+ }
+ }
+ }
+ } catch (Exception exception) {
+ LOG.error(
+ "Error in getting username :" + exception.getLocalizedMessage(),
+ exception.getCause());
+ }
+ return username;
+ }
+
+ public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+ if (accessKey != null && secretKey != null) {
+ LOG.debug("Creating aws client as per new accesskey and secretkey");
+ AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
+ secretKey);
+ return awsCredentials;
+ } else {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/6a5eda95/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
new file mode 100644
index 0000000..a92ba29
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.util;
+
+import java.io.File;
+import java.util.HashMap;
+
+import org.apache.log4j.Logger;
+
+public class AliasUtil {
+
+ private static Logger logger = Logger.getLogger(AliasUtil.class);
+
+ private static AliasUtil instance = null;
+
+ private static String aliasConfigJson = "alias_config.json";
+
+ private HashMap<String, Object> aliasMap = null;
+
+ public static enum ALIAS_TYPE {
+ INPUT, FILTER, MAPPER, OUTPUT
+ }
+
+ public static enum ALIAS_PARAM {
+ KLASS
+ }
+
+ private AliasUtil() {
+ init();
+ }
+
+ public static AliasUtil getInstance() {
+ if (instance == null) {
+ synchronized (AliasUtil.class) {
+ if (instance == null) {
+ instance = new AliasUtil();
+ }
+ }
+ }
+ return instance;
+ }
+
+ /**
+ */
+ private void init() {
+ File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson);
+ if (jsonFile != null) {
+ this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile);
+ }
+
+ }
+
+
+ public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
+ String result = key;// key as a default value;
+ HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
+ String value = aliasInfo.get(aliasParam.name().toLowerCase());
+ if (value != null && !value.isEmpty()) {
+ result = value;
+ logger.debug("Alias found for key :" + key + ", param :" + aliasParam.name().toLowerCase() + ", value :"
+ + value + " aliastype:" + aliastype.name());
+ } else {
+ logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase());
+ }
+ return result;
+ }
+
+ @SuppressWarnings("unchecked")
+ private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) {
+ HashMap<String, String> aliasInfo = null;
+ if (aliasMap != null) {
+ String typeKey = aliastype.name().toLowerCase();
+ HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
+ if (typeJson != null) {
+ aliasInfo = (HashMap<String, String>) typeJson.get(key);
+ }
+ }
+ if (aliasInfo == null) {
+ aliasInfo = new HashMap<String, String>();
+ }
+ return aliasInfo;
+ }
+}