You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/06/27 10:31:45 UTC
[2/2] ambari git commit: AMBARI-17347. Remove superfluous Logfeeder
codes (Miklos Gergely via oleewere)
AMBARI-17347. Remove superfluous Logfeeder codes (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4d948256
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4d948256
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4d948256
Branch: refs/heads/trunk
Commit: 4d94825658075a9db64f62fc821eca05fe968b0a
Parents: 3783caf
Author: Miklos Gergely <mg...@hortonworks.com>
Authored: Mon Jun 27 12:18:41 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Mon Jun 27 12:18:41 2016 +0200
----------------------------------------------------------------------
.../org/apache/ambari/logfeeder/AliasUtil.java | 11 ----
.../apache/ambari/logfeeder/ConfigBlock.java | 15 ------
.../org/apache/ambari/logfeeder/InputMgr.java | 25 +--------
.../org/apache/ambari/logfeeder/LogFeeder.java | 28 -----------
.../ambari/logfeeder/LogFeederAMSClient.java | 15 ------
.../apache/ambari/logfeeder/LogFeederUtil.java | 24 ---------
.../apache/ambari/logfeeder/MetricCount.java | 4 +-
.../org/apache/ambari/logfeeder/MetricsMgr.java | 10 +---
.../org/apache/ambari/logfeeder/OutputMgr.java | 14 ------
.../apache/ambari/logfeeder/filter/Filter.java | 16 ------
.../ambari/logfeeder/filter/FilterGrok.java | 38 +-------------
.../ambari/logfeeder/filter/FilterKeyValue.java | 11 ----
.../ambari/logfeeder/filter/JSONFilterCode.java | 7 +--
.../apache/ambari/logfeeder/input/Input.java | 24 ---------
.../ambari/logfeeder/input/InputFile.java | 47 ++---------------
.../ambari/logfeeder/input/InputMarker.java | 3 --
.../ambari/logfeeder/input/InputS3File.java | 36 ++-----------
.../logfeeder/input/reader/GZIPReader.java | 3 --
.../input/reader/LogsearchReaderFactory.java | 5 --
.../logconfig/FetchConfigFromSolr.java | 2 -
.../logfeeder/logconfig/LogFeederConstants.java | 6 ---
.../logconfig/filter/ApplyLogFilter.java | 3 --
.../logconfig/filter/DefaultDataFilter.java | 4 --
.../apache/ambari/logfeeder/mapper/Mapper.java | 5 --
.../ambari/logfeeder/mapper/MapperDate.java | 2 -
.../logfeeder/mapper/MapperFieldName.java | 3 --
.../logfeeder/mapper/MapperFieldValue.java | 1 -
.../apache/ambari/logfeeder/output/Output.java | 16 ------
.../ambari/logfeeder/output/OutputData.java | 4 --
.../ambari/logfeeder/output/OutputDevNull.java | 2 -
.../ambari/logfeeder/output/OutputFile.java | 12 -----
.../ambari/logfeeder/output/OutputKafka.java | 15 +-----
.../ambari/logfeeder/output/OutputS3File.java | 23 +--------
.../ambari/logfeeder/output/OutputSolr.java | 9 +---
.../org/apache/ambari/logfeeder/s3/AWSUtil.java | 8 ---
.../org/apache/ambari/logfeeder/s3/S3Util.java | 53 ++------------------
.../ambari/logfeeder/util/CompressionUtil.java | 8 ---
.../ambari/logfeeder/util/PlaceholderUtil.java | 18 -------
.../apache/ambari/logfeeder/util/SolrUtil.java | 17 -------
.../apache/ambari/logfeeder/s3/AWSUtilTest.java | 5 +-
.../apache/ambari/logfeeder/s3/S3UtilTest.java | 6 ---
.../logfeeder/util/PlaceholderUtilTest.java | 4 --
42 files changed, 23 insertions(+), 539 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
index 3836c76..c3b36df 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/AliasUtil.java
@@ -41,16 +41,10 @@ public class AliasUtil {
KLASS
}
- /**
- *
- */
private AliasUtil() {
init();
}
- /**
- * @return
- */
public static AliasUtil getInstance() {
if (instance == null) {
synchronized (AliasUtil.class) {
@@ -73,11 +67,6 @@ public class AliasUtil {
}
- /**
- * @param key
- * @param aliastype
- * @return
- */
public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
String result = key;// key as a default value;
HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
index 521319e..088472e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/ConfigBlock.java
@@ -45,15 +45,11 @@ public abstract class ConfigBlock {
/**
* Used while logging. Keep it short and meaningful
- *
- * @return
*/
public abstract String getShortDescription();
/**
* Every implementor need to give name to the thread they create
- *
- * @return
*/
public String getNameForThread() {
return this.getClass().getSimpleName();
@@ -68,20 +64,13 @@ public abstract class ConfigBlock {
/**
* This method needs to be overwritten by deriving classes.
- *
- * @throws Exception
*/
public void init() throws Exception {
-
}
- /**
- * @param map
- */
public void loadConfig(Map<String, Object> map) {
configs = LogFeederUtil.cloneObject(map);
- // Extract fields from config block
Map<String, String> nvList = getNVList("add_fields");
if (nvList != null) {
contextFields.putAll(nvList);
@@ -156,10 +145,6 @@ public abstract class ConfigBlock {
return allow;
}
- /**
- * @param string
- * @return
- */
@SuppressWarnings("unchecked")
public Map<String, String> getNVList(String key) {
return (Map<String, String>) configs.get(key);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
index 4359c78..c64a007 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/InputMgr.java
@@ -76,9 +76,6 @@ public class InputMgr {
inputList.add(input);
}
- /**
- * @param input
- */
public void removeInput(Input input) {
logger.info("Trying to remove from inputList. "
+ input.getShortDescription());
@@ -93,9 +90,6 @@ public class InputMgr {
}
}
- /**
- * @return
- */
public int getActiveFilesCount() {
int count = 0;
for (Input input : inputList) {
@@ -273,9 +267,6 @@ public class InputMgr {
metricsList.add(filesCountMetric);
}
- /**
- *
- */
public void logStats() {
for (Input input : inputList) {
input.logStat();
@@ -352,9 +343,7 @@ public class InputMgr {
logger.info("Cleaning checkPoint files. checkPointFolderFile="
+ checkPointFolderFile.getAbsolutePath());
try {
- // Loop over the check point files and if filePath is not present,
- // then
- // move to closed
+ // Loop over the check point files and if filePath is not present, then move to closed
String searchPath = "*" + checkPointExtension;
FileFilter fileFilter = new WildcardFileFilter(searchPath);
File[] checkPointFiles = checkPointFolderFile.listFiles(fileFilter);
@@ -473,9 +462,7 @@ public class InputMgr {
} else {
logger.info("Configuring to monitor folder "
+ folderToWatch + " for file " + fileToMonitor);
- // get the directory we want to watch, using the Paths
- // singleton
- // class
+ // get the directory we want to watch, using the Paths singleton class
Path toWatch = Paths.get(folderToWatch.getAbsolutePath());
if (toWatch == null) {
throw new UnsupportedOperationException(
@@ -498,11 +485,6 @@ public class InputMgr {
}
class FileSystemMonitor implements Runnable {
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
@Override
public void run() {
try {
@@ -544,9 +526,6 @@ public class InputMgr {
}
- /**
- *
- */
public void waitOnAllInputs() {
//wait on inputs
if (inputList != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
index d00ed67..c5d4fd5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeeder.java
@@ -51,7 +51,6 @@ import com.google.gson.reflect.TypeToken;
public class LogFeeder {
static Logger logger = Logger.getLogger(LogFeeder.class);
- // List<Input> inputList = new ArrayList<Input>();
Collection<Output> outputList = new ArrayList<Output>();
OutputMgr outMgr = new OutputMgr();
@@ -79,11 +78,8 @@ public class LogFeeder {
public void init() throws Throwable {
- // Load properties
LogFeederUtil.loadProperties("logfeeder.properties", inputParams);
- // loop the properties and load them
- // Load the configs
String configFiles = LogFeederUtil.getStringProperty("logfeeder.config.files");
logger.info("logfeeder.config.files=" + configFiles);
@@ -129,7 +125,6 @@ public class LogFeeder {
}
inputMgr.init();
metricsMgr.init();
- //starting timer to fetch config from solr
logger.debug("==============");
}
@@ -149,10 +144,6 @@ public class LogFeeder {
/**
* This method loads the configurations from the given file.
- *
- * @param configFile
- * @return
- * @throws Exception
*/
void loadConfigsUsingFile(File configFile) throws Exception {
FileInputStream fileInputStream = null;
@@ -205,13 +196,9 @@ public class LogFeeder {
}
- /**
- *
- */
private void mergeAllConfigs() {
globalMap = mergeConfigs(globalConfigList);
- // Sort the filter blocks
sortBlocks(filterConfigList);
// First loop for output
for (Map<String, Object> map : outputConfigList) {
@@ -365,10 +352,6 @@ public class LogFeeder {
}
}
- /**
- * @param filterConfigList2
- * @return
- */
private void sortBlocks(List<Map<String, Object>> blockList) {
Collections.sort(blockList, new Comparator<Map<String, Object>>() {
@@ -414,9 +397,6 @@ public class LogFeeder {
});
}
- /**
- * @param globalConfigList2
- */
private Map<String, Object> mergeConfigs(
List<Map<String, Object>> configList) {
Map<String, Object> mergedConfig = new HashMap<String, Object>();
@@ -518,11 +498,6 @@ public class LogFeeder {
}
}
- /**
- * @param inFile
- * @return
- * @throws Throwable
- */
public String readFile(BufferedReader br) throws Exception {
try {
StringBuilder sb = new StringBuilder();
@@ -595,15 +570,12 @@ public class LogFeeder {
public void waitOnAllDaemonThreads() {
String foreground = LogFeederUtil.getStringProperty("foreground");
if (foreground != null && foreground.equalsIgnoreCase("true")) {
- // wait on inputmgr daemon threads
inputMgr.waitOnAllInputs();
- // set isLogfeederCompleted to true to stop statLoggerThread
isLogfeederCompleted = true;
if (statLoggerThread != null) {
try {
statLoggerThread.join();
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
index 6f29fde..1b9171b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java
@@ -41,26 +41,11 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
logger.info("AMS collector URL=" + collectorHosts);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
- * getCollectorUri()
- */
@Override
public String getCollectorUri(String host) {
-
return collectorHosts;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink#
- * getTimeoutSeconds()
- */
@Override
protected int getTimeoutSeconds() {
// TODO: Hard coded timeout
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
index 7a68b4d..78d0499 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederUtil.java
@@ -80,7 +80,6 @@ public class LogFeederUtil {
public static final Object _LOCK = new Object();
static{
- //set hostname and hostIp
setHostNameAndIP();
}
@@ -91,10 +90,6 @@ public class LogFeederUtil {
/**
* This method will read the properties from System, followed by propFile
* and finally from the map
- *
- * @param propFile
- * @param propNVList
- * @throws Exception
*/
static public void loadProperties(String propFile, String[] propNVList)
throws Exception {
@@ -151,14 +146,10 @@ public class LogFeederUtil {
logger.fatal("Properties file is not loaded.");
throw new Exception("Properties not loaded");
} else {
- // Let's load properties from argument list
updatePropertiesFromMap(propNVList);
}
}
- /**
- * @param nvList
- */
private static void updatePropertiesFromMap(String[] nvList) {
if (nvList == null) {
return;
@@ -241,10 +232,6 @@ public class LogFeederUtil {
return retValue;
}
- static public boolean isEnabled(Map<String, Object> configs) {
- return isEnabled(configs, configs);
- }
-
static public boolean isEnabled(Map<String, Object> conditionConfigs,
Map<String, Object> valueConfigs) {
boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
@@ -325,18 +312,11 @@ public class LogFeederUtil {
metric.prevLogMS = currMS;
}
- static public void logCountForMetric(MetricCount metric, String prefixStr,
- String postFix) {
- logger.info(prefixStr + ": count=" + metric.count + postFix);
- }
-
public static Map<String, Object> cloneObject(Map<String, Object> map) {
if (map == null) {
return null;
}
String jsonStr = gson.toJson(map);
- // We need to clone it, so we will create a JSON string and convert it
- // back
Type type = new TypeToken<Map<String, Object>>() {
}.getType();
return gson.fromJson(jsonStr, type);
@@ -458,10 +438,6 @@ public class LogFeederUtil {
return instance;
}
- /**
- * @param fileName
- * @return
- */
public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
ObjectMapper mapper = new ObjectMapper();
try {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
index c715881..9bb1564 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricCount.java
@@ -27,7 +27,5 @@ public class MetricCount {
public long prevLogCount = 0;
public long prevLogMS = System.currentTimeMillis();
public long prevPublishCount = 0;
- public long prevPublishMS = 0; // We will try to publish one immediately
- public int publishCount = 0; // Count of published metrics. Used for first
- // time sending metrics
+ public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
index be7594a..4a8f7d0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/MetricsMgr.java
@@ -37,8 +37,7 @@ public class MetricsMgr {
String appId = "logfeeder";
long lastPublishTimeMS = 0; // Let's do the first publish immediately
- long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the
- // clock
+ long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
int publishIntervalMS = 60 * 1000;
int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
@@ -84,16 +83,10 @@ public class MetricsMgr {
}
}
- /**
- * @return
- */
public boolean isMetricsEnabled() {
return isMetricsEnabled;
}
- /**
- * @param metricsList
- */
synchronized public void useMetrics(List<MetricCount> metricsList) {
if (!isMetricsEnabled) {
return;
@@ -145,7 +138,6 @@ public class MetricsMgr {
value += (currCount - metric.prevPublishCount);
timelineMetric.getMetricValues().put(currMSLong, value);
metric.prevPublishCount = currCount;
- metric.prevPublishMS = currMS;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
index f84457e..f6d3481 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/OutputMgr.java
@@ -54,11 +54,6 @@ public class OutputMgr {
this.outputList = outputList;
}
- /**
- * @param jsonObj
- * @param inputStr
- * @param input
- */
public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
Input input = inputMarker.input;
@@ -183,9 +178,6 @@ public class OutputMgr {
}
}
- /**
- * Close all the outputs
- */
public void close() {
logger.info("Close called for outputs ...");
for (Output output : outputList) {
@@ -235,9 +227,6 @@ public class OutputMgr {
}
}
- /**
- *
- */
public void logStats() {
for (Output output : outputList) {
output.logStat();
@@ -246,9 +235,6 @@ public class OutputMgr {
"Stat: Messages Truncated", null);
}
- /**
- * @param metricsList
- */
public void addMetricsContainers(List<MetricCount> metricsList) {
metricsList.add(messageTruncateMetric);
for (Output output : outputList) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
index aa1edea..d34eed6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/Filter.java
@@ -56,12 +56,8 @@ public abstract class Filter extends ConfigBlock {
}
}
- /**
- *
- */
@SuppressWarnings("unchecked")
protected void initializePostMapValues() {
- // Initialize map values
Map<String, Object> postMapValues = (Map<String, Object>) getConfigValue("post_map_values");
if (postMapValues == null) {
return;
@@ -97,10 +93,6 @@ public abstract class Filter extends ConfigBlock {
}
}
- /**
- * @param mapClassCode
- * @return
- */
protected Mapper getMapper(String mapClassCode) {
String classFullName = AliasUtil.getInstance().readAlias(mapClassCode, ALIAS_TYPE.MAPPER, ALIAS_PARAM.KLASS);
if (classFullName != null && !classFullName.isEmpty()) {
@@ -132,9 +124,6 @@ public abstract class Filter extends ConfigBlock {
/**
* Deriving classes should implement this at the minimum
- *
- * @param inputStr
- * @param marker
*/
public void apply(String inputStr, InputMarker inputMarker) {
// TODO: There is no transformation for string types.
@@ -163,9 +152,6 @@ public abstract class Filter extends ConfigBlock {
}
}
- /**
- *
- */
public void close() {
if (nextFilter != null) {
nextFilter.close();
@@ -187,7 +173,6 @@ public abstract class Filter extends ConfigBlock {
@Override
public boolean isFieldConditionMatch(String fieldName, String stringValue) {
if (!super.isFieldConditionMatch(fieldName, stringValue)) {
- // Let's try input
if (input != null) {
return input.isFieldConditionMatch(fieldName, stringValue);
} else {
@@ -199,7 +184,6 @@ public abstract class Filter extends ConfigBlock {
@Override
public String getShortDescription() {
- // TODO Auto-generated method stub
return null;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
index 9b2a717..31e1cd1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterGrok.java
@@ -77,7 +77,6 @@ public class FilterGrok extends Filter {
try {
grokErrorMetric.metricsName = "filter.error.grok";
- // Get the Grok file patterns
messagePattern = escapePattern(getStringValue("message_pattern"));
multilinePattern = escapePattern(getStringValue("multiline_pattern"));
sourceField = getStringValue("source_field");
@@ -94,7 +93,6 @@ public class FilterGrok extends Filter {
extractNamedParams(messagePattern, namedParamList);
grokMessage = new Grok();
- // grokMessage.addPatternFromReader(r);
loadPatterns(grokMessage);
grokMessage.compile(messagePattern);
if (!StringUtils.isEmpty(multilinePattern)) {
@@ -115,20 +113,12 @@ public class FilterGrok extends Filter {
}
- /**
- * @param stringValue
- * @return
- */
private String escapePattern(String inPattern) {
String inStr = inPattern;
if (inStr != null) {
if (inStr.contains("(?m)") && !inStr.contains("(?s)")) {
inStr = inStr.replaceFirst("(?m)", "(?s)");
}
- // inStr = inStr.replaceAll("\\[", "\\\\[");
- // inStr = inStr.replaceAll("\\]", "\\\\]");
- // inStr = inStr.replaceAll("\\(", "\\\\(");
- // inStr = inStr.replaceAll("\\)", "\\\\)");
}
return inStr;
}
@@ -178,11 +168,6 @@ public class FilterGrok extends Filter {
return true;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
- */
@Override
public void apply(String inputStr, InputMarker inputMarker) {
if (grokMessage == null) {
@@ -190,20 +175,13 @@ public class FilterGrok extends Filter {
}
if (grokMultiline != null) {
- // Check if new line
String jsonStr = grokMultiline.capture(inputStr);
if (!"{}".equals(jsonStr)) {
- // New line
if (strBuff != null) {
- savedInputMarker.beginLineNumber = firstInputMarker.lineNumber;
- // Construct JSON object and add only the interested named
- // parameters
Map<String, Object> jsonObj = Collections
.synchronizedMap(new HashMap<String, Object>());
try {
- // Handle message parsing
- applyMessage(strBuff.toString(), jsonObj,
- currMultilineJsonStr);
+ applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
} finally {
strBuff = null;
savedInputMarker = null;
@@ -217,7 +195,6 @@ public class FilterGrok extends Filter {
strBuff = new StringBuilder();
firstInputMarker = inputMarker;
} else {
- // strBuff.append(System.lineSeparator());
strBuff.append('\r');
strBuff.append('\n');
}
@@ -253,7 +230,6 @@ public class FilterGrok extends Filter {
boolean parseError = false;
if ("{}".equals(jsonStr)) {
parseError = true;
- // Error parsing string.
logParseError(inputStr);
if (multilineJsonStr == null) {
@@ -273,7 +249,6 @@ public class FilterGrok extends Filter {
}
}
if (parseError) {
- // Add error tags
@SuppressWarnings("unchecked")
List<String> tagsList = (List<String>) jsonObj.get("tags");
if (tagsList == null) {
@@ -282,8 +257,7 @@ public class FilterGrok extends Filter {
}
tagsList.add("error_grok_parsing");
if (sourceField == null) {
- // For now let's put the raw message in log_message, so it is
- // will be searchable
+ // For now let's put the raw message in log_message, so it is will be searchable
jsonObj.put("log_message", inputStr);
}
}
@@ -314,7 +288,6 @@ public class FilterGrok extends Filter {
@Override
public void flush() {
if (strBuff != null) {
- // Handle message parsing
Map<String, Object> jsonObj = Collections
.synchronizedMap(new HashMap<String, Object>());
applyMessage(strBuff.toString(), jsonObj, currMultilineJsonStr);
@@ -324,11 +297,6 @@ public class FilterGrok extends Filter {
super.flush();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
- */
@Override
public String getShortDescription() {
return "filter:filter=grok,regex=" + messagePattern;
@@ -343,9 +311,7 @@ public class FilterGrok extends Filter {
@Override
public void logStat() {
super.logStat();
- // Printing stat for grokErrors
logStatForMetric(grokErrorMetric, "Stat: Grok Errors");
-
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
index c4da3cb..7b1e5e0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/FilterKeyValue.java
@@ -58,11 +58,6 @@ public class FilterKeyValue extends Filter {
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.filter.Filter#apply(java.lang.String)
- */
@Override
public void apply(String inputStr, InputMarker inputMarker) {
apply(LogFeederUtil.toJSONObject(inputStr), inputMarker);
@@ -87,7 +82,6 @@ public class FilterKeyValue extends Filter {
String value = nvTokenizer.nextToken();
jsonObj.put(name, value);
} else {
- // Unbalanced name value pairs
logParseError("name=" + name + ", pair=" + nv
+ ", field=" + sourceField + ", field_value="
+ valueObj);
@@ -113,11 +107,6 @@ public class FilterKeyValue extends Filter {
Level.ERROR);
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
- */
@Override
public String getShortDescription() {
return "filter:filter=keyvalue,regex=" + sourceField;
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
index 5c4d30e..6afda63 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/filter/JSONFilterCode.java
@@ -22,22 +22,19 @@ import java.util.Map;
import org.apache.ambari.logfeeder.LogFeederUtil;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.log4j.Logger;
-
public class JSONFilterCode extends Filter {
- private static Logger logger = Logger.getLogger(JSONFilterCode.class);
@Override
public void apply(String inputStr, InputMarker inputMarker) {
Map<String, Object> jsonMap = LogFeederUtil.toJSONObject(inputStr);
- // linenumber
+
Double lineNumberD = (Double) jsonMap.get("line_number");
if (lineNumberD != null) {
long lineNumber = lineNumberD.longValue();
jsonMap.put("line_number", lineNumber);
}
- // logtime
+
String timeStampStr = (String) jsonMap.get("logtime");
if (timeStampStr != null && !timeStampStr.isEmpty()) {
String logtime = LogFeederUtil.getDate(timeStampStr);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
index 18e2184..2e38255 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/Input.java
@@ -58,8 +58,6 @@ public abstract class Input extends ConfigBlock implements Runnable {
* This method will be called from the thread spawned for the output. This
* method should only exit after all data are read from the source or the
* process is exiting
- *
- * @throws Exception
*/
abstract void start() throws Exception;
@@ -88,11 +86,6 @@ public abstract class Input extends ConfigBlock implements Runnable {
return super.getNameForThread() + ":" + type;
}
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
@Override
public void run() {
try {
@@ -117,9 +110,6 @@ public abstract class Input extends ConfigBlock implements Runnable {
}
}
- /**
- *
- */
public void flush() {
if (firstFilter != null) {
firstFilter.flush();
@@ -145,12 +135,8 @@ public abstract class Input extends ConfigBlock implements Runnable {
* This is generally used by final checkin
*/
public void checkIn() {
-
}
- /**
- * @return
- */
public boolean isReady() {
return true;
}
@@ -258,7 +244,6 @@ public abstract class Input extends ConfigBlock implements Runnable {
@Override
public String getShortDescription() {
- // TODO Auto-generated method stub
return null;
}
@@ -277,9 +262,6 @@ public abstract class Input extends ConfigBlock implements Runnable {
return getShortDescription();
}
- /**
- *
- */
public void rollOver() {
// Only some inputs support it. E.g. InputFile
}
@@ -300,16 +282,10 @@ public abstract class Input extends ConfigBlock implements Runnable {
return outputList;
}
- /**
- * @param output
- */
public void addOutput(Output output) {
outputList.add(output);
}
- /**
- * @param metricsList
- */
public void addMetricsContainers(List<MetricCount> metricsList) {
super.addMetricsContainers(metricsList);
if (firstFilter != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
index 7107a69..3538ba0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputFile.java
@@ -44,7 +44,6 @@ import org.apache.solr.common.util.Base64;
public class InputFile extends Input {
static private Logger logger = Logger.getLogger(InputFile.class);
- // String startPosition = "beginning";
String logPath = null;
boolean isStartFromBegining = true;
@@ -110,11 +109,6 @@ public class InputFile extends Input {
super.init();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#isReady()
- */
@Override
public boolean isReady() {
if (!isReady) {
@@ -244,11 +238,6 @@ public class InputFile extends Input {
isRolledOver = true;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#monitor()
- */
@Override
void start() throws Exception {
@@ -258,7 +247,6 @@ public class InputFile extends Input {
boolean isProcessFile = getBooleanValue("process_file", true);
if (isProcessFile) {
if (isTail()) {
- // Just process the first file
processFile(logPathFiles[0]);
} else {
for (File file : logPathFiles) {
@@ -273,11 +261,8 @@ public class InputFile extends Input {
}
}
}
- // Call the close for the input. Which should flush to the filters and
- // output
close();
}else{
- //copy files
copyFiles(logPathFiles);
}
@@ -304,15 +289,12 @@ public class InputFile extends Input {
int lineCount = 0;
try {
setFilePath(logPathFile.getAbsolutePath());
-// br = new BufferedReader(new FileReader(logPathFile));
br = new BufferedReader(LogsearchReaderFactory.INSTANCE.getReader(logPathFile));
// Whether to send to output from the beginning.
boolean resume = isStartFromBegining;
- // Seems FileWatch is not reliable, so let's only use file key
- // comparison
- // inputMgr.monitorSystemFileChanges(this);
+ // Seems FileWatch is not reliable, so let's only use file key comparison
fileKey = getFileKey(logPathFile);
base64FileKey = Base64.byteArrayToBase64(fileKey.toString()
.getBytes());
@@ -321,7 +303,6 @@ public class InputFile extends Input {
if (isTail()) {
try {
- // Let's see if there is a checkpoint for this file
logger.info("Checking existing checkpoint file. "
+ getShortDescription());
@@ -348,9 +329,7 @@ public class InputFile extends Input {
+ checkPointFile
+ ", input=" + getShortDescription());
} else {
- // Create JSON string
- String jsonCheckPointStr = new String(b, 0,
- readSize);
+ String jsonCheckPointStr = new String(b, 0, readSize);
jsonCheckPoint = LogFeederUtil
.toJSONObject(jsonCheckPointStr);
@@ -405,8 +384,7 @@ public class InputFile extends Input {
}
sleepIteration++;
try {
- // Since FileWatch service is not reliable, we will
- // check
+ // Since FileWatch service is not reliable, we will check
// file inode every n seconds after no write
if (sleepIteration > 4) {
Object newFileKey = getFileKey(logPathFile);
@@ -448,11 +426,8 @@ public class InputFile extends Input {
break;
}
try {
- // Open new file
logger.info("Opening new rolled over file."
+ getShortDescription());
-// br = new BufferedReader(new FileReader(
-// logPathFile));
br = new BufferedReader(LogsearchReaderFactory.
INSTANCE.getReader(logPathFile));
lineCount = 0;
@@ -466,9 +441,7 @@ public class InputFile extends Input {
} catch (Exception ex) {
logger.error("Error opening rolled over file. "
+ getShortDescription());
- // Let's add this to monitoring and exit
- // this
- // thread
+ // Let's add this to monitoring and exit this thread
logger.info("Added input to not ready list."
+ getShortDescription());
isReady = false;
@@ -501,9 +474,7 @@ public class InputFile extends Input {
}
if (resume) {
InputMarker marker = new InputMarker();
- marker.fileKey = fileKey;
marker.base64FileKey = base64FileKey;
- marker.filePath = filePath;
marker.input = this;
marker.lineNumber = lineCount;
outputLine(line, marker);
@@ -533,10 +504,6 @@ public class InputFile extends Input {
}
}
- /**
- * @param logPathFile2
- * @return
- */
static public Object getFileKey(File file) {
try {
Path fileFullPath = Paths.get(file.getAbsolutePath());
@@ -551,11 +518,6 @@ public class InputFile extends Input {
return file.toString();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#getShortDescription()
- */
@Override
public String getShortDescription() {
return "input:source="
@@ -565,7 +527,6 @@ public class InputFile extends Input {
.getAbsolutePath() : getStringValue("path"));
}
-
public void copyFiles(File[] files) {
boolean isCopyFile = getBooleanValue("copy_file", false);
if (isCopyFile && files != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
index 6196068..8def4b9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputMarker.java
@@ -24,10 +24,7 @@ package org.apache.ambari.logfeeder.input;
*/
public class InputMarker {
public int lineNumber = 0;
- public int beginLineNumber = 0;
public Input input;
- public String filePath;
- public Object fileKey = null;
public String base64FileKey = null;
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
index d68ab96..9d5f970 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/InputS3File.java
@@ -102,11 +102,6 @@ public class InputS3File extends Input {
super.init();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#isReady()
- */
@Override
public boolean isReady() {
if (!isReady) {
@@ -201,11 +196,6 @@ public class InputS3File extends Input {
isRolledOver = true;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#monitor()
- */
@Override
void start() throws Exception {
if (s3LogPathFiles == null || s3LogPathFiles.length == 0) {
@@ -213,7 +203,6 @@ public class InputS3File extends Input {
}
if (isTail()) {
- // Just process the first file
processFile(s3LogPathFiles[0]);
} else {
for (String s3FilePath : s3LogPathFiles) {
@@ -228,8 +217,6 @@ public class InputS3File extends Input {
}
}
}
- // Call the close for the input. Which should flush to the filters and
- // output
close();
}
@@ -265,9 +252,7 @@ public class InputS3File extends Input {
// Whether to send to output from the beginning.
boolean resume = isStartFromBegining;
- // Seems FileWatch is not reliable, so let's only use file key
- // comparison
- // inputMgr.monitorSystemFileChanges(this);
+ // Seems FileWatch is not reliable, so let's only use file key comparison
fileKey = getFileKey(logPathFile);
base64FileKey = Base64.byteArrayToBase64(fileKey.toString().getBytes());
logger.info("fileKey=" + fileKey + ", base64=" + base64FileKey + ". "
@@ -301,7 +286,6 @@ public class InputS3File extends Input {
+ ", input="
+ getShortDescription());
} else {
- // Create JSON string
String jsonCheckPointStr = new String(b, 0, readSize);
jsonCheckPoint = LogFeederUtil.toJSONObject(jsonCheckPointStr);
@@ -352,8 +336,7 @@ public class InputS3File extends Input {
}
sleepIteration++;
try {
- // Since FileWatch service is not reliable, we will
- // check
+ // Since FileWatch service is not reliable, we will check
// file inode every n seconds after no write
if (sleepIteration > 4) {
Object newFileKey = getFileKey(logPathFile);
@@ -407,9 +390,7 @@ public class InputS3File extends Input {
} catch (Exception ex) {
logger.error("Error opening rolled over file. "
+ getShortDescription());
- // Let's add this to monitoring and exit
- // this
- // thread
+ // Let's add this to monitoring and exit this thread
logger.info("Added input to not ready list."
+ getShortDescription());
isReady = false;
@@ -439,9 +420,7 @@ public class InputS3File extends Input {
}
if (resume) {
InputMarker marker = new InputMarker();
- marker.fileKey = fileKey;
marker.base64FileKey = base64FileKey;
- marker.filePath = filePath;
marker.input = this;
marker.lineNumber = lineCount;
outputLine(line, marker);
@@ -469,19 +448,10 @@ public class InputS3File extends Input {
}
}
- /**
- * @param s3FilePath
- * @return
- */
static public Object getFileKey(String s3FilePath) {
return s3FilePath.toString();
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.input.Input#getShortDescription()
- */
@Override
public String getShortDescription() {
return "input:source="
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
index 9c46c4e..7c455f6 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/GZIPReader.java
@@ -55,9 +55,6 @@ public class GZIPReader extends InputStreamReader {
/**
* validating file based on magic number
- *
- * @param fileName
- * @return
*/
public static boolean isValidFile(String fileName) {
// TODO make it generic and put in factory itself
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
index a231807..c57c028 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -30,11 +30,6 @@ public enum LogsearchReaderFactory {
private static Logger logger = Logger
.getLogger(LogsearchReaderFactory.class);
- /**
- * @param fileName
- * @return
- * @throws FileNotFoundException
- */
public Reader getReader(File file) throws FileNotFoundException {
logger.debug("Inside reader factory for file:" + file);
if (GZIPReader.isValidFile(file.getAbsolutePath())) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
index 5b9fe8f..4833d3f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
@@ -140,12 +140,10 @@ public class FetchConfigFromSolr extends Thread {
}
if (LogFeederUtil.isListContains(hosts, hostName, false)) {
if (isFilterExpired(componentFilter)) {
- // pick default
logger.debug("Filter for component " + componentName + " and host :"
+ hostName + " is expired at " + componentFilter.getExpiryTime());
return defaultLevels;
} else {
- // return tmp filter levels
return overrideLevels;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
index f177e49..b069029 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
@@ -27,13 +27,7 @@ public class LogFeederConstants {
public static final String SOLR_COMPONENT = "type";
public static final String SOLR_HOST = "host";
- //
// UserConfig Constants History
- public static final String ID = "id";
- public static final String USER_NAME = "username";
public static final String VALUES = "jsons";
- public static final String FILTER_NAME = "filtername";
public static final String ROW_TYPE = "rowtype";
-
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
index f223207..c71d4b9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
@@ -44,16 +44,13 @@ public class ApplyLogFilter extends DefaultDataFilter {
if (isNotEmpty(componentName)) {
String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
if (isNotEmpty(level)) {
- // find component filter
VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName);
if (componentFilter == null) {
- //return default value if there is no filter found for particular component
return defaultValue;
}
List<String> allowedLevels = FetchConfigFromSolr.getAllowedLevels(
hostName, componentFilter);
if (allowedLevels == null || allowedLevels.isEmpty()) {
- // if allowedlevels list is empty then allow everything
allowedLevels.add(LogFeederConstants.ALL);
}
return LogFeederUtil.isListContains(allowedLevels, level, false);
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
index a064663..e67512b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
@@ -20,14 +20,10 @@ package org.apache.ambari.logfeeder.logconfig.filter;
import java.util.Map;
-import org.apache.log4j.Logger;
-
/**
* Default filter to allow everything
*/
public class DefaultDataFilter {
- private static Logger logger = Logger.getLogger(DefaultDataFilter.class);
-
public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) {
return defaultValue;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 5b89d4b..b87ce50 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -26,7 +26,6 @@ public abstract class Mapper {
String fieldName;
String mapClassCode;
- @SuppressWarnings("hiding")
public boolean init(String inputDesc, String fieldName,
String mapClassCode, Object mapConfigs) {
this.inputDesc = inputDesc;
@@ -35,10 +34,6 @@ public abstract class Mapper {
return true;
}
- /**
- * @param value
- * @return
- */
public Object apply(Map<String, Object> jsonObj, Object value) {
return value;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 107e7e4..f293ede 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -35,7 +35,6 @@ public class MapperDate extends Mapper {
SimpleDateFormat dateFormatter = null;
boolean isEpoch = false;
- @SuppressWarnings("hiding")
@Override
public boolean init(String inputDesc, String fieldName,
String mapClassCode, Object mapConfigs) {
@@ -76,7 +75,6 @@ public class MapperDate extends Mapper {
if (value != null) {
try {
if (isEpoch) {
- // First convert to long
long ms = Long.parseLong(value.toString()) * 1000;
value = new Date(ms);
} else if (dateFormatter != null) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index 99c33ed..afbb126 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -33,7 +33,6 @@ public class MapperFieldName extends Mapper {
Logger logger = Logger.getLogger(MapperFieldName.class);
String newValue = null;
- @SuppressWarnings("hiding")
@Override
public boolean init(String inputDesc, String fieldName,
String mapClassCode, Object mapConfigs) {
@@ -56,9 +55,7 @@ public class MapperFieldName extends Mapper {
@Override
public Object apply(Map<String, Object> jsonObj, Object value) {
if (newValue != null) {
- // Remove the old one
jsonObj.remove(fieldName);
- // Add with new key name
jsonObj.put(newValue, value);
} else {
LogFeederUtil.logErrorMessageByInterval(this.getClass()
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index 9810ceb..00a69df 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -34,7 +34,6 @@ public class MapperFieldValue extends Mapper {
String prevValue = null;
String newValue = null;
- @SuppressWarnings("hiding")
@Override
public boolean init(String inputDesc, String fieldName,
String mapClassCode, Object mapConfigs) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index 99a2909..0624c59 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -20,7 +20,6 @@
package org.apache.ambari.logfeeder.output;
import java.io.File;
-import java.lang.reflect.Type;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -31,21 +30,15 @@ import org.apache.ambari.logfeeder.MetricCount;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.log4j.Logger;
-import com.google.gson.reflect.TypeToken;
-
public abstract class Output extends ConfigBlock {
static private Logger logger = Logger.getLogger(Output.class);
String destination = null;
- Type jsonType = new TypeToken<Map<String, String>>() {
- }.getType();
-
public MetricCount writeBytesMetric = new MetricCount();
@Override
public String getShortDescription() {
- // TODO Auto-generated method stub
return null;
}
@@ -63,11 +56,6 @@ public abstract class Output extends ConfigBlock {
public abstract void copyFile(File inputFile, InputMarker inputMarker)
throws UnsupportedOperationException;
- /**
- * @param jsonObj
- * @param input
- * @throws Exception
- */
public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
throws Exception {
write(LogFeederUtil.getGson().toJson(jsonObj), inputMarker);
@@ -85,8 +73,6 @@ public abstract class Output extends ConfigBlock {
/**
* This is called on shutdown. All output should extend it.
- *
- * @return
*/
public boolean isClosed() {
return isClosed;
@@ -114,9 +100,7 @@ public abstract class Output extends ConfigBlock {
public synchronized void logStat() {
super.logStat();
- //Printing stat for writeBytesMetric
logStatForMetric(writeBytesMetric, "Stat: Bytes Written");
-
}
public void trimStrValue(Map<String, Object> jsonObj) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
index 8df1d29..4a408f9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -30,10 +30,6 @@ public class OutputData {
Map<String, Object> jsonObj;
InputMarker inputMarker;
- /**
- * @param jsonObj
- * @param inputMarker
- */
public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) {
super();
this.jsonObj = jsonObj;
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index 7cfcb98..2d41a0b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -25,7 +25,6 @@ import org.apache.log4j.Logger;
/**
* Output that just ignore the logs
- *
*/
public class OutputDevNull extends Output {
@@ -33,7 +32,6 @@ public class OutputDevNull extends Output {
@Override
public void write(String block, InputMarker inputMarker){
- // just ignore the logs
logger.trace("Ignore log block: " + block);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 4327f6f..b4d2bbb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -26,7 +26,6 @@ import java.io.PrintWriter;
import java.util.Map;
import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
@@ -96,7 +95,6 @@ public class OutputFile extends Output {
throws Exception {
String outStr = null;
if (codec.equals("csv")) {
- // Convert to CSV
CSVPrinter csvPrinter = new CSVPrinter(outWriter, CSVFormat.RFC4180);
//TODO:
} else {
@@ -110,11 +108,6 @@ public class OutputFile extends Output {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.output.Output#write()
- */
@Override
synchronized public void write(String block, InputMarker inputMarker) throws Exception {
if (outWriter != null && block != null) {
@@ -125,11 +118,6 @@ public class OutputFile extends Output {
}
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
- */
@Override
public String getShortDescription() {
return "output:destination=file,path=" + filePath;
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index 120d071..a7f2321 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -83,18 +83,14 @@ public class OutputKafka extends Output {
int lingerMS = getIntValue("linger_ms", DEFAULT_LINGER_MS);
Properties props = new Properties();
- // 0.9.0
props.put("bootstrap.servers", brokerList);
props.put("client.id", "logfeeder_producer");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("compression.type", "snappy");
- // props.put("retries", "3");
props.put("batch.size", batchSize);
props.put("linger.ms", lingerMS);
- // props.put("metadata.broker.list", brokerList);
- // Get all kafka custom properties
for (String key : configs.keySet()) {
if (key.startsWith("kafka.")) {
Object value = configs.get(key);
@@ -126,11 +122,8 @@ public class OutputKafka extends Output {
kafkaCallBack = failedMessages.take();
}
if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
- // logger.info("Sent message. count=" +
- // kafkaCallBack.thisMessageNumber);
kafkaCallBack = null;
} else {
- // Should wait for sometime
LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for "
+ FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
@@ -183,9 +176,6 @@ public class OutputKafka extends Output {
super.setDrain(drain);
}
- /**
- * Flush document buffer
- */
public void flush() {
LOG.info("Flush called...");
setDrain(true);
@@ -271,9 +261,6 @@ public class OutputKafka extends Output {
}
output.incrementStat(1);
output.writeBytesMetric.count += message.length();
-
- // metadata.partition();
- // metadata.offset();
} else {
output.isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
@@ -289,6 +276,6 @@ public class OutputKafka extends Output {
public void copyFile(File inputFile, InputMarker inputMarker)
throws UnsupportedOperationException {
throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=kafka");
+ "copyFile method is not yet supported for output=kafka");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index 4cdf82d..f42195c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -38,7 +38,6 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
- *
* Write log file into s3 bucket
*/
public class OutputS3File extends Output {
@@ -93,14 +92,8 @@ public class OutputS3File extends Output {
s3Key);
// write global config
writeGlobalConfig();
-
}
- /**
- *
- * @param filters
- * @param filter
- */
public void addFilters(ArrayList<Map<String, Object>> filters, Filter filter) {
if (filter != null) {
Map<String, Object> filterConfig = new HashMap<String, Object>();
@@ -112,13 +105,6 @@ public class OutputS3File extends Output {
}
}
- /**
- *
- * @param filters
- * @param inputConfig
- * @param bucketName
- * @param componentName
- */
public void writeConfigToS3(Map<String, Object> config, String bucketName,
String accessKey, String secretKey, HashMap<String, String> contextParam,
String s3Key) {
@@ -126,17 +112,12 @@ public class OutputS3File extends Output {
s3ConfigDir = PlaceholderUtil.replaceVariables(s3ConfigDir, contextParam);
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String configJson = gson.toJson(config);
- // write json to s3 file
+
s3Key = s3ConfigDir + S3Util.INSTANCE.S3_PATH_SEPARATOR + s3Key;
S3Util.INSTANCE.writeIntoS3File(configJson, bucketName, s3Key, accessKey,
secretKey);
}
- /**
- *
- * @param componentName
- * @return String
- */
public String getComponentConfigFileName(String componentName) {
String fileName = "input.config-" + componentName + ".json";
return fileName;
@@ -222,6 +203,6 @@ public class OutputS3File extends Output {
@Override
public void write(String block, InputMarker inputMarker) throws Exception {
throw new UnsupportedOperationException(
- "write method is not yet supported for output=s3_file");
+ "write method is not yet supported for output=s3_file");
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 0480fbd..43d908e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -249,9 +249,6 @@ public class OutputSolr extends Output {
}
}
- /**
- * Flush document buffer
- */
public void flush() {
LOG.info("Flush called...");
setDrain(true);
@@ -378,8 +375,6 @@ public class OutputSolr extends Output {
* successfully able to write to the collection or shard. It will block till
* it can write. The outgoingBuffer is a BlockingQueue and when it is full, it
* will automatically stop parsing the log files.
- * @param outputData
- * @return
*/
private boolean sendToSolr(OutputData outputData) {
boolean result = false;
@@ -503,14 +498,12 @@ public class OutputSolr extends Output {
@Override
public void write(String block, InputMarker inputMarker) throws Exception {
- // TODO Auto-generated method stub
-
}
@Override
public void copyFile(File inputFile, InputMarker inputMarker)
throws UnsupportedOperationException {
throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=solr");
+ "copyFile method is not yet supported for output=solr");
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
index 050b69b..d0fbb6c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/AWSUtil.java
@@ -29,13 +29,6 @@ public enum AWSUtil {
INSTANCE;
private static final Logger LOG = Logger.getLogger(AWSUtil.class);
- /**
- * Get aws username
- *
- * @param accessKey
- * @param secretKey
- * @return String
- */
public String getAwsUserName(String accessKey, String secretKey) {
String username = null;
AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
@@ -85,7 +78,6 @@ public enum AWSUtil {
secretKey);
return awsCredentials;
} else {
- // retrun null
return null;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
index f49837c..ced2b5c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/s3/S3Util.java
@@ -31,24 +31,18 @@ import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
-import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.Upload;
-import com.amazonaws.services.s3.transfer.model.UploadResult;
/**
* Utility to connect to s3
- *
*/
public enum S3Util {
INSTANCE;
@@ -58,11 +52,6 @@ public enum S3Util {
public final String S3_PATH_START_WITH = "s3://";
public final String S3_PATH_SEPARATOR = "/";
- /**
- * get s3 client
- *
- * @return AmazonS3
- */
public AmazonS3 getS3Client(String accessKey, String secretKey) {
AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
accessKey, secretKey);
@@ -75,10 +64,6 @@ public enum S3Util {
return s3client;
}
- /**
- *
- * @return TransferManager
- */
public TransferManager getTransferManager(String accessKey, String secretKey) {
AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
accessKey, secretKey);
@@ -91,21 +76,12 @@ public enum S3Util {
return transferManager;
}
- /**
- * shutdown s3 transfer manager
- */
public void shutdownTransferManager(TransferManager transferManager) {
if (transferManager != null) {
transferManager.shutdownNow();
}
}
- /**
- * Extract bucket name from s3 file complete path
- *
- * @param s3Path
- * @return String
- */
public String getBucketName(String s3Path) {
String bucketName = null;
// s3path
@@ -117,12 +93,6 @@ public enum S3Util {
return bucketName;
}
- /**
- * get s3 key from s3Path after removing bucketname
- *
- * @param s3Path
- * @return String
- */
public String getS3Key(String s3Path) {
StringBuilder s3Key = new StringBuilder();
// s3path
@@ -142,18 +112,12 @@ public enum S3Util {
return s3Key.toString();
}
- /**
- *
- * @param bucketName
- * @param s3Key
- * @param localFile
- */
public void uploadFileTos3(String bucketName, String s3Key, File localFile,
String accessKey, String secretKey) {
TransferManager transferManager = getTransferManager(accessKey, secretKey);
try {
Upload upload = transferManager.upload(bucketName, s3Key, localFile);
- UploadResult uploadResult = upload.waitForUploadResult();
+ upload.waitForUploadResult();
} catch (AmazonClientException | InterruptedException e) {
LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
e);
@@ -164,10 +128,6 @@ public enum S3Util {
/**
* Get the buffer reader to read s3 file as a stream
- *
- * @param s3Path
- * @return BufferedReader
- * @throws IOException
*/
public BufferedReader getReader(String s3Path, String accessKey,
String secretKey) throws IOException {
@@ -192,12 +152,6 @@ public enum S3Util {
}
}
- /**
- *
- * @param data
- * @param bucketName
- * @param s3Key
- */
public void writeIntoS3File(String data, String bucketName, String s3Key,
String accessKey, String secretKey) {
InputStream in = null;
@@ -210,10 +164,9 @@ public enum S3Util {
TransferManager transferManager = getTransferManager(accessKey, secretKey);
try {
if (transferManager != null) {
- UploadResult uploadResult = transferManager
- .upload(
+ transferManager.upload(
new PutObjectRequest(bucketName, s3Key, in,
- new ObjectMetadata())).waitForUploadResult();
+ new ObjectMetadata())).waitForUploadResult();
LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
+ bucketName);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
index 54008ec..c2addbd 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
@@ -32,13 +32,6 @@ public class CompressionUtil {
private static final Logger LOG = Logger.getLogger(CompressionUtil.class);
- /**
- * Compress file
- *
- * @param inputFile
- * @param outputFile
- * @param algoName
- */
public static File compressFile(File inputFile, File outputFile, String algoName) {
CompressorOutputStream cos = null;
FileInputStream ios = null;
@@ -68,7 +61,6 @@ public class CompressionUtil {
} catch (Exception e) {
LOG.error(e);
} finally {
- // Close the stream
if (cos != null) {
try {
cos.close();
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
index 9be85ee..d6c3117 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
@@ -21,24 +21,13 @@ import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.log4j.Logger;
-
public class PlaceholderUtil {
- private static Logger LOG = Logger.getLogger(PlaceholderUtil.class);
-
private static Pattern placeHolderPattern;
static {
-// placeHolderPattern = Pattern.compile("\\{(.*?)\\}");
placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
}
- /**
- *
- * @param inputStr
- * @param contextParam
- * @return String
- */
public static String replaceVariables(String inputStr,
HashMap<String, String> contextParam) {
Matcher m = placeHolderPattern.matcher(inputStr);
@@ -57,13 +46,6 @@ public class PlaceholderUtil {
return output;
}
- /**
- *
- * @param contextParam
- * @param defaultValue
- * @param key
- * @return String
- */
private static String getFromContext(HashMap<String, String> contextParam,
String defaultValue, String key) {
String returnValue = defaultValue;// by default set default value as a
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
index 2257e32..19dd404 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
@@ -37,7 +37,6 @@ import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
-import org.apache.solr.common.SolrInputDocument;
public class SolrUtil {
@@ -47,7 +46,6 @@ public class SolrUtil {
SolrClient solrClient = null;
CloudSolrClient solrClouldClient = null;
- boolean isSolrCloud = true;
String solrDetail = "";
String collectionName = null;
@@ -112,10 +110,6 @@ public class SolrUtil {
return solrClient;
}
- /**
- * @param waitDurationMS
- * @return
- */
public boolean checkSolrStatus(int waitDurationMS) {
boolean status = false;
try {
@@ -201,15 +195,4 @@ public class SolrUtil {
}
return configMap;
}
-
- /**
- * @param solrInputDocument
- * @throws SolrServerException
- * @throws IOException
- */
- public void addDoc(SolrInputDocument solrInputDocument) throws SolrServerException, IOException {
- solrClient.add(solrInputDocument);
- solrClient.commit();
- }
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
index 1e2be37..4f0d1aa 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java
@@ -19,12 +19,9 @@
package org.apache.ambari.logfeeder.s3;
public class AWSUtilTest {
-// @Test
public void testAWSUtil_getAwsUserName() throws Exception {
String S3_ACCESS_KEY = "S3_ACCESS_KEY";
String S3_SECRET_KEY = "S3_SECRET_KEY";
- String expectedUsername = "";
- String username = AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY,
- S3_SECRET_KEY);
+ AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY);
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/4d948256/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
index d07ae2b..af14140 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java
@@ -18,14 +18,9 @@
*/
package org.apache.ambari.logfeeder.s3;
-import org.apache.log4j.Logger;
-import org.junit.Test;
import static org.junit.Assert.assertEquals;
public class S3UtilTest {
- private static final Logger LOG = Logger.getLogger(S3UtilTest.class);
-
- // @Test
public void testS3Util_pathToBucketName() throws Exception {
String s3Path = "s3://bucket_name/path/file.txt";
String expectedBucketName = "bucket_name";
@@ -33,7 +28,6 @@ public class S3UtilTest {
assertEquals(expectedBucketName, actualBucketName);
}
- // @Test
public void testS3Util_pathToS3Key() throws Exception {
String s3Path = "s3://bucket_name/path/file.txt";
String expectedS3key = "path/file.txt";