You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:56 UTC
[29/52] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder
(Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
index ec26a88..ffd6cec 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/FileUtil.java
@@ -20,25 +20,73 @@
package org.apache.ambari.logfeeder.util;
import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
public class FileUtil {
- private static final Logger logger = Logger.getLogger(FileUtil.class);
-
- public static List<File> getAllFileFromDir(File directory,
- String[] searchFileWithExtensions, boolean checkInSubDir) {
+ private static final Logger LOG = Logger.getLogger(FileUtil.class);
+
+ private FileUtil() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static List<File> getAllFileFromDir(File directory, String extension, boolean checkInSubDir) {
if (!directory.exists()) {
- logger.error(directory.getAbsolutePath() + " is not exists ");
- } else if (directory.isDirectory()) {
- return (List<File>) FileUtils.listFiles(directory,
- searchFileWithExtensions, checkInSubDir);
+ LOG.error(directory.getAbsolutePath() + " is not exists ");
+ } else if (!directory.isDirectory()) {
+ LOG.error(directory.getAbsolutePath() + " is not Directory ");
} else {
- logger.error(directory.getAbsolutePath() + " is not Directory ");
+ return (List<File>) FileUtils.listFiles(directory, new String[]{extension}, checkInSubDir);
}
return new ArrayList<File>();
}
+
+
+ public static Object getFileKey(File file) {
+ try {
+ Path fileFullPath = Paths.get(file.getAbsolutePath());
+ if (fileFullPath != null) {
+ BasicFileAttributes basicAttr = Files.readAttributes(fileFullPath, BasicFileAttributes.class);
+ return basicAttr.fileKey();
+ }
+ } catch (Throwable ex) {
+ LOG.error("Error getting file attributes for file=" + file, ex);
+ }
+ return file.toString();
+ }
+
+ public static File getFileFromClasspath(String filename) {
+ URL fileCompleteUrl = Thread.currentThread().getContextClassLoader().getResource(filename);
+ LOG.debug("File Complete URI :" + fileCompleteUrl);
+ File file = null;
+ try {
+ file = new File(fileCompleteUrl.toURI());
+ } catch (Exception exception) {
+ LOG.debug(exception.getMessage(), exception.getCause());
+ }
+ return file;
+ }
+
+ public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
+ ObjectMapper mapper = new ObjectMapper();
+ try {
+ HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {});
+ return jsonmap;
+ } catch (IOException e) {
+ LOG.error(e, e.getCause());
+ }
+ return new HashMap<String, Object>();
+ }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
index 32029ff..5bf600e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java
@@ -22,36 +22,23 @@ package org.apache.ambari.logfeeder.util;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.IOException;
import java.lang.reflect.Type;
import java.net.InetAddress;
-import java.net.URL;
import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.TimeZone;
import org.apache.ambari.logfeeder.LogFeeder;
-import org.apache.ambari.logfeeder.filter.Filter;
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.mapper.Mapper;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.output.Output;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import org.codehaus.jackson.JsonParseException;
-import org.codehaus.jackson.map.JsonMappingException;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.type.TypeReference;
-import com.google.common.collect.ObjectArrays;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -60,109 +47,80 @@ import com.google.gson.reflect.TypeToken;
* This class contains utility methods used by LogFeeder
*/
public class LogFeederUtil {
- private static final Logger logger = Logger.getLogger(LogFeederUtil.class);
+ private static final Logger LOG = Logger.getLogger(LogFeederUtil.class);
- private static final int HASH_SEED = 31174077;
- public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
- public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
- private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
-
- private static Properties props;
-
- private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
- private static int logInterval = 30000; // 30 seconds
+ private final static String GSON_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
+ private static Gson gson = new GsonBuilder().setDateFormat(GSON_DATE_FORMAT).create();
+
+ public static Gson getGson() {
+ return gson;
+ }
public static String hostName = null;
public static String ipAddress = null;
- private static String logfeederTempDir = null;
-
- private static final Object _LOCK = new Object();
-
static{
- setHostNameAndIP();
+ try {
+ InetAddress ip = InetAddress.getLocalHost();
+ ipAddress = ip.getHostAddress();
+ String getHostName = ip.getHostName();
+ String getCanonicalHostName = ip.getCanonicalHostName();
+ if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
+ LOG.info("Using getCanonicalHostName()=" + getCanonicalHostName);
+ hostName = getCanonicalHostName;
+ } else {
+ LOG.info("Using getHostName()=" + getHostName);
+ hostName = getHostName;
+ }
+ LOG.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName + ", getCanonicalHostName=" + getCanonicalHostName +
+ ", hostName=" + hostName);
+ } catch (UnknownHostException e) {
+ LOG.error("Error getting hostname.", e);
+ }
}
- public static Gson getGson() {
- return gson;
- }
-
- private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
- @Override
- protected SimpleDateFormat initialValue() {
- SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
- sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
- return sdf;
- }
- };
+ private static Properties props;
/**
- * This method will read the properties from System, followed by propFile
- * and finally from the map
+ * This method will read the properties from System, followed by propFile and finally from the map
*/
- public static void loadProperties(String propFile, String[] propNVList)
- throws Exception {
- logger.info("Loading properties. propFile=" + propFile);
+ public static void loadProperties(String propFile, String[] propNVList) throws Exception {
+ LOG.info("Loading properties. propFile=" + propFile);
props = new Properties(System.getProperties());
boolean propLoaded = false;
// First get properties file path from environment value
String propertiesFilePath = System.getProperty("properties");
- if (propertiesFilePath != null && !propertiesFilePath.isEmpty()) {
+ if (StringUtils.isNotEmpty(propertiesFilePath)) {
File propertiesFile = new File(propertiesFilePath);
if (propertiesFile.exists() && propertiesFile.isFile()) {
- logger.info("Properties file path set in environment. Loading properties file="
- + propertiesFilePath);
- FileInputStream fileInputStream = null;
- try {
- fileInputStream = new FileInputStream(propertiesFile);
- props.load(fileInputStream);
+ LOG.info("Properties file path set in environment. Loading properties file=" + propertiesFilePath);
+ try (FileInputStream fis = new FileInputStream(propertiesFile)) {
+ props.load(fis);
propLoaded = true;
} catch (Throwable t) {
- logger.error("Error loading properties file. properties file="
- + propertiesFile.getAbsolutePath());
- } finally {
- if (fileInputStream != null) {
- try {
- fileInputStream.close();
- } catch (Throwable t) {
- // Ignore error
- }
- }
+ LOG.error("Error loading properties file. properties file=" + propertiesFile.getAbsolutePath());
}
} else {
- logger.error("Properties file path set in environment, but file not found. properties file="
- + propertiesFilePath);
+ LOG.error("Properties file path set in environment, but file not found. properties file=" + propertiesFilePath);
}
}
if (!propLoaded) {
- BufferedInputStream fileInputStream = null;
- try {
+ try (BufferedInputStream bis = (BufferedInputStream) LogFeeder.class.getClassLoader().getResourceAsStream(propFile)) {
// Properties not yet loaded, let's try from class loader
- fileInputStream = (BufferedInputStream) LogFeeder.class
- .getClassLoader().getResourceAsStream(propFile);
- if (fileInputStream != null) {
- logger.info("Loading properties file " + propFile
- + " from classpath");
- props.load(fileInputStream);
+ if (bis != null) {
+ LOG.info("Loading properties file " + propFile + " from classpath");
+ props.load(bis);
propLoaded = true;
} else {
- logger.fatal("Properties file not found in classpath. properties file name= "
- + propFile);
- }
- } finally {
- if (fileInputStream != null) {
- try {
- fileInputStream.close();
- } catch (IOException e) {
- }
+ LOG.fatal("Properties file not found in classpath. properties file name= " + propFile);
}
}
}
if (!propLoaded) {
- logger.fatal("Properties file is not loaded.");
+ LOG.fatal("Properties file is not loaded.");
throw new Exception("Properties not loaded");
} else {
updatePropertiesFromMap(propNVList);
@@ -173,162 +131,124 @@ public class LogFeederUtil {
if (nvList == null) {
return;
}
- logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
- + nvList.length);
- if (nvList != null && nvList.length > 0) {
- for (String nv : nvList) {
- logger.info("Passed nv=" + nv);
- if (nv.startsWith("-") && nv.length() > 1) {
- nv = nv.substring(1);
- logger.info("Stripped nv=" + nv);
- int i = nv.indexOf("=");
- if (nv.length() > i) {
- logger.info("Candidate nv=" + nv);
- String name = nv.substring(0, i);
- String value = nv.substring(i + 1);
- logger.info("Adding property from argument to properties. name="
- + name + ", value=" + value);
- props.put(name, value);
- }
+ LOG.info("Trying to load additional proeprties from argument paramters. nvList.length=" + nvList.length);
+ for (String nv : nvList) {
+ LOG.info("Passed nv=" + nv);
+ if (nv.startsWith("-") && nv.length() > 1) {
+ nv = nv.substring(1);
+ LOG.info("Stripped nv=" + nv);
+ int i = nv.indexOf("=");
+ if (nv.length() > i) {
+ LOG.info("Candidate nv=" + nv);
+ String name = nv.substring(0, i);
+ String value = nv.substring(i + 1);
+ LOG.info("Adding property from argument to properties. name=" + name + ", value=" + value);
+ props.put(name, value);
}
}
}
}
- static public String getStringProperty(String key) {
- if (props != null) {
- return props.getProperty(key);
- }
- return null;
+ public static String getStringProperty(String key) {
+ return props == null ? null : props.getProperty(key);
}
- static public String getStringProperty(String key, String defaultValue) {
- if (props != null) {
- return props.getProperty(key, defaultValue);
- }
- return defaultValue;
+ public static String getStringProperty(String key, String defaultValue) {
+ return props == null ? defaultValue : props.getProperty(key, defaultValue);
}
- static public boolean getBooleanProperty(String key, boolean defaultValue) {
- String strValue = getStringProperty(key);
- return toBoolean(strValue, defaultValue);
+ public static boolean getBooleanProperty(String key, boolean defaultValue) {
+ String value = getStringProperty(key);
+ return toBoolean(value, defaultValue);
}
- private static boolean toBoolean(String strValue, boolean defaultValue) {
- boolean retValue = defaultValue;
- if (!StringUtils.isEmpty(strValue)) {
- if (strValue.equalsIgnoreCase("true")
- || strValue.equalsIgnoreCase("yes")) {
- retValue = true;
- } else {
- retValue = false;
- }
+ private static boolean toBoolean(String value, boolean defaultValue) {
+ if (StringUtils.isEmpty(value)) {
+ return defaultValue;
}
- return retValue;
+
+ return "true".equalsIgnoreCase(value) || "yes".equalsIgnoreCase(value);
}
- static public int getIntProperty(String key, int defaultValue) {
- String strValue = getStringProperty(key);
- int retValue = defaultValue;
- retValue = objectToInt(strValue, retValue, ", key=" + key);
+ public static int getIntProperty(String key, int defaultValue) {
+ String value = getStringProperty(key);
+ int retValue = objectToInt(value, defaultValue, ", key=" + key);
return retValue;
}
- public static int objectToInt(Object objValue, int retValue,
- String errMessage) {
+ public static int objectToInt(Object objValue, int retValue, String errMessage) {
if (objValue == null) {
return retValue;
}
String strValue = objValue.toString();
- if (!StringUtils.isEmpty(strValue)) {
+ if (StringUtils.isNotEmpty(strValue)) {
try {
retValue = Integer.parseInt(strValue);
} catch (Throwable t) {
- logger.error("Error parsing integer value. str=" + strValue
- + ", " + errMessage);
+ LOG.error("Error parsing integer value. str=" + strValue + ", " + errMessage);
}
}
return retValue;
}
- public static boolean isEnabled(Map<String, Object> conditionConfigs,
- Map<String, Object> valueConfigs) {
- boolean allow = toBoolean((String) valueConfigs.get("is_enabled"), true);
- @SuppressWarnings("unchecked")
- Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
- .get("conditions");
- if (conditions != null && conditions.size() > 0) {
- allow = false;
- for (String conditionType : conditions.keySet()) {
- if (conditionType.equalsIgnoreCase("fields")) {
- @SuppressWarnings("unchecked")
- Map<String, Object> fields = (Map<String, Object>) conditions
- .get("fields");
- for (String fieldName : fields.keySet()) {
- Object values = fields.get(fieldName);
- if (values instanceof String) {
- allow = isFieldConditionMatch(valueConfigs,
- fieldName, (String) values);
- } else {
- @SuppressWarnings("unchecked")
- List<String> listValues = (List<String>) values;
- for (String stringValue : listValues) {
- allow = isFieldConditionMatch(valueConfigs,
- fieldName, stringValue);
- if (allow) {
- break;
- }
- }
- }
- if (allow) {
- break;
+ @SuppressWarnings("unchecked")
+ public static boolean isEnabled(Map<String, Object> conditionConfigs, Map<String, Object> valueConfigs) {
+ Map<String, Object> conditions = (Map<String, Object>) conditionConfigs.get("conditions");
+ if (MapUtils.isEmpty(conditions)) {
+ return toBoolean((String) valueConfigs.get("is_enabled"), true);
+ }
+
+ for (String conditionType : conditions.keySet()) {
+ if (!conditionType.equalsIgnoreCase("fields")) {
+ continue;
+ }
+
+ Map<String, Object> fields = (Map<String, Object>) conditions.get("fields");
+ for (Map.Entry<String, Object> field : fields.entrySet()) {
+ if (field.getValue() instanceof String) {
+ if (isFieldConditionMatch(valueConfigs, field.getKey(), (String) field.getValue())) {
+ return true;
+ }
+ } else {
+ for (String stringValue : (List<String>) field.getValue()) {
+ if (isFieldConditionMatch(valueConfigs, field.getKey(), stringValue)) {
+ return true;
}
}
}
- if (allow) {
- break;
- }
}
}
- return allow;
+
+ return false;
}
- public static boolean isFieldConditionMatch(Map<String, Object> configs,
- String fieldName, String stringValue) {
+ private static boolean isFieldConditionMatch(Map<String, Object> configs, String fieldName, String stringValue) {
boolean allow = false;
String fieldValue = (String) configs.get(fieldName);
if (fieldValue != null && fieldValue.equalsIgnoreCase(stringValue)) {
allow = true;
} else {
@SuppressWarnings("unchecked")
- Map<String, Object> addFields = (Map<String, Object>) configs
- .get("add_fields");
+ Map<String, Object> addFields = (Map<String, Object>) configs.get("add_fields");
if (addFields != null && addFields.get(fieldName) != null) {
String addFieldValue = (String) addFields.get(fieldName);
if (stringValue.equalsIgnoreCase(addFieldValue)) {
allow = true;
}
}
-
}
return allow;
}
- public static void logStatForMetric(MetricCount metric, String prefixStr,
- String postFix) {
- long currStat = metric.count;
+ public static void logStatForMetric(MetricData metric, String prefixStr, String postFix) {
+ long currStat = metric.value;
long currMS = System.currentTimeMillis();
- if (currStat > metric.prevLogCount) {
- if (postFix == null) {
- postFix = "";
- }
- logger.info(prefixStr + ": total_count=" + metric.count
- + ", duration=" + (currMS - metric.prevLogMS) / 1000
- + " secs, count=" + (currStat - metric.prevLogCount)
- + postFix);
+ if (currStat > metric.prevLogValue) {
+ LOG.info(prefixStr + ": total_count=" + metric.value + ", duration=" + (currMS - metric.prevLogTime) / 1000 +
+ " secs, count=" + (currStat - metric.prevLogValue) + postFix);
}
- metric.prevLogCount = currStat;
- metric.prevLogMS = currMS;
+ metric.prevLogValue = currStat;
+ metric.prevLogTime = currMS;
}
public static Map<String, Object> cloneObject(Map<String, Object> map) {
@@ -336,221 +256,74 @@ public class LogFeederUtil {
return null;
}
String jsonStr = gson.toJson(map);
- Type type = new TypeToken<Map<String, Object>>() {
- }.getType();
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
return gson.fromJson(jsonStr, type);
}
public static Map<String, Object> toJSONObject(String jsonStr) {
- if(jsonStr==null || jsonStr.trim().isEmpty()){
+ if (StringUtils.isBlank(jsonStr)) {
return new HashMap<String, Object>();
}
- Type type = new TypeToken<Map<String, Object>>() {
- }.getType();
+ Type type = new TypeToken<Map<String, Object>>() {}.getType();
return gson.fromJson(jsonStr, type);
}
- static public boolean logErrorMessageByInterval(String key, String message,
- Throwable e, Logger callerLogger, Level level) {
+ private static class LogHistory {
+ private long lastLogTime = 0;
+ private int counter = 0;
+ }
+
+ private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
+ public static boolean logErrorMessageByInterval(String key, String message, Throwable e, Logger callerLogger, Level level) {
LogHistory log = logHistoryList.get(key);
if (log == null) {
log = new LogHistory();
logHistoryList.put(key, log);
}
- if ((System.currentTimeMillis() - log.lastLogTime) > logInterval) {
+
+ if ((System.currentTimeMillis() - log.lastLogTime) > 30 * 1000) {
log.lastLogTime = System.currentTimeMillis();
- int counter = log.counter;
- log.counter = 0;
- if (counter > 0) {
- message += ". Messages suppressed before: " + counter;
- }
- if (e == null) {
- callerLogger.log(level, message);
- } else {
- callerLogger.log(level, message, e);
+ if (log.counter > 0) {
+ message += ". Messages suppressed before: " + log.counter;
}
+ log.counter = 0;
+ callerLogger.log(level, message, e);
return true;
} else {
log.counter++;
- }
- return false;
-
- }
-
- static public String subString(String str, int maxLength) {
- if (str == null || str.length() == 0) {
- return "";
- }
- maxLength = str.length() < maxLength ? str.length() : maxLength;
- return str.substring(0, maxLength);
- }
-
- public static long genHash(String value) {
- if (value == null) {
- value = "null";
- }
- return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
- }
-
- private static class LogHistory {
- private long lastLogTime = 0;
- private int counter = 0;
- }
-
- public static String getDate(String timeStampStr) {
- try {
- return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
- } catch (Exception ex) {
- logger.error(ex);
- return null;
+ return false;
}
}
- public static String getActualDateStr() {
- try {
- return dateFormatter.get().format(new Date());
- } catch (Exception ex) {
- logger.error(ex);
- return null;
- }
- }
-
- public static File getFileFromClasspath(String filename) {
- URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
- .getResource(filename);
- logger.debug("File Complete URI :" + fileCompleteUrl);
- File file = null;
- try {
- file = new File(fileCompleteUrl.toURI());
- } catch (Exception exception) {
- logger.debug(exception.getMessage(), exception.getCause());
- }
- return file;
- }
-
- public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
- Object instance = null;
- try {
- instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
- } catch (Exception exception) {
- logger.error("Unsupported class =" + classFullName, exception.getCause());
+ public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
+ if (list == null) {
+ return false;
}
- // check instance class as par aliasType
- if (instance != null) {
- boolean isValid = false;
- switch (aliasType) {
- case FILTER:
- isValid = Filter.class.isAssignableFrom(instance.getClass());
- break;
- case INPUT:
- isValid = Input.class.isAssignableFrom(instance.getClass());
- break;
- case OUTPUT:
- isValid = Output.class.isAssignableFrom(instance.getClass());
- break;
- case MAPPER:
- isValid = Mapper.class.isAssignableFrom(instance.getClass());
- break;
- default:
- // by default consider all are valid class
- isValid = true;
+
+ for (String value : list) {
+ if (value == null) {
+ continue;
}
- if (!isValid) {
- logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
- }
- }
- return instance;
- }
-
- public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
- ObjectMapper mapper = new ObjectMapper();
- try {
- HashMap<String, Object> jsonmap = mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
- });
- return jsonmap;
- } catch (JsonParseException e) {
- logger.error(e, e.getCause());
- } catch (JsonMappingException e) {
- logger.error(e, e.getCause());
- } catch (IOException e) {
- logger.error(e, e.getCause());
- }
- return new HashMap<String, Object>();
- }
-
- public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
- if (list != null) {
- for (String value : list) {
- if (value != null) {
- if (caseSensitive) {
- if (value.equals(str)) {
- return true;
- }
- } else {
- if (value.equalsIgnoreCase(str)) {
- return true;
- }
- }
- if (value.equalsIgnoreCase(LogFeederConstants.ALL)) {
- return true;
- }
- }
+
+ if (caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str) ||
+ value.equalsIgnoreCase(LogFeederConstants.ALL)) {
+ return true;
}
}
return false;
}
+ private static String logfeederTempDir = null;
- private static synchronized String setHostNameAndIP() {
- if (hostName == null || ipAddress == null) {
- try {
- InetAddress ip = InetAddress.getLocalHost();
- ipAddress = ip.getHostAddress();
- String getHostName = ip.getHostName();
- String getCanonicalHostName = ip.getCanonicalHostName();
- if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
- logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
- hostName = getCanonicalHostName;
- } else {
- logger.info("Using getHostName()=" + getHostName);
- hostName = getHostName;
- }
- logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
- + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
- + hostName);
- } catch (UnknownHostException e) {
- logger.error("Error getting hostname.", e);
- }
- }
- return hostName;
- }
-
- public static String[] mergeArray(String[] first, String[] second) {
- if (first == null) {
- first = new String[0];
- }
- if (second == null) {
- second = new String[0];
- }
- String[] mergedArray = ObjectArrays.concat(first, second, String.class);
- return mergedArray;
- }
-
- public static String getLogfeederTempDir() {
+ public synchronized static String getLogfeederTempDir() {
if (logfeederTempDir == null) {
- synchronized (_LOCK) {
- if (logfeederTempDir == null) {
- String tempDirValue = getStringProperty("logfeeder.tmp.dir",
- "/tmp/$username/logfeeder/");
- HashMap<String, String> contextParam = new HashMap<String, String>();
- String username = System.getProperty("user.name");
- contextParam.put("username", username);
- logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
- contextParam);
- }
- }
+ String tempDirValue = getStringProperty("logfeeder.tmp.dir", "/tmp/$username/logfeeder/");
+ HashMap<String, String> contextParam = new HashMap<String, String>();
+ String username = System.getProperty("user.name");
+ contextParam.put("username", username);
+ logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue, contextParam);
}
return logfeederTempDir;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
index fd96f8a..c975b99 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogfeederHDFSUtil.java
@@ -25,71 +25,53 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
-public enum LogfeederHDFSUtil {
- INSTANCE;
- private static Logger logger = Logger.getLogger(LogfeederHDFSUtil.class);
+public class LogfeederHDFSUtil {
+ private static final Logger LOG = Logger.getLogger(LogfeederHDFSUtil.class);
- public void createHDFSDir(String dirPath, FileSystem dfs) {
- Path src = new Path(dirPath);
- try {
- if (dfs.isDirectory(src)) {
- logger.info("hdfs dir dirPath=" + dirPath + " is already exist.");
- return;
- }
- boolean isDirCreated = dfs.mkdirs(src);
- if (isDirCreated) {
- logger.debug("HDFS dirPath=" + dirPath + " created successfully.");
- } else {
- logger.warn("HDFS dir creation failed dirPath=" + dirPath);
- }
- } catch (IOException e) {
- logger.error("HDFS dir creation failed dirPath=" + dirPath, e.getCause());
- }
+ private LogfeederHDFSUtil() {
+ throw new UnsupportedOperationException();
}
-
- public boolean copyFromLocal(String sourceFilepath, String destFilePath,
- FileSystem fileSystem, boolean overwrite, boolean delSrc) {
+
+ public static boolean copyFromLocal(String sourceFilepath, String destFilePath, FileSystem fileSystem, boolean overwrite,
+ boolean delSrc) {
Path src = new Path(sourceFilepath);
Path dst = new Path(destFilePath);
boolean isCopied = false;
try {
- logger.info("copying localfile := " + sourceFilepath + " to hdfsPath := "
- + destFilePath);
+ LOG.info("copying localfile := " + sourceFilepath + " to hdfsPath := " + destFilePath);
fileSystem.copyFromLocalFile(delSrc, overwrite, src, dst);
isCopied = true;
} catch (Exception e) {
- logger.error("Error copying local file :" + sourceFilepath
- + " to hdfs location : " + destFilePath, e);
+ LOG.error("Error copying local file :" + sourceFilepath + " to hdfs location : " + destFilePath, e);
}
return isCopied;
}
- public FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
+ public static FileSystem buildFileSystem(String hdfsHost, String hdfsPort) {
try {
Configuration configuration = buildHdfsConfiguration(hdfsHost, hdfsPort);
FileSystem fs = FileSystem.get(configuration);
return fs;
} catch (Exception e) {
- logger.error("Exception is buildFileSystem :", e);
+ LOG.error("Exception is buildFileSystem :", e);
}
return null;
}
- public void closeFileSystem(FileSystem fileSystem) {
+ private static Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
+ String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
+ Configuration configuration = new Configuration();
+ configuration.set("fs.default.name", url);
+ return configuration;
+ }
+
+ public static void closeFileSystem(FileSystem fileSystem) {
if (fileSystem != null) {
try {
fileSystem.close();
} catch (IOException e) {
- logger.error(e.getLocalizedMessage(), e.getCause());
+ LOG.error(e.getLocalizedMessage(), e.getCause());
}
}
}
-
- public Configuration buildHdfsConfiguration(String hdfsHost, String hdfsPort) {
- String url = "hdfs://" + hdfsHost + ":" + hdfsPort + "/";
- Configuration configuration = new Configuration();
- configuration.set("fs.default.name", url);
- return configuration;
- }
-
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
index d6c3117..13f2865 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/PlaceholderUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -21,38 +21,34 @@ import java.util.HashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-public class PlaceholderUtil {
+import org.apache.commons.lang3.StringUtils;
- private static Pattern placeHolderPattern;
- static {
- placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
+public class PlaceholderUtil {
+ private PlaceholderUtil() {
+ throw new UnsupportedOperationException();
}
+
+ private static final Pattern placeHolderPattern = Pattern.compile("\\$\\s*(\\w+)");
- public static String replaceVariables(String inputStr,
- HashMap<String, String> contextParam) {
+ public static String replaceVariables(String inputStr, HashMap<String, String> contextParam) {
Matcher m = placeHolderPattern.matcher(inputStr);
- String placeholder;
- String replacement;
String output = new String(inputStr);
while (m.find()) {
- placeholder = m.group();
+ String placeholder = m.group();
if (placeholder != null && !placeholder.isEmpty()) {
- String key = placeholder.replace("$","").toLowerCase();// remove
- // brace
- replacement = getFromContext(contextParam, placeholder, key);
+ String key = placeholder.replace("$","").toLowerCase();// remove brace
+ String replacement = getFromContext(contextParam, placeholder, key);
output = output.replace(placeholder, replacement);
}
}
return output;
}
- private static String getFromContext(HashMap<String, String> contextParam,
- String defaultValue, String key) {
- String returnValue = defaultValue;// by default set default value as a
- // return
+ private static String getFromContext(HashMap<String, String> contextParam, String defaultValue, String key) {
+ String returnValue = defaultValue; // by default set default value as a return
if (contextParam != null) {
String value = contextParam.get(key);
- if (value != null && !value.trim().isEmpty()) {
+ if (StringUtils.isNotBlank(value)) {
returnValue = value;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
index 10ea2c2..31a38d0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
@@ -19,7 +19,6 @@
package org.apache.ambari.logfeeder.util;
import java.io.BufferedReader;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
@@ -27,6 +26,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
@@ -39,22 +39,19 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.Upload;
/**
* Utility to connect to s3
*/
public class S3Util {
- public static final S3Util INSTANCE = new S3Util();
-
private static final Logger LOG = Logger.getLogger(S3Util.class);
- public static final String S3_PATH_START_WITH = "s3://";
- public static final String S3_PATH_SEPARATOR = "/";
-
- public AmazonS3 getS3Client(String accessKey, String secretKey) {
- AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
- accessKey, secretKey);
+ private S3Util() {
+ throw new UnsupportedOperationException();
+ }
+
+ public static AmazonS3 getS3Client(String accessKey, String secretKey) {
+ AWSCredentials awsCredentials = AWSUtil.createAWSCredentials(accessKey, secretKey);
AmazonS3 s3client;
if (awsCredentials != null) {
s3client = new AmazonS3Client(awsCredentials);
@@ -64,9 +61,8 @@ public class S3Util {
return s3client;
}
- public TransferManager getTransferManager(String accessKey, String secretKey) {
- AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials(
- accessKey, secretKey);
+ public static TransferManager getTransferManager(String accessKey, String secretKey) {
+ AWSCredentials awsCredentials = AWSUtil.createAWSCredentials(accessKey, secretKey);
TransferManager transferManager;
if (awsCredentials != null) {
transferManager = new TransferManager(awsCredentials);
@@ -76,35 +72,31 @@ public class S3Util {
return transferManager;
}
- public void shutdownTransferManager(TransferManager transferManager) {
+ public static void shutdownTransferManager(TransferManager transferManager) {
if (transferManager != null) {
transferManager.shutdownNow();
}
}
- public String getBucketName(String s3Path) {
+ public static String getBucketName(String s3Path) {
String bucketName = null;
// s3path
if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
- S3_PATH_SEPARATOR);
+ String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
bucketName = s3PathParts[0];
}
return bucketName;
}
- public String getS3Key(String s3Path) {
+ public static String getS3Key(String s3Path) {
StringBuilder s3Key = new StringBuilder();
- // s3path
if (s3Path != null) {
- String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split(
- S3_PATH_SEPARATOR);
- ArrayList<String> s3PathList = new ArrayList<String>(
- Arrays.asList(s3PathParts));
+ String[] s3PathParts = s3Path.replace(LogFeederConstants.S3_PATH_START_WITH, "").split(LogFeederConstants.S3_PATH_SEPARATOR);
+ ArrayList<String> s3PathList = new ArrayList<String>(Arrays.asList(s3PathParts));
s3PathList.remove(0);// remove bucketName
for (int index = 0; index < s3PathList.size(); index++) {
if (index > 0) {
- s3Key.append(S3_PATH_SEPARATOR);
+ s3Key.append(LogFeederConstants.S3_PATH_SEPARATOR);
}
s3Key.append(s3PathList.get(index));
}
@@ -112,63 +104,41 @@ public class S3Util {
return s3Key.toString();
}
- public void uploadFileTos3(String bucketName, String s3Key, File localFile,
- String accessKey, String secretKey) {
- TransferManager transferManager = getTransferManager(accessKey, secretKey);
- try {
- Upload upload = transferManager.upload(bucketName, s3Key, localFile);
- upload.waitForUploadResult();
- } catch (AmazonClientException | InterruptedException e) {
- LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(),
- e);
- } finally {
- shutdownTransferManager(transferManager);
- }
- }
-
/**
* Get the buffer reader to read s3 file as a stream
*/
- public BufferedReader getReader(String s3Path, String accessKey,
- String secretKey) throws IOException {
+ public static BufferedReader getReader(String s3Path, String accessKey, String secretKey) throws IOException {
// TODO error handling
// Compression support
// read header and decide the compression(auto detection)
// For now hard-code GZIP compression
String s3Bucket = getBucketName(s3Path);
String s3Key = getS3Key(s3Path);
- S3Object fileObj = getS3Client(accessKey, secretKey).getObject(
- new GetObjectRequest(s3Bucket, s3Key));
- GZIPInputStream objectInputStream;
+ S3Object fileObj = getS3Client(accessKey, secretKey).getObject(new GetObjectRequest(s3Bucket, s3Key));
try {
- objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
- BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(
- objectInputStream));
+ GZIPInputStream objectInputStream = new GZIPInputStream(fileObj.getObjectContent());
+ BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(objectInputStream));
return bufferedReader;
} catch (IOException e) {
- LOG.error("Error in creating stream reader for s3 file :" + s3Path,
- e.getCause());
+ LOG.error("Error in creating stream reader for s3 file :" + s3Path, e.getCause());
throw e;
}
}
- public void writeIntoS3File(String data, String bucketName, String s3Key,
- String accessKey, String secretKey) {
+ public static void writeIntoS3File(String data, String bucketName, String s3Key, String accessKey, String secretKey) {
InputStream in = null;
try {
in = IOUtils.toInputStream(data, "UTF-8");
} catch (IOException e) {
LOG.error(e);
}
+
if (in != null) {
TransferManager transferManager = getTransferManager(accessKey, secretKey);
try {
if (transferManager != null) {
- transferManager.upload(
- new PutObjectRequest(bucketName, s3Key, in,
- new ObjectMetadata())).waitForUploadResult();
- LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :"
- + bucketName);
+ transferManager.upload(new PutObjectRequest(bucketName, s3Key, in, new ObjectMetadata())).waitForUploadResult();
+ LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + bucketName);
}
} catch (AmazonClientException | InterruptedException e) {
LOG.error(e);
@@ -182,5 +152,4 @@ public class S3Util {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
deleted file mode 100644
index 44113e1..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.util;
-
-import java.io.IOException;
-import java.util.HashMap;
-
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.solr.client.solrj.SolrClient;
-import org.apache.solr.client.solrj.SolrQuery;
-import org.apache.solr.client.solrj.SolrServerException;
-import org.apache.solr.client.solrj.SolrRequest.METHOD;
-import org.apache.solr.client.solrj.impl.CloudSolrClient;
-import org.apache.solr.client.solrj.impl.HttpSolrClient;
-import org.apache.solr.client.solrj.request.CollectionAdminRequest;
-import org.apache.solr.client.solrj.response.CollectionAdminResponse;
-import org.apache.solr.client.solrj.response.QueryResponse;
-import org.apache.solr.common.SolrDocument;
-import org.apache.solr.common.SolrDocumentList;
-import org.apache.solr.common.SolrException;
-
-public class SolrUtil {
-
- private static final Logger logger = Logger.getLogger(SolrUtil.class);
-
- private static SolrUtil instance = null;
-
- private SolrClient solrClient = null;
- private CloudSolrClient solrClouldClient = null;
-
- private String solrDetail = "";
-
- private SolrUtil() throws Exception {
- String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
- String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
- String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
- connectToSolr(url, zkConnectString, collection);
- }
-
- public static SolrUtil getInstance() {
- if (instance == null) {
- synchronized (SolrUtil.class) {
- if (instance == null) {
- try {
- instance = new SolrUtil();
- } catch (Exception e) {
- final String LOG_MESSAGE_KEY = SolrUtil.class
- .getSimpleName() + "_SOLR_UTIL";
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error constructing solrUtil", e, logger,
- Level.WARN);
- }
- }
- }
- }
- return instance;
- }
-
- private SolrClient connectToSolr(String url, String zkConnectString,
- String collection) throws Exception {
- solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection
- + ", url=" + url;
-
- logger.info("connectToSolr() " + solrDetail);
- if (collection == null || collection.isEmpty()) {
- throw new Exception("For solr, collection name is mandatory. "
- + solrDetail);
- }
- if (zkConnectString != null && !zkConnectString.isEmpty()) {
- solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection;
- logger.info("Using zookeepr. " + solrDetail);
- solrClouldClient = new CloudSolrClient(zkConnectString);
- solrClouldClient.setDefaultCollection(collection);
- solrClient = solrClouldClient;
- int waitDurationMS = 3 * 60 * 1000;
- checkSolrStatus(waitDurationMS);
- } else {
- if (url == null || url.trim().isEmpty()) {
- throw new Exception("Both zkConnectString and URL are empty. zkConnectString="
- + zkConnectString + ", collection=" + collection + ", url="
- + url);
- }
- solrDetail = "collection=" + collection + ", url=" + url;
- String collectionURL = url + "/" + collection;
- logger.info("Connecting to solr : " + collectionURL);
- solrClient = new HttpSolrClient(collectionURL);
-
- }
- return solrClient;
- }
-
- private boolean checkSolrStatus(int waitDurationMS) {
- boolean status = false;
- try {
- long beginTimeMS = System.currentTimeMillis();
- long waitIntervalMS = 2000;
- int pingCount = 0;
- while (true) {
- pingCount++;
- CollectionAdminResponse response = null;
- try {
- CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
- response = colListReq.process(solrClient);
- } catch (Exception ex) {
- logger.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
- }
- if (response != null && response.getStatus() == 0) {
- logger.info("Solr getCollections() is success. solr=" + solrDetail);
- status = true;
- break;
- }
- if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
- logger.error("Solr is not reachable even after "
- + (System.currentTimeMillis() - beginTimeMS)
- + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
- + solrDetail + ", response=" + response);
- break;
- } else {
- logger.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount
- + ". Will sleep for " + waitIntervalMS + " ms and try again." + " solr=" + solrDetail
- + ", response=" + response);
-
- }
- Thread.sleep(waitIntervalMS);
- }
- } catch (Throwable t) {
- logger.error("Seems Solr is not up. solrDetail=" + solrDetail);
- }
- return status;
- }
-
- private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
- if (solrClient != null) {
- QueryResponse queryResponse = solrClient.query(solrQuery, METHOD.POST);
- return queryResponse;
- } else {
- logger.error("solrClient can't be null");
- return null;
- }
- }
-
- public HashMap<String, Object> getConfigDoc() {
- HashMap<String, Object> configMap = new HashMap<String, Object>();
- SolrQuery solrQuery = new SolrQuery();
- solrQuery.setQuery("*:*");
- String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
- solrQuery.setFilterQueries(fq);
- try {
- QueryResponse response = process(solrQuery);
- if (response != null) {
- SolrDocumentList documentList = response.getResults();
- if (documentList != null && documentList.size() > 0) {
- SolrDocument configDoc = documentList.get(0);
- String configJson = LogFeederUtil.getGson().toJson(configDoc);
- configMap = (HashMap<String, Object>) LogFeederUtil
- .toJSONObject(configJson);
- }
- }
- } catch (Exception e) {
- final String logMessageKey = this.getClass().getSimpleName()
- + "_FETCH_FILTER_CONFIG_ERROR";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey,
- "Error getting filter config from solr", e, logger, Level.ERROR);
- }
- return configMap;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
deleted file mode 100644
index f030040..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilter.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.view;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VLogfeederFilter {
-
- private String label;
- private List<String> hosts;
- private List<String> defaultLevels;
- private List<String> overrideLevels;
- private String expiryTime;
-
- public VLogfeederFilter() {
- hosts = new ArrayList<String>();
- defaultLevels = new ArrayList<String>();
- overrideLevels = new ArrayList<String>();
- }
-
- public String getLabel() {
- return label;
- }
-
- public void setLabel(String label) {
- this.label = label;
- }
-
- public List<String> getHosts() {
- return hosts;
- }
-
- public void setHosts(List<String> hosts) {
- this.hosts = hosts;
- }
-
- public List<String> getDefaultLevels() {
- return defaultLevels;
- }
-
- public void setDefaultLevels(List<String> defaultLevels) {
- this.defaultLevels = defaultLevels;
- }
-
- public List<String> getOverrideLevels() {
- return overrideLevels;
- }
-
- public void setOverrideLevels(List<String> overrideLevels) {
- this.overrideLevels = overrideLevels;
- }
-
- public String getExpiryTime() {
- return expiryTime;
- }
-
- public void setExpiryTime(String expiryTime) {
- this.expiryTime = expiryTime;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
deleted file mode 100644
index 4ddef3f..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/view/VLogfeederFilterWrapper.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.view;
-
-import java.util.HashMap;
-
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-
-import org.codehaus.jackson.annotate.JsonAutoDetect;
-import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@XmlRootElement
-@XmlAccessorType(XmlAccessType.FIELD)
-public class VLogfeederFilterWrapper {
-
- private HashMap<String, VLogfeederFilter> filter;
- private String id;
-
- public HashMap<String, VLogfeederFilter> getFilter() {
- return filter;
- }
-
- public void setFilter(HashMap<String, VLogfeederFilter> filter) {
- this.filter = filter;
- }
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
deleted file mode 100644
index 193cb48..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/AppTest.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.filter.FilterGrok;
-import org.apache.log4j.Logger;
-
-import junit.framework.Test;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-
-/**
- * Unit test for simple App.
- */
-public class AppTest extends TestCase {
- static Logger logger = Logger.getLogger(AppTest.class);
-
- /**
- * Create the test case
- *
- * @param testName name of the test case
- */
- public AppTest(String testName) {
- super(testName);
- }
-
- /**
- * @return the suite of tests being tested
- */
- public static Test suite() {
- return new TestSuite(AppTest.class);
- }
-
- /**
- * Rigourous Test :-)
- */
- public void testApp() {
- assertTrue(true);
- }
-
- public void testGrok() {
- logger.info("testGrok()");
- FilterGrok grokFilter = new FilterGrok();
- try {
- Map<String, Object> map = new HashMap<String, Object>();
- map.put("message_pattern",
- "^%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
- grokFilter.loadConfig(map);
- grokFilter.init();
- String out = grokFilter.grokParse("INFO This is a test");
- logger.info("out=" + out);
-
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- assertFalse(true);
- }
-
- assertTrue(true);
- }
-
- public void testGrokUGI() {
- logger.info("testGrok()");
- String[] ugis = new String[]{"user1@xyz.com (auth:TOKEN)",
- "ambari-qa@example.com (auth:kerberos)",
- "my_user@example.com (auth:kerberos)",
- "hive/bdurai-dojran-2.novalocal@example.com (auth:kerberos)",
- "just_me",
- "ambari-qa (auth:PROXY) via hive/myhost.novalocal@EXAMPLE.COM (auth:KERBEROS)"};
-
- FilterGrok grokFilter = new FilterGrok();
- try {
- Map<String, Object> map = new HashMap<String, Object>();
- // map.put("message_pattern",
- // "(?<user>([\\w\\d\\-]+))\\/|(?<user>([\\w\\d\\-]+))@|(?<user>([\\w\\d\\-]+))/[\\w\\d\\-.]+@|(?<user>([\\w\\d.\\-_]+))[\\s(]+");
- // map.put("message_pattern",
- // "(?<user>([\\w\\d\\-]+))/[\\w\\d\\-.]+@");
- // *(auth:(?<auth>[\\w\\d\\-]+))
- // GOOD: map.put("message_pattern", "(?<user>([\\w\\d\\-]+)).+auth:(?<auth>([\\w\\d\\-]+))");
- // OK: map.put("message_pattern", "(?<user>([\\w\\d\\-]+)).+auth:(?<auth>([\\w\\d\\-]+))|%{USERNAME:xuser}");
- //map.put("message_pattern", "%{USERNAME:user}.+auth:%{USERNAME:authType}|%{USERNAME:x_user}");
- map.put("message_pattern", "%{USERNAME:p_user}.+auth:%{USERNAME:p_authType}.+via %{USERNAME:k_user}.+auth:%{USERNAME:k_authType}|%{USERNAME:user}.+auth:%{USERNAME:authType}|%{USERNAME:x_user}");
- grokFilter.loadConfig(map);
- grokFilter.init();
- for (String ugi : ugis) {
- String out = grokFilter.grokParse(ugi);
- logger.info(ugi + "=" + out);
- }
-
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- assertFalse(true);
- }
- assertTrue(true);
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
index 3aa8d7b..99565c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,7 +23,7 @@ import java.util.Map;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
@@ -40,16 +40,16 @@ public class FilterGrokTest {
private static final Logger LOG = Logger.getLogger(FilterGrokTest.class);
private FilterGrok filterGrok;
- private OutputMgr mockOutputMgr;
+ private OutputManager mockOutputManager;
private Capture<Map<String, Object>> capture;
public void init(Map<String, Object> config) throws Exception {
- mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ mockOutputManager = EasyMock.strictMock(OutputManager.class);
capture = EasyMock.newCapture(CaptureType.LAST);
filterGrok = new FilterGrok();
filterGrok.loadConfig(config);
- filterGrok.setOutputMgr(mockOutputMgr);
+ filterGrok.setOutputManager(mockOutputManager);
filterGrok.setInput(EasyMock.mock(Input.class));
filterGrok.init();
}
@@ -59,19 +59,18 @@ public class FilterGrokTest {
LOG.info("testFilterGrok_parseMessage()");
Map<String, Object> config = new HashMap<String, Object>();
- config.put("message_pattern",
- "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
init(config);
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
- filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertNotNull(jsonParams);
@@ -86,23 +85,22 @@ public class FilterGrokTest {
LOG.info("testFilterGrok_parseMultiLineMessage()");
Map<String, Object> config = new HashMap<String, Object>();
- config.put("message_pattern",
- "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
init(config);
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
String multiLineMessage = "This is a multiline test message\r\n" + "having multiple lines\r\n"
+ "as one may expect";
String[] messageLines = multiLineMessage.split("\r\n");
for (int i = 0; i < messageLines.length; i++)
- filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker());
+ filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker(null, null, 0));
filterGrok.flush();
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertNotNull(jsonParams);
@@ -117,19 +115,18 @@ public class FilterGrokTest {
LOG.info("testFilterGrok_notMatchingMesagePattern()");
Map<String, Object> config = new HashMap<String, Object>();
- config.put("message_pattern",
- "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("message_pattern", "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
init(config);
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall().anyTimes();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker());
- filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker());
+ filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+ filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
assertFalse("Something was captured!", capture.hasCaptured());
}
@@ -141,12 +138,12 @@ public class FilterGrokTest {
config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
init(config);
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
- filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker(null, null, 0));
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
assertFalse("Something was captured", capture.hasCaptured());
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
index 64e9b69..06d8db2 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,8 +27,7 @@ import java.util.TimeZone;
import org.apache.ambari.logfeeder.common.LogfeederException;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
@@ -44,16 +43,16 @@ public class FilterJSONTest {
private static final Logger LOG = Logger.getLogger(FilterJSONTest.class);
private FilterJSON filterJson;
- private OutputMgr mockOutputMgr;
+ private OutputManager mockOutputManager;
private Capture<Map<String, Object>> capture;
public void init(Map<String, Object> params) throws Exception {
- mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ mockOutputManager = EasyMock.strictMock(OutputManager.class);
capture = EasyMock.newCapture(CaptureType.LAST);
filterJson = new FilterJSON();
filterJson.loadConfig(params);
- filterJson.setOutputMgr(mockOutputMgr);
+ filterJson.setOutputManager(mockOutputManager);
filterJson.init();
}
@@ -63,17 +62,17 @@ public class FilterJSONTest {
init(new HashMap<String, Object>());
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
Date d = new Date();
- DateFormat sdf = new SimpleDateFormat(LogFeederUtil.SOLR_DATE_FORMAT);
+ DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String dateString = sdf.format(d);
- filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker());
+ filterJson.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
@@ -87,17 +86,17 @@ public class FilterJSONTest {
init(new HashMap<String, Object>());
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
Date d = new Date();
- DateFormat sdf = new SimpleDateFormat(LogFeederUtil.SOLR_DATE_FORMAT);
+ DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
String dateString = sdf.format(d);
- filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker());
+ filterJson.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
@@ -111,13 +110,13 @@ public class FilterJSONTest {
init(new HashMap<String, Object>());
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker());
+ filterJson.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
@@ -132,7 +131,7 @@ public class FilterJSONTest {
init(new HashMap<String, Object>());
String inputStr="invalid json";
try{
- filterJson.apply(inputStr,new InputMarker());
+ filterJson.apply(inputStr,new InputMarker(null, null, 0));
fail("Expected LogfeederException was not occured");
}catch(LogfeederException logfeederException){
assertEquals("Json parsing failed for inputstr = "+inputStr, logfeederException.getLocalizedMessage());
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
index 849e4c3..30cee42 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,7 +22,7 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.output.OutputMgr;
+import org.apache.ambari.logfeeder.output.OutputManager;
import org.apache.log4j.Logger;
import org.easymock.Capture;
import org.easymock.CaptureType;
@@ -38,16 +38,16 @@ public class FilterKeyValueTest {
private static final Logger LOG = Logger.getLogger(FilterKeyValueTest.class);
private FilterKeyValue filterKeyValue;
- private OutputMgr mockOutputMgr;
+ private OutputManager mockOutputManager;
private Capture<Map<String, Object>> capture;
public void init(Map<String, Object> config) throws Exception {
- mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ mockOutputManager = EasyMock.strictMock(OutputManager.class);
capture = EasyMock.newCapture(CaptureType.LAST);
filterKeyValue = new FilterKeyValue();
filterKeyValue.loadConfig(config);
- filterKeyValue.setOutputMgr(mockOutputMgr);
+ filterKeyValue.setOutputManager(mockOutputManager);
filterKeyValue.init();
}
@@ -61,13 +61,13 @@ public class FilterKeyValueTest {
// using default value split:
init(config);
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("keyValueField"));
@@ -85,13 +85,13 @@ public class FilterKeyValueTest {
// using default value split: =
init(config);
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall().anyTimes();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
assertFalse("Something was captured!", capture.hasCaptured());
}
@@ -105,13 +105,13 @@ public class FilterKeyValueTest {
init(config);
// using default value split: =
- mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ mockOutputManager.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
EasyMock.expectLastCall().anyTimes();
- EasyMock.replay(mockOutputMgr);
+ EasyMock.replay(mockOutputManager);
- filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker());
+ filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker(null, null, 0));
- EasyMock.verify(mockOutputMgr);
+ EasyMock.verify(mockOutputManager);
Map<String, Object> jsonParams = capture.getValue();
assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("otherField"));
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
index 42e81da..08aa564 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -104,7 +104,7 @@ public class InputFileTest {
inputFile = new InputFile();
inputFile.loadConfig(config);
- inputFile.setFirstFilter(capture);
+ inputFile.addFilter(capture);
inputFile.init();
}
@@ -117,10 +117,10 @@ public class InputFileTest {
init(testFile.getAbsolutePath());
- InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
- EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir);
- EasyMock.replay(inputMgr);
- inputFile.setInputMgr(inputMgr);
+ InputManager inputManager = EasyMock.createStrictMock(InputManager.class);
+ EasyMock.expect(inputManager.getCheckPointFolderFile()).andReturn(checkPointDir);
+ EasyMock.replay(inputManager);
+ inputFile.setInputManager(inputManager);
inputFile.isReady();
inputFile.start();
@@ -129,7 +129,7 @@ public class InputFileTest {
for (int row = 0; row < 3; row++)
assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
- EasyMock.verify(inputMgr);
+ EasyMock.verify(inputManager);
}
@Test
@@ -140,10 +140,10 @@ public class InputFileTest {
File testFile = createFile("process6.log");
init(testFile.getAbsolutePath());
- InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
- EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
- EasyMock.replay(inputMgr);
- inputFile.setInputMgr(inputMgr);
+ InputManager inputMabager = EasyMock.createStrictMock(InputManager.class);
+ EasyMock.expect(inputMabager.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
+ EasyMock.replay(inputMabager);
+ inputFile.setInputManager(inputMabager);
inputFile.isReady();
inputFile.start();
@@ -155,7 +155,7 @@ public class InputFileTest {
for (int row = 0; row < 6; row++)
assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
- EasyMock.verify(inputMgr);
+ EasyMock.verify(inputMabager);
}
@Test