You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/28 09:41:58 UTC
[31/52] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder
(Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
index c57c028..5fc2e14 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/input/reader/LogsearchReaderFactory.java
@@ -27,17 +27,15 @@ import org.apache.log4j.Logger;
public enum LogsearchReaderFactory {
INSTANCE;
- private static Logger logger = Logger
- .getLogger(LogsearchReaderFactory.class);
+ private static final Logger LOG = Logger.getLogger(LogsearchReaderFactory.class);
public Reader getReader(File file) throws FileNotFoundException {
- logger.debug("Inside reader factory for file:" + file);
+ LOG.debug("Inside reader factory for file:" + file);
if (GZIPReader.isValidFile(file.getAbsolutePath())) {
- logger.info("Reading file " + file + " as gzip file");
+ LOG.info("Reading file " + file + " as gzip file");
return new GZIPReader(file.getAbsolutePath());
} else {
return new FileReader(file);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
deleted file mode 100644
index ae0cfc0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FetchConfigFromSolr.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.util.SolrUtil;
-import org.apache.ambari.logfeeder.view.VLogfeederFilter;
-import org.apache.ambari.logfeeder.view.VLogfeederFilterWrapper;
-import org.apache.log4j.Logger;
-
-public class FetchConfigFromSolr extends Thread {
- private static Logger logger = Logger.getLogger(FetchConfigFromSolr.class);
- private static VLogfeederFilterWrapper logfeederFilterWrapper = null;
- private static int solrConfigInterval = 5;// 5 sec;
- private static long delay;
- private static String endTimeDateFormat = "yyyy-MM-dd'T'HH:mm:ss.SSS";//2016-04-05T04:30:00.000Z
- private static String sysTimeZone = "GMT";
-
- FetchConfigFromSolr(boolean isDaemon) {
- this.setName(this.getClass().getSimpleName());
- this.setDaemon(isDaemon);
- }
-
- @Override
- public void run() {
- String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
- String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
- if ((zkConnectString == null || zkConnectString.trim().length() == 0 )
- && (solrUrl == null || solrUrl.trim().length() == 0)) {
- logger.warn("Neither Solr ZK Connect String nor solr Uril for UserConfig/History is set." +
- "Won't look for level configuration from Solr.");
- return;
- }
- solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", solrConfigInterval);
- delay = 1000 * solrConfigInterval;
- do {
- logger.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
- pullConfigFromSolr();
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage(), e.getCause());
- }
- } while (true);
- }
-
- private synchronized void pullConfigFromSolr() {
- SolrUtil solrUtil = SolrUtil.getInstance();
- if(solrUtil!=null){
- HashMap<String, Object> configDocMap = solrUtil.getConfigDoc();
- if (configDocMap != null) {
- String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
- if (configJson != null) {
- logfeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, VLogfeederFilterWrapper.class);
- }
- }
- }
- }
-
- private static boolean isFilterExpired(VLogfeederFilter logfeederFilter) {
- boolean isFilterExpired = false;// default is false
- if (logfeederFilter != null) {
- Date filterEndDate = parseFilterExpireDate(logfeederFilter);
- if (filterEndDate != null) {
- Date currentDate = getCurrentDate();
- if (currentDate.compareTo(filterEndDate) >= 0) {
- logger.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts :"
- + listToStr(logfeederFilter.getHosts()) + "Filter is expired because of filter endTime : "
- + dateToStr(filterEndDate) + " is older than currentTime :" + dateToStr(currentDate));
- isFilterExpired = true;
- }
- }
- }
- return isFilterExpired;
- }
-
- private static String dateToStr(Date date) {
- if (date == null) {
- return "";
- }
- SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
- TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
- formatter.setTimeZone(timeZone);
- return formatter.format(date);
- }
-
- private static Date parseFilterExpireDate(VLogfeederFilter vLogfeederFilter) {
- String expiryTime = vLogfeederFilter.getExpiryTime();
- if (expiryTime != null && !expiryTime.isEmpty()) {
- SimpleDateFormat formatter = new SimpleDateFormat(endTimeDateFormat);
- TimeZone timeZone = TimeZone.getTimeZone(sysTimeZone);
- formatter.setTimeZone(timeZone);
- try {
- return formatter.parse(expiryTime);
- } catch (ParseException e) {
- logger.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
- + " and hosts :" + listToStr(vLogfeederFilter.getHosts()));
- }
- }
- return null;
- }
-
- public static List<String> getAllowedLevels(String hostName, VLogfeederFilter componentFilter) {
- String componentName = componentFilter.getLabel();
- List<String> hosts = componentFilter.getHosts();
- List<String> defaultLevels = componentFilter.getDefaultLevels();
- List<String> overrideLevels = componentFilter.getOverrideLevels();
- String expiryTime=componentFilter.getExpiryTime();
- //check is user override or not
- if ((expiryTime != null && !expiryTime.isEmpty())
- || (overrideLevels != null && !overrideLevels.isEmpty())
- || (hosts != null && !hosts.isEmpty())) {
- if (hosts == null || hosts.isEmpty()) {
- // hosts list is empty or null consider it apply on all hosts
- hosts.add(LogFeederConstants.ALL);
- }
- if (LogFeederUtil.isListContains(hosts, hostName, false)) {
- if (isFilterExpired(componentFilter)) {
- logger.debug("Filter for component " + componentName + " and host :"
- + hostName + " is expired at " + componentFilter.getExpiryTime());
- return defaultLevels;
- } else {
- return overrideLevels;
- }
- }
- }
- return defaultLevels;
- }
-
- public static boolean isFilterAvailable() {
- return logfeederFilterWrapper != null;
- }
-
- public static VLogfeederFilter findComponentFilter(String componentName) {
- if (logfeederFilterWrapper != null) {
- HashMap<String, VLogfeederFilter> filter = logfeederFilterWrapper.getFilter();
- if (filter != null) {
- VLogfeederFilter componentFilter = filter.get(componentName);
- if (componentFilter != null) {
- return componentFilter;
- }
- }
- }
- logger.trace("Filter is not there for component :" + componentName);
- return null;
- }
-
-
- public static Date getCurrentDate() {
- TimeZone.setDefault(TimeZone.getTimeZone(sysTimeZone));
- Date date = new Date();
- return date;
- }
-
- public static String listToStr(List<String> strList) {
- StringBuilder out = new StringBuilder("[");
- if (strList != null) {
- int counter = 0;
- for (Object o : strList) {
- if (counter > 0) {
- out.append(",");
- }
- out.append(o.toString());
- counter++;
- }
- }
- out.append("]");
- return out.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
new file mode 100644
index 0000000..801a289
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/FilterLogData.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Decides whether a log entry may be forwarded, using the per-component level
+ * filters exposed by {@link LogConfigHandler}.
+ */
+public enum FilterLogData {
+  INSTANCE;
+
+  private static final Logger LOG = Logger.getLogger(FilterLogData.class);
+
+  /** Verdict returned whenever the entry cannot be matched against a filter. */
+  private static final boolean DEFAULT_VALUE = true;
+
+  /**
+   * Checks a JSON-encoded log entry against the component filters.
+   *
+   * @param jsonBlock the log entry as a JSON string; may be null or empty
+   * @return {@code true} if the entry is allowed, or if the input is null/empty
+   */
+  public boolean isAllowed(String jsonBlock) {
+    if (StringUtils.isEmpty(jsonBlock)) {
+      return DEFAULT_VALUE;
+    }
+    Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
+    return isAllowed(jsonObj);
+  }
+
+  /**
+   * Checks an already parsed log entry against the component filters.
+   *
+   * @param jsonObj the log entry fields; may be null or empty
+   * @return {@code true} if the entry is allowed
+   */
+  public boolean isAllowed(Map<String, Object> jsonObj) {
+    boolean isAllowed = applyFilter(jsonObj);
+    // Serialize the entry only when trace output is actually enabled.
+    if (!isAllowed && LOG.isTraceEnabled()) {
+      LOG.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
+    }
+    return isAllowed;
+  }
+
+  /**
+   * Looks up the filter for the entry's component and tests the entry's level against the
+   * levels allowed for its host. Entries missing host, component or level, and entries
+   * whose component has no filter, get {@link #DEFAULT_VALUE}.
+   */
+  private boolean applyFilter(Map<String, Object> jsonObj) {
+    if (MapUtils.isEmpty(jsonObj)) {
+      LOG.warn("Output jsonobj is empty");
+      return DEFAULT_VALUE;
+    }
+
+    String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
+    String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
+    String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
+    if (StringUtils.isBlank(hostName) || StringUtils.isBlank(componentName) || StringUtils.isBlank(level)) {
+      return DEFAULT_VALUE;
+    }
+
+    LogFeederFilter componentFilter = LogConfigHandler.findComponentFilter(componentName);
+    if (componentFilter == null) {
+      return DEFAULT_VALUE;
+    }
+
+    List<String> allowedLevels = LogConfigHandler.getAllowedLevels(hostName, componentFilter);
+    if (CollectionUtils.isEmpty(allowedLevels)) {
+      // A null or empty list is matched as LogFeederConstants.ALL. Use a fresh list: the
+      // returned one may be null and otherwise belongs to the shared filter object.
+      allowedLevels = Collections.singletonList(LogFeederConstants.ALL);
+    }
+    return LogFeederUtil.isListContains(allowedLevels, level, false);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
new file mode 100644
index 0000000..12c744c
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigFetcher.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.SolrRequest.METHOD;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.impl.HttpSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
+import org.apache.solr.client.solrj.response.CollectionAdminResponse;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.apache.solr.common.SolrException;
+
+/**
+ * Fetches the Log Feeder filter configuration document from Solr.
+ */
+public class LogConfigFetcher {
+  private static final Logger LOG = Logger.getLogger(LogConfigFetcher.class);
+
+  private static LogConfigFetcher instance;
+
+  /**
+   * Returns the shared fetcher, creating it on first use.
+   *
+   * @return the shared instance, or {@code null} if it could not be constructed; construction
+   *         is attempted again on the next call
+   */
+  public synchronized static LogConfigFetcher getInstance() {
+    if (instance == null) {
+      try {
+        instance = new LogConfigFetcher();
+      } catch (Exception e) {
+        String logMessageKey = LogConfigFetcher.class.getSimpleName() + "_SOLR_UTIL";
+        LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error constructing solrUtil", e, LOG, Level.WARN);
+      }
+    }
+    return instance;
+  }
+
+  private SolrClient solrClient;
+
+  private String solrDetail = "";
+
+  /**
+   * Reads the Solr connection properties and connects to the configuration collection.
+   *
+   * @throws Exception if the collection name is empty, or both the ZooKeeper connect string
+   *         and the Solr URL are empty
+   */
+  public LogConfigFetcher() throws Exception {
+    String url = LogFeederUtil.getStringProperty("logfeeder.solr.url");
+    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
+    String collection = LogFeederUtil.getStringProperty("logfeeder.solr.core.config.name", "history");
+    connectToSolr(url, zkConnectString, collection);
+  }
+
+  /**
+   * Creates the Solr client: a cloud client when a ZooKeeper connect string is set, otherwise
+   * an HTTP client for {@code url + "/" + collection}.
+   */
+  private SolrClient connectToSolr(String url, String zkConnectString, String collection) throws Exception {
+    solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection + ", url=" + url;
+
+    LOG.info("connectToSolr() " + solrDetail);
+    if (StringUtils.isEmpty(collection)) {
+      throw new Exception("For solr, collection name is mandatory. " + solrDetail);
+    }
+
+    if (StringUtils.isEmpty(zkConnectString) && StringUtils.isBlank(url)) {
+      throw new Exception("Both zkConnectString and URL are empty. zkConnectString=" + zkConnectString + ", collection=" +
+        collection + ", url=" + url);
+    }
+
+    if (StringUtils.isNotEmpty(zkConnectString)) {
+      solrDetail = "zkConnectString=" + zkConnectString + ", collection=" + collection;
+      LOG.info("Using zookeepr. " + solrDetail);
+      CloudSolrClient solrClouldClient = new CloudSolrClient(zkConnectString);
+      solrClouldClient.setDefaultCollection(collection);
+      solrClient = solrClouldClient;
+      checkSolrStatus(3 * 60 * 1000);
+    } else {
+      solrDetail = "collection=" + collection + ", url=" + url;
+      String collectionURL = url + "/" + collection;
+      LOG.info("Connecting to solr : " + collectionURL);
+      solrClient = new HttpSolrClient(collectionURL);
+    }
+    return solrClient;
+  }
+
+  /**
+   * Polls Solr with a collection list request every two seconds until it answers with status 0
+   * or the wait duration has elapsed.
+   *
+   * @param waitDurationMS maximum time to keep retrying, in milliseconds
+   * @return {@code true} if Solr answered successfully
+   */
+  private boolean checkSolrStatus(int waitDurationMS) {
+    boolean status = false;
+    try {
+      long beginTimeMS = System.currentTimeMillis();
+      long waitIntervalMS = 2000;
+      int pingCount = 0;
+      while (true) {
+        pingCount++;
+        CollectionAdminResponse response = null;
+        try {
+          CollectionAdminRequest.List colListReq = new CollectionAdminRequest.List();
+          response = colListReq.process(solrClient);
+        } catch (Exception ex) {
+          LOG.error("Con't connect to Solr. solrDetail=" + solrDetail, ex);
+        }
+        if (response != null && response.getStatus() == 0) {
+          LOG.info("Solr getCollections() is success. solr=" + solrDetail);
+          status = true;
+          break;
+        }
+        if (System.currentTimeMillis() - beginTimeMS > waitDurationMS) {
+          LOG.error("Solr is not reachable even after " + (System.currentTimeMillis() - beginTimeMS)
+            + " ms. If you are using alias, then you might have to restart LogSearch after Solr is up and running. solr="
+            + solrDetail + ", response=" + response);
+          break;
+        } else {
+          LOG.warn("Solr is not reachable yet. getCollections() attempt count=" + pingCount + ". Will sleep for " +
+            waitIntervalMS + " ms and try again." + " solr=" + solrDetail + ", response=" + response);
+        }
+        Thread.sleep(waitIntervalMS);
+      }
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to the caller instead of swallowing it.
+      Thread.currentThread().interrupt();
+      LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, e);
+    } catch (Throwable t) {
+      LOG.error("Seems Solr is not up. solrDetail=" + solrDetail, t);
+    }
+    return status;
+  }
+
+  /**
+   * Queries Solr for the filter configuration document and returns its fields.
+   *
+   * @return the first matching document as a map; an empty map if nothing was found, the
+   *         document could not be converted, or the query failed. Never {@code null}.
+   */
+  public Map<String, Object> getConfigDoc() {
+    Map<String, Object> configMap = new HashMap<String, Object>();
+    SolrQuery solrQuery = new SolrQuery();
+    solrQuery.setQuery("*:*");
+    String fq = LogFeederConstants.ROW_TYPE + ":" + LogFeederConstants.LOGFEEDER_FILTER_NAME;
+    solrQuery.setFilterQueries(fq);
+    try {
+      QueryResponse response = process(solrQuery);
+      if (response != null) {
+        SolrDocumentList documentList = response.getResults();
+        if (CollectionUtils.isNotEmpty(documentList)) {
+          SolrDocument configDoc = documentList.get(0);
+          String configJson = LogFeederUtil.getGson().toJson(configDoc);
+          // No HashMap cast, and keep the empty default if the conversion yields null.
+          Map<String, Object> parsedDoc = LogFeederUtil.toJSONObject(configJson);
+          if (parsedDoc != null) {
+            configMap = parsedDoc;
+          }
+        }
+      }
+    } catch (Exception e) {
+      String logMessageKey = this.getClass().getSimpleName() + "_FETCH_FILTER_CONFIG_ERROR";
+      LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error getting filter config from solr", e, LOG, Level.ERROR);
+    }
+    return configMap;
+  }
+
+  /**
+   * Runs the query as a POST request.
+   *
+   * @return the query response, or {@code null} if no Solr client has been created
+   */
+  private QueryResponse process(SolrQuery solrQuery) throws SolrServerException, IOException, SolrException {
+    if (solrClient == null) {
+      LOG.error("solrClient can't be null");
+      return null;
+    }
+    return solrClient.query(solrQuery, METHOD.POST);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
new file mode 100644
index 0000000..4f52b0b
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogConfigHandler.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Daemon thread that periodically fetches the log level filter configuration from Solr and
+ * serves filter lookups from the most recently fetched copy.
+ */
+public class LogConfigHandler extends Thread {
+  private static final Logger LOG = Logger.getLogger(LogConfigHandler.class);
+
+  private static final int DEFAULT_SOLR_CONFIG_INTERVAL = 5;
+  private static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS";
+  private static final String TIMEZONE = "GMT";
+
+  // NOTE(review): this changes the default time zone of the whole JVM, not only of this class.
+  static {
+    TimeZone.setDefault(TimeZone.getTimeZone(TIMEZONE));
+  }
+
+  // SimpleDateFormat is not thread-safe, so each thread gets its own instance.
+  private static final ThreadLocal<DateFormat> formatter = new ThreadLocal<DateFormat>() {
+    @Override
+    protected DateFormat initialValue() {
+      SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
+      dateFormat.setTimeZone(TimeZone.getTimeZone(TIMEZONE));
+      return dateFormat;
+    }
+  };
+
+  // Replaced by the polling thread and read by callers of the static lookup methods.
+  private static volatile LogFeederFilterWrapper logFeederFilterWrapper;
+
+  // Guarded by the class lock taken in handleConfig().
+  private static boolean running = false;
+
+  /**
+   * Starts the configuration polling thread if the {@code logfeeder.log.filter.enable}
+   * property is true and the thread has not been started before.
+   */
+  public static synchronized void handleConfig() {
+    boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
+    if (!filterEnable) {
+      LOG.info("Logfeeder filter Scheduler is disabled.");
+      return;
+    }
+    if (!running) {
+      new LogConfigHandler().start();
+      running = true;
+      LOG.info("Logfeeder Filter Thread started!");
+    } else {
+      LOG.warn("Logfeeder Filter Thread is already running.");
+    }
+  }
+
+  private LogConfigHandler() {
+    setName(getClass().getSimpleName());
+    setDaemon(true);
+  }
+
+  /**
+   * Fetches the configuration every {@code logfeeder.solr.config.interval} seconds until the
+   * thread is interrupted. Returns immediately if neither Solr connection property is set.
+   */
+  @Override
+  public void run() {
+    String zkConnectString = LogFeederUtil.getStringProperty("logfeeder.solr.zk_connect_string");
+    String solrUrl = LogFeederUtil.getStringProperty("logfeeder.solr.url");
+    if (StringUtils.isBlank(zkConnectString) && StringUtils.isBlank(solrUrl)) {
+      LOG.warn("Neither Solr ZK Connect String nor solr Url for UserConfig/History is set. " +
+        "Won't look for level configuration from Solr.");
+      return;
+    }
+
+    int solrConfigInterval = LogFeederUtil.getIntProperty("logfeeder.solr.config.interval", DEFAULT_SOLR_CONFIG_INTERVAL);
+    while (true) {
+      LOG.debug("Updating config from solr after every " + solrConfigInterval + " sec.");
+      fetchConfig();
+      try {
+        // Long arithmetic so that a large interval cannot overflow.
+        Thread.sleep(1000L * solrConfigInterval);
+      } catch (InterruptedException e) {
+        // Log the exception itself (its cause is normally null), restore the interrupt flag
+        // and stop polling instead of looping on.
+        LOG.error(e.getLocalizedMessage(), e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /** Fetches the configuration document and, if it carries filter JSON, replaces the current filters. */
+  private synchronized void fetchConfig() {
+    LogConfigFetcher fetcher = LogConfigFetcher.getInstance();
+    if (fetcher == null) {
+      return;
+    }
+    Map<String, Object> configDocMap = fetcher.getConfigDoc();
+    if (configDocMap == null) {
+      return;
+    }
+    String configJson = (String) configDocMap.get(LogFeederConstants.VALUES);
+    if (configJson != null) {
+      logFeederFilterWrapper = LogFeederUtil.getGson().fromJson(configJson, LogFeederFilterWrapper.class);
+    }
+  }
+
+  /**
+   * @return {@code true} once a filter configuration has been fetched
+   */
+  public static boolean isFilterAvailable() {
+    return logFeederFilterWrapper != null;
+  }
+
+  /**
+   * Returns the levels allowed for a host: the override levels while an override that covers
+   * the host has not expired, otherwise the default levels.
+   *
+   * @param hostName host the log entry came from
+   * @param componentFilter filter of the entry's component; must not be null
+   * @return the filter's own override or default level list (not a copy); null if that list is null
+   */
+  public static List<String> getAllowedLevels(String hostName, LogFeederFilter componentFilter) {
+    String componentName = componentFilter.getLabel();
+    List<String> hosts = componentFilter.getHosts();
+    List<String> defaultLevels = componentFilter.getDefaultLevels();
+    List<String> overrideLevels = componentFilter.getOverrideLevels();
+    String expiryTime = componentFilter.getExpiryTime();
+
+    // check is user override or not
+    if (StringUtils.isNotEmpty(expiryTime) || CollectionUtils.isNotEmpty(overrideLevels) || CollectionUtils.isNotEmpty(hosts)) {
+      // An empty or null host list applies to all hosts. Use a separate list so that a null
+      // list cannot cause a NullPointerException and the filter itself is left unmodified.
+      List<String> effectiveHosts = CollectionUtils.isEmpty(hosts) ?
+        Collections.singletonList(LogFeederConstants.ALL) : hosts;
+
+      if (LogFeederUtil.isListContains(effectiveHosts, hostName, false)) {
+        if (isFilterExpired(componentFilter)) {
+          LOG.debug("Filter for component " + componentName + " and host :" + hostName + " is expired at " +
+            componentFilter.getExpiryTime());
+          return defaultLevels;
+        } else {
+          return overrideLevels;
+        }
+      }
+    }
+    return defaultLevels;
+  }
+
+  /** Returns true if the filter has a parseable expiry time that is not after the current time. */
+  private static boolean isFilterExpired(LogFeederFilter logfeederFilter) {
+    if (logfeederFilter == null) {
+      return false;
+    }
+
+    Date filterEndDate = parseFilterExpireDate(logfeederFilter);
+    if (filterEndDate == null) {
+      return false;
+    }
+
+    Date currentDate = new Date();
+    if (!currentDate.before(filterEndDate)) {
+      LOG.debug("Filter for Component :" + logfeederFilter.getLabel() + " and Hosts : [" +
+        StringUtils.join(logfeederFilter.getHosts(), ',') + "] is expired because of filter endTime : " +
+        formatter.get().format(filterEndDate) + " is older than currentTime :" + formatter.get().format(currentDate));
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /** Parses the filter's expiry time; returns null if it is empty or not in the expected format. */
+  private static Date parseFilterExpireDate(LogFeederFilter vLogfeederFilter) {
+    String expiryTime = vLogfeederFilter.getExpiryTime();
+    if (StringUtils.isNotEmpty(expiryTime)) {
+      try {
+        return formatter.get().parse(expiryTime);
+      } catch (ParseException e) {
+        LOG.error("Filter have invalid ExpiryTime : " + expiryTime + " for component :" + vLogfeederFilter.getLabel()
+          + " and hosts : [" + StringUtils.join(vLogfeederFilter.getHosts(), ',') + "]");
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Looks up the filter of a component in the most recently fetched configuration.
+   *
+   * @param componentName name of the component to look up
+   * @return the component's filter, or {@code null} if there is none
+   */
+  public static LogFeederFilter findComponentFilter(String componentName) {
+    // Read the volatile field once so the null check and the lookup see the same object.
+    LogFeederFilterWrapper wrapper = logFeederFilterWrapper;
+    if (wrapper != null) {
+      HashMap<String, LogFeederFilter> filter = wrapper.getFilter();
+      if (filter != null) {
+        LogFeederFilter componentFilter = filter.get(componentName);
+        if (componentFilter != null) {
+          return componentFilter;
+        }
+      }
+    }
+    LOG.trace("Filter is not there for component :" + componentName);
+    return null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
deleted file mode 100644
index 09673a0..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederConstants.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig;
-
-public class LogFeederConstants {
-
- public static final String ALL = "all";
- public static final String LOGFEEDER_FILTER_NAME = "log_feeder_config";
- public static final String LOG_LEVEL_UNKNOWN = "UNKNOWN";
- // solr fields
- public static final String SOLR_LEVEL = "level";
- public static final String SOLR_COMPONENT = "type";
- public static final String SOLR_HOST = "host";
-
- // UserConfig Constants History
- public static final String VALUES = "jsons";
- public static final String ROW_TYPE = "rowtype";
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
new file mode 100644
index 0000000..60c8ae8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LogFeederFilter {
+
+ private String label;
+ private List<String> hosts;
+ private List<String> defaultLevels;
+ private List<String> overrideLevels;
+ private String expiryTime;
+
+ public LogFeederFilter() {
+ hosts = new ArrayList<String>();
+ defaultLevels = new ArrayList<String>();
+ overrideLevels = new ArrayList<String>();
+ }
+
+ public String getLabel() {
+ return label;
+ }
+
+ public void setLabel(String label) {
+ this.label = label;
+ }
+
+ public List<String> getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(List<String> hosts) {
+ this.hosts = hosts;
+ }
+
+ public List<String> getDefaultLevels() {
+ return defaultLevels;
+ }
+
+ public void setDefaultLevels(List<String> defaultLevels) {
+ this.defaultLevels = defaultLevels;
+ }
+
+ public List<String> getOverrideLevels() {
+ return overrideLevels;
+ }
+
+ public void setOverrideLevels(List<String> overrideLevels) {
+ this.overrideLevels = overrideLevels;
+ }
+
+ public String getExpiryTime() {
+ return expiryTime;
+ }
+
+ public void setExpiryTime(String expiryTime) {
+ this.expiryTime = expiryTime;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
new file mode 100644
index 0000000..9199cd3
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogFeederFilterWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ambari.logfeeder.logconfig;
+
+import java.util.HashMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.codehaus.jackson.annotate.JsonAutoDetect;
+import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class LogFeederFilterWrapper {
+
+ private HashMap<String, LogFeederFilter> filter;
+ private String id;
+
+ public HashMap<String, LogFeederFilter> getFilter() {
+ return filter;
+ }
+
+ public void setFilter(HashMap<String, LogFeederFilter> filter) {
+ this.filter = filter;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
deleted file mode 100644
index bc807193..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/LogfeederScheduler.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-public enum LogfeederScheduler {
-
- INSTANCE;
-
- private Logger logger = Logger.getLogger(LogfeederScheduler.class);
-
- private static boolean running = false;
-
- public synchronized void start() {
- boolean filterEnable = LogFeederUtil.getBooleanProperty("logfeeder.log.filter.enable", false);
- if (!filterEnable) {
- logger.info("Logfeeder filter Scheduler is disabled.");
- return;
- }
- if (!running) {
- for (Thread thread : getThreadList()) {
- thread.start();
- }
- running = true;
- logger.info("Logfeeder Scheduler started!");
- } else {
- logger.warn("Logfeeder Scheduler is already running.");
- }
- }
-
- private List<Thread> getThreadList() {
- List<Thread> tasks = new ArrayList<Thread>();
- Thread configMonitor = new FetchConfigFromSolr(true);
- tasks.add(configMonitor);
- return tasks;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
deleted file mode 100644
index b5e4eb3..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/ApplyLogFilter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.ambari.logfeeder.view.VLogfeederFilter;
-import org.apache.log4j.Logger;
-
-class ApplyLogFilter extends DefaultDataFilter {
-
- private static Logger logger = Logger.getLogger(ApplyLogFilter.class);
-
- @Override
- public boolean applyFilter(Map<String, Object> jsonObj, boolean defaultValue) {
- if (isEmpty(jsonObj)) {
- logger.warn("Output jsonobj is empty");
- return defaultValue;
- }
- String hostName = (String) jsonObj.get(LogFeederConstants.SOLR_HOST);
- if (isNotEmpty(hostName)) {
- String componentName = (String) jsonObj.get(LogFeederConstants.SOLR_COMPONENT);
- if (isNotEmpty(componentName)) {
- String level = (String) jsonObj.get(LogFeederConstants.SOLR_LEVEL);
- if (isNotEmpty(level)) {
- VLogfeederFilter componentFilter = FetchConfigFromSolr.findComponentFilter(componentName);
- if (componentFilter == null) {
- return defaultValue;
- }
- List<String> allowedLevels = FetchConfigFromSolr.getAllowedLevels(
- hostName, componentFilter);
- if (allowedLevels == null || allowedLevels.isEmpty()) {
- allowedLevels.add(LogFeederConstants.ALL);
- }
- return LogFeederUtil.isListContains(allowedLevels, level, false);
- }
- }
- }
- return defaultValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
deleted file mode 100644
index 04d2ca4..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/DefaultDataFilter.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.Map;
-
-/**
- * Default filter to allow everything
- */
-class DefaultDataFilter {
- public boolean applyFilter(Map<String, Object> outputJsonObj, boolean defaultValue) {
- return defaultValue;
- }
-
- protected boolean isEmpty(Map<String, Object> map) {
- if (map == null || map.isEmpty()) {
- return true;
- }
- return false;
- }
-
- protected boolean isEmpty(String str) {
- if (str == null || str.trim().isEmpty()) {
- return true;
- }
- return false;
- }
-
- protected boolean isNotEmpty(String str) {
- return !isEmpty(str);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
deleted file mode 100644
index 3a8eae9..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/logconfig/filter/FilterLogData.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.logconfig.filter;
-
-import java.util.Map;
-
-import org.apache.ambari.logfeeder.logconfig.filter.ApplyLogFilter;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Logger;
-
-/**
- * Read configuration from solr and filter the log
- */
-public enum FilterLogData {
- INSTANCE;
- private ApplyLogFilter applyLogFilter = new ApplyLogFilter();
- private static Logger logger = Logger.getLogger(FilterLogData.class);
- // by default allow every log
- boolean defaultValue = true;
-
- public boolean isAllowed(String jsonBlock) {
- if (jsonBlock == null || jsonBlock.isEmpty()) {
- return defaultValue;
- }
- Map<String, Object> jsonObj = LogFeederUtil.toJSONObject(jsonBlock);
- return isAllowed(jsonObj);
- }
-
- public boolean isAllowed(Map<String, Object> jsonObj) {
- boolean isAllowed = applyLogFilter.applyFilter(jsonObj, defaultValue);
- if (!isAllowed) {
- logger.trace("Filter block the content :" + LogFeederUtil.getGson().toJson(jsonObj));
- }
- return isAllowed;
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
index 906dd25..96709c0 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/Mapper.java
@@ -26,22 +26,18 @@ public abstract class Mapper {
protected String fieldName;
private String mapClassCode;
- public boolean init(String inputDesc, String fieldName,
- String mapClassCode, Object mapConfigs) {
+ public abstract boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs);
+
+ protected void init(String inputDesc, String fieldName, String mapClassCode) {
this.inputDesc = inputDesc;
this.fieldName = fieldName;
this.mapClassCode = mapClassCode;
- return true;
}
- public Object apply(Map<String, Object> jsonObj, Object value) {
- return value;
- }
+ public abstract Object apply(Map<String, Object> jsonObj, Object value);
@Override
public String toString() {
- return "mapClass=" + mapClassCode + ", input=" + inputDesc
- + ", fieldName=" + fieldName;
+ return "mapClass=" + mapClassCode + ", input=" + inputDesc + ", fieldName=" + fieldName;
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
index 6dbf8be..eb3ae01 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperDate.java
@@ -31,31 +31,29 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class MapperDate extends Mapper {
- private static final Logger logger = Logger.getLogger(MapperDate.class);
+ private static final Logger LOG = Logger.getLogger(MapperDate.class);
private SimpleDateFormat targetDateFormatter = null;
private boolean isEpoch = false;
private SimpleDateFormat srcDateFormatter=null;
@Override
- public boolean init(String inputDesc, String fieldName,
- String mapClassCode, Object mapConfigs) {
- super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ init(inputDesc, fieldName, mapClassCode);
if (!(mapConfigs instanceof Map)) {
- logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
- + mapConfigs.getClass().getName()
- + ", map="
- + this.toString());
+ LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName() +
+ ", map=" + this);
return false;
}
+
@SuppressWarnings("unchecked")
Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
String targetDateFormat = (String) mapObjects.get("target_date_pattern");
String srcDateFormat = (String) mapObjects.get("src_date_pattern");
if (StringUtils.isEmpty(targetDateFormat)) {
- logger.fatal("Date format for map is empty. " + this.toString());
+ LOG.fatal("Date format for map is empty. " + this);
} else {
- logger.info("Date mapper format is " + targetDateFormat);
+ LOG.info("Date mapper format is " + targetDateFormat);
if (targetDateFormat.equalsIgnoreCase("epoch")) {
isEpoch = true;
@@ -68,8 +66,7 @@ public class MapperDate extends Mapper {
}
return true;
} catch (Throwable ex) {
- logger.fatal("Error creating date format. format="
- + targetDateFormat + ". " + this.toString());
+ LOG.fatal("Error creating date format. format=" + targetDateFormat + ". " + this.toString());
}
}
}
@@ -84,7 +81,7 @@ public class MapperDate extends Mapper {
long ms = Long.parseLong(value.toString()) * 1000;
value = new Date(ms);
} else if (targetDateFormatter != null) {
- if(srcDateFormatter!=null){
+ if (srcDateFormatter != null) {
Date srcDate = srcDateFormatter.parse(value.toString());
//set year in src_date when src_date does not have year component
if (!srcDateFormatter.toPattern().contains("yy")) {
@@ -108,12 +105,9 @@ public class MapperDate extends Mapper {
}
jsonObj.put(fieldName, value);
} catch (Throwable t) {
- LogFeederUtil.logErrorMessageByInterval(this.getClass()
- .getSimpleName() + ":apply",
- "Error applying date transformation. isEpoch="
- + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
- + ", value=" + value + ". " + this.toString(),
- t, logger, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply", "Error applying date transformation." +
+ " isEpoch=" + isEpoch + ", targetateFormat=" + (targetDateFormatter!=null ?targetDateFormatter.toPattern():"")
+ + ", value=" + value + ". " + this.toString(), t, LOG, Level.ERROR);
}
}
return value;
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
index c692a9d..9b6e83c 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldName.java
@@ -30,24 +30,23 @@ import org.apache.log4j.Logger;
* Overrides the value for the field
*/
public class MapperFieldName extends Mapper {
- private static final Logger logger = Logger.getLogger(MapperFieldName.class);
+ private static final Logger LOG = Logger.getLogger(MapperFieldName.class);
private String newValue = null;
@Override
- public boolean init(String inputDesc, String fieldName,
- String mapClassCode, Object mapConfigs) {
- super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ init(inputDesc, fieldName, mapClassCode);
if (!(mapConfigs instanceof Map)) {
- logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
- + mapConfigs.getClass().getName());
+ LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
return false;
}
+
@SuppressWarnings("unchecked")
Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
newValue = (String) mapObjects.get("new_fieldname");
if (StringUtils.isEmpty(newValue)) {
- logger.fatal("Map field value is empty.");
+ LOG.fatal("Map field value is empty.");
return false;
}
return true;
@@ -59,12 +58,9 @@ public class MapperFieldName extends Mapper {
jsonObj.remove(fieldName);
jsonObj.put(newValue, value);
} else {
- LogFeederUtil.logErrorMessageByInterval(this.getClass()
- .getSimpleName() + ":apply",
- "New fieldName is null, so transformation is not applied. "
- + this.toString(), null, logger, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
+ "New fieldName is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR);
}
return value;
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
index e618261..87cda65 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/mapper/MapperFieldValue.java
@@ -30,25 +30,25 @@ import org.apache.log4j.Logger;
* Overrides the value for the field
*/
public class MapperFieldValue extends Mapper {
- private Logger logger = Logger.getLogger(MapperFieldValue.class);
+ private static final Logger LOG = Logger.getLogger(MapperFieldValue.class);
+
private String prevValue = null;
private String newValue = null;
@Override
- public boolean init(String inputDesc, String fieldName,
- String mapClassCode, Object mapConfigs) {
- super.init(inputDesc, fieldName, mapClassCode, mapConfigs);
+ public boolean init(String inputDesc, String fieldName, String mapClassCode, Object mapConfigs) {
+ init(inputDesc, fieldName, mapClassCode);
if (!(mapConfigs instanceof Map)) {
- logger.fatal("Can't initialize object. mapConfigs class is not of type Map. "
- + mapConfigs.getClass().getName());
+ LOG.fatal("Can't initialize object. mapConfigs class is not of type Map. " + mapConfigs.getClass().getName());
return false;
}
+
@SuppressWarnings("unchecked")
Map<String, Object> mapObjects = (Map<String, Object>) mapConfigs;
prevValue = (String) mapObjects.get("pre_value");
newValue = (String) mapObjects.get("post_value");
if (StringUtils.isEmpty(newValue)) {
- logger.fatal("Map field value is empty.");
+ LOG.fatal("Map field value is empty.");
return false;
}
return true;
@@ -56,20 +56,15 @@ public class MapperFieldValue extends Mapper {
@Override
public Object apply(Map<String, Object> jsonObj, Object value) {
- if (newValue != null) {
- if (prevValue != null) {
- if (prevValue.equalsIgnoreCase(value.toString())) {
- value = newValue;
- jsonObj.put(fieldName, value);
- }
+ if (newValue != null && prevValue != null) {
+ if (prevValue.equalsIgnoreCase(value.toString())) {
+ value = newValue;
+ jsonObj.put(fieldName, value);
}
} else {
- LogFeederUtil.logErrorMessageByInterval(
- this.getClass().getSimpleName() + ":apply",
- "New value is null, so transformation is not applied. "
- + this.toString(), null, logger, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(this.getClass().getSimpleName() + ":apply",
+ "New value is null, so transformation is not applied. " + this.toString(), null, LOG, Level.ERROR);
}
return value;
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
index c99a091..0766514 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/LogFeederAMSClient.java
@@ -20,25 +20,25 @@
package org.apache.ambari.logfeeder.metrics;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.log4j.Logger;
public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
- private static final Logger logger = Logger.getLogger(LogFeederAMSClient.class);
+ private static final Logger LOG = Logger.getLogger(LogFeederAMSClient.class);
private String collectorHosts = null;
public LogFeederAMSClient() {
- collectorHosts = LogFeederUtil
- .getStringProperty("logfeeder.metrics.collector.hosts");
- if (collectorHosts != null && collectorHosts.trim().length() == 0) {
+ collectorHosts = LogFeederUtil.getStringProperty("logfeeder.metrics.collector.hosts");
+ if (StringUtils.isBlank(collectorHosts)) {
collectorHosts = null;
}
if (collectorHosts != null) {
collectorHosts = collectorHosts.trim();
}
- logger.info("AMS collector URL=" + collectorHosts);
+ LOG.info("AMS collector URL=" + collectorHosts);
}
@Override
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
deleted file mode 100644
index abb84c7..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricCount.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.metrics;
-
-public class MetricCount {
- public String metricsName = null;
- public boolean isPointInTime = false;
-
- public long count = 0;
- public long prevLogCount = 0;
- public long prevLogMS = System.currentTimeMillis();
- public long prevPublishCount = 0;
- public int publishCount = 0; // Count of published metrics. Used for first time sending metrics
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
new file mode 100644
index 0000000..e7f5d37
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricData.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+/**
+ * Mutable holder for one named metric and the bookkeeping values kept alongside it.
+ *
+ * The name and the point-in-time flag are fixed at construction; all other fields are public and
+ * updated directly by the code that owns the instance.
+ */
+public class MetricData {
+  /** Name the metric is published under; MetricsManager skips metrics whose name is null. */
+  public final String metricsName;
+  /** When true MetricsManager publishes the current value as is, otherwise the change since the last publish. */
+  public final boolean isPointInTime;
+
+  /** Current value of the metric. */
+  public long value;
+  /** Value of the metric at the time it was last published. */
+  public long prevPublishValue;
+
+  // presumably the value and wall-clock time at the last statistics log; usage is not visible here
+  public long prevLogValue;
+  public long prevLogTime = System.currentTimeMillis();
+
+  public int publishCount; // Number of times the metric was published so far
+
+  /**
+   * @param metricsName name of the metric, may be null
+   * @param isPointInTime whether the metric is a point-in-time value rather than a running total
+   */
+  public MetricData(String metricsName, boolean isPointInTime) {
+    this.metricsName = metricsName;
+    this.isPointInTime = isPointInTime;
+  }
+
+  /** Renders every field reflectively using the short-prefix style. */
+  @Override
+  public String toString() {
+    final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
+    return ToStringBuilder.reflectionToString(this, style);
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
new file mode 100644
index 0000000..a679a33
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsManager.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.metrics;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.log4j.Logger;
+
+/**
+ * Buffers LogFeeder metric values as {@link TimelineMetric} objects and emits them through a
+ * {@link LogFeederAMSClient} at most once per publish interval.
+ *
+ * Call {@link #init()} once before use; {@link #useMetrics(List)} does nothing unless init()
+ * enabled metrics.
+ */
+public class MetricsManager {
+  private static final Logger LOG = Logger.getLogger(MetricsManager.class);
+
+  private boolean isMetricsEnabled = false;
+  private String nodeHostName = null;
+  private String appId = "logfeeder";
+
+  private long lastPublishTimeMS = 0; // Let's do the first publish immediately
+  // NOTE(review): this is only updated when the buffer is purged, never on a successful publish, so
+  // once the process has been up longer than maxMetricsBuffer the first failed publish purges
+  // immediately - confirm this is intended.
+  private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
+
+  private int publishIntervalMS = 60 * 1000;
+  private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep the metrics in memory forever
+  private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
+  private LogFeederAMSClient amsClient = null;
+
+  /**
+   * Creates the AMS client and decides whether metrics publishing is enabled.
+   *
+   * Publishing is enabled only when the client returns a non-null collector URI and a host name
+   * for this node could be determined.
+   */
+  public void init() {
+    LOG.info("Initializing MetricsManager()");
+    amsClient = new LogFeederAMSClient();
+
+    if (amsClient.getCollectorUri(null) != null) {
+      findNodeHostName();
+      if (nodeHostName == null) {
+        isMetricsEnabled = false;
+        LOG.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
+      } else {
+        isMetricsEnabled = true;
+        LOG.info("LogFeeder Metrics is enabled. Metrics host=" + amsClient.getCollectorUri(null));
+      }
+    } else {
+      LOG.info("LogFeeder Metrics publish is disabled");
+    }
+  }
+
+  /**
+   * Resolves {@code nodeHostName}: first from the "node.hostname" property, then from the local
+   * host's name, then from its canonical name. Leaves it null when every attempt fails.
+   *
+   * This method only resolves the name; enabling or disabling metrics based on the result is
+   * done by {@link #init()}.
+   */
+  private void findNodeHostName() {
+    nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
+    if (nodeHostName == null) {
+      try {
+        nodeHostName = InetAddress.getLocalHost().getHostName();
+      } catch (Throwable e) {
+        // Best effort: fall through to the canonical host name lookup below
+        LOG.warn("Error getting hostname using InetAddress.getLocalHost().getHostName()", e);
+      }
+    }
+    if (nodeHostName == null) {
+      try {
+        nodeHostName = InetAddress.getLocalHost().getCanonicalHostName();
+      } catch (Throwable e) {
+        // Best effort: nodeHostName stays null and init() disables metrics
+        LOG.warn("Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()", e);
+      }
+    }
+  }
+
+  /**
+   * @return true when init() enabled metrics publishing
+   */
+  public boolean isMetricsEnabled() {
+    return isMetricsEnabled;
+  }
+
+  /**
+   * Adds the current values of the given metrics to the buffer and publishes the buffer if it is due.
+   * Does nothing when metrics are not enabled.
+   *
+   * @param metricsList metrics to record; their publish bookkeeping fields are updated in place
+   */
+  public synchronized void useMetrics(List<MetricData> metricsList) {
+    if (!isMetricsEnabled) {
+      return;
+    }
+    LOG.info("useMetrics() metrics.size=" + metricsList.size());
+    long currMS = System.currentTimeMillis();
+
+    gatherMetrics(metricsList, currMS);
+    publishMetrics(currMS);
+  }
+
+  /**
+   * Records one data point per metric under the timestamp {@code currMS}.
+   *
+   * Metrics without a name are skipped, as are non point-in-time metrics that were published
+   * before and whose value has not grown since.
+   */
+  private void gatherMetrics(List<MetricData> metricsList, long currMS) {
+    Long currMSLong = Long.valueOf(currMS);
+    for (MetricData metric : metricsList) {
+      if (metric.metricsName == null) {
+        LOG.debug("metric.metricsName is null");
+        continue;
+      }
+      long currCount = metric.value;
+      if (!metric.isPointInTime && metric.publishCount > 0 && currCount <= metric.prevPublishValue) {
+        LOG.debug("Nothing changed. " + metric.metricsName + ", currCount=" + currCount + ", prevPublishCount=" +
+            metric.prevPublishValue);
+        continue;
+      }
+      metric.publishCount++;
+
+      LOG.debug("Ensuring metrics=" + metric.metricsName);
+      TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
+      if (timelineMetric == null) {
+        LOG.debug("Creating new metric obbject for " + metric.metricsName);
+        timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metric.metricsName);
+        timelineMetric.setHostName(nodeHostName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setStartTime(currMS);
+        timelineMetric.setType("Long");
+        timelineMetric.setMetricValues(new TreeMap<Long, Double>());
+
+        metricsMap.put(metric.metricsName, timelineMetric);
+      }
+
+      LOG.debug("Adding metrics=" + metric.metricsName);
+      if (metric.isPointInTime) {
+        // Point-in-time metrics are recorded with their current value
+        timelineMetric.getMetricValues().put(currMSLong, Double.valueOf(currCount));
+      } else {
+        // Other metrics are recorded as the increase since the last publish, added to whatever
+        // is already stored under this timestamp
+        Double value = timelineMetric.getMetricValues().get(currMSLong);
+        if (value == null) {
+          value = Double.valueOf(0);
+        }
+        value += (currCount - metric.prevPublishValue);
+        timelineMetric.getMetricValues().put(currMSLong, value);
+        metric.prevPublishValue = currCount;
+      }
+    }
+  }
+
+  /**
+   * Emits the buffered metrics when the buffer is non-empty and more than publishIntervalMS has
+   * passed since the last successful publish; the buffer is cleared after a successful emit.
+   *
+   * A failed emit keeps the buffer for a later retry, unless more than maxMetricsBuffer
+   * milliseconds have passed since lastFailedPublishTimeMS, in which case the buffer is purged.
+   */
+  private void publishMetrics(long currMS) {
+    if (!metricsMap.isEmpty() && currMS - lastPublishTimeMS > publishIntervalMS) {
+      try {
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.setMetrics(new ArrayList<TimelineMetric>(metricsMap.values()));
+        amsClient.emitMetrics(timelineMetrics);
+
+        LOG.info("Published " + timelineMetrics.getMetrics().size() + " metrics to AMS");
+        metricsMap.clear();
+        lastPublishTimeMS = currMS;
+      } catch (Throwable t) {
+        LOG.warn("Error sending metrics to AMS.", t);
+        if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
+          LOG.error("AMS was not sent for last " + maxMetricsBuffer / 1000 +
+              " seconds. Purging it and will start rebuilding it again");
+          metricsMap.clear();
+          lastFailedPublishTimeMS = currMS;
+        }
+      }
+    } else {
+      LOG.info("Not publishing metrics. metrics.size()=" + metricsMap.size() + ", lastPublished=" +
+          (currMS - lastPublishTimeMS) / 1000 + " seconds ago, intervalConfigured=" + publishIntervalMS / 1000);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
deleted file mode 100644
index eff9d0d..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/metrics/MetricsMgr.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.metrics;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.TreeMap;
-
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.log4j.Logger;
-
-public class MetricsMgr {
- private static final Logger logger = Logger.getLogger(MetricsMgr.class);
-
- private boolean isMetricsEnabled = false;
- private String nodeHostName = null;
- private String appId = "logfeeder";
-
- private long lastPublishTimeMS = 0; // Let's do the first publish immediately
- private long lastFailedPublishTimeMS = System.currentTimeMillis(); // Reset the clock
-
- private int publishIntervalMS = 60 * 1000;
- private int maxMetricsBuffer = 60 * 60 * 1000; // If AMS is down, we should not keep
- // the metrics in memory forever
- private HashMap<String, TimelineMetric> metricsMap = new HashMap<String, TimelineMetric>();
- private LogFeederAMSClient amsClient = null;
-
- public void init() {
- logger.info("Initializing MetricsMgr()");
- amsClient = new LogFeederAMSClient();
-
- if (amsClient.getCollectorUri() != null) {
- nodeHostName = LogFeederUtil.getStringProperty("node.hostname");
- if (nodeHostName == null) {
- try {
- nodeHostName = InetAddress.getLocalHost().getHostName();
- } catch (Throwable e) {
- logger.warn(
- "Error getting hostname using InetAddress.getLocalHost().getHostName()",
- e);
- }
- if (nodeHostName == null) {
- try {
- nodeHostName = InetAddress.getLocalHost()
- .getCanonicalHostName();
- } catch (Throwable e) {
- logger.warn(
- "Error getting hostname using InetAddress.getLocalHost().getCanonicalHostName()",
- e);
- }
- }
- }
- if (nodeHostName == null) {
- isMetricsEnabled = false;
- logger.error("Failed getting hostname for node. Disabling publishing LogFeeder metrics");
- } else {
- isMetricsEnabled = true;
- logger.info("LogFeeder Metrics is enabled. Metrics host="
- + amsClient.getCollectorUri());
- }
- } else {
- logger.info("LogFeeder Metrics publish is disabled");
- }
- }
-
- public boolean isMetricsEnabled() {
- return isMetricsEnabled;
- }
-
- synchronized public void useMetrics(List<MetricCount> metricsList) {
- if (!isMetricsEnabled) {
- return;
- }
- logger.info("useMetrics() metrics.size=" + metricsList.size());
- long currMS = System.currentTimeMillis();
- Long currMSLong = new Long(currMS);
- for (MetricCount metric : metricsList) {
- if (metric.metricsName == null) {
- logger.debug("metric.metricsName is null");
- // Metrics is not meant to be published
- continue;
- }
- long currCount = metric.count;
- if (!metric.isPointInTime && metric.publishCount > 0
- && currCount <= metric.prevPublishCount) {
- // No new data added, so let's ignore it
- logger.debug("Nothing changed. " + metric.metricsName
- + ", currCount=" + currCount + ", prevPublishCount="
- + metric.prevPublishCount);
- continue;
- }
- metric.publishCount++;
-
- TimelineMetric timelineMetric = metricsMap.get(metric.metricsName);
- if (timelineMetric == null) {
- logger.debug("Creating new metric obbject for "
- + metric.metricsName);
- // First time for this metric
- timelineMetric = new TimelineMetric();
- timelineMetric.setMetricName(metric.metricsName);
- timelineMetric.setHostName(nodeHostName);
- timelineMetric.setAppId(appId);
- timelineMetric.setStartTime(currMS);
- timelineMetric.setType("Long");
- timelineMetric.setMetricValues(new TreeMap<Long, Double>());
-
- metricsMap.put(metric.metricsName, timelineMetric);
- }
- logger.debug("Adding metrics=" + metric.metricsName);
- if (metric.isPointInTime) {
- timelineMetric.getMetricValues().put(currMSLong,
- new Double(currCount));
- } else {
- Double value = timelineMetric.getMetricValues().get(currMSLong);
- if (value == null) {
- value = new Double(0);
- }
- value += (currCount - metric.prevPublishCount);
- timelineMetric.getMetricValues().put(currMSLong, value);
- metric.prevPublishCount = currCount;
- }
- }
-
- if (metricsMap.size() > 0
- && currMS - lastPublishTimeMS > publishIntervalMS) {
- try {
- // Time to publish
- TimelineMetrics timelineMetrics = new TimelineMetrics();
- List<TimelineMetric> timeLineMetricList = new ArrayList<TimelineMetric>();
- timeLineMetricList.addAll(metricsMap.values());
- timelineMetrics.setMetrics(timeLineMetricList);
- amsClient.emitMetrics(timelineMetrics);
- logger.info("Published " + timeLineMetricList.size()
- + " metrics to AMS");
- metricsMap.clear();
- timeLineMetricList.clear();
- lastPublishTimeMS = currMS;
- } catch (Throwable t) {
- logger.warn("Error sending metrics to AMS.", t);
- if (currMS - lastFailedPublishTimeMS > maxMetricsBuffer) {
- logger.error("AMS was not sent for last "
- + maxMetricsBuffer
- / 1000
- + " seconds. Purging it and will start rebuilding it again");
- metricsMap.clear();
- lastFailedPublishTimeMS = currMS;
- }
- }
- } else {
- logger.info("Not publishing metrics. metrics.size()="
- + metricsMap.size() + ", lastPublished="
- + (currMS - lastPublishTimeMS) / 1000
- + " seconds ago, intervalConfigured=" + publishIntervalMS
- / 1000);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
index 6f84251..bc6a553 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/Output.java
@@ -26,16 +26,19 @@ import java.util.Map.Entry;
import org.apache.ambari.logfeeder.common.ConfigBlock;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
+import org.apache.ambari.logfeeder.metrics.MetricData;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.log4j.Logger;
public abstract class Output extends ConfigBlock {
- private static final Logger logger = Logger.getLogger(Output.class);
+ private static final Logger LOG = Logger.getLogger(Output.class);
private String destination = null;
- protected MetricCount writeBytesMetric = new MetricCount();
+ protected MetricData writeBytesMetric = new MetricData(getWriteBytesMetricName(), false);
+ protected String getWriteBytesMetricName() {
+ return null;
+ }
@Override
public String getShortDescription() {
@@ -67,7 +70,7 @@ public abstract class Output extends ConfigBlock {
* Extend this method to clean up
*/
public void close() {
- logger.info("Calling base close()." + getShortDescription());
+ LOG.info("Calling base close()." + getShortDescription());
isClosed = true;
}
@@ -91,7 +94,7 @@ public abstract class Output extends ConfigBlock {
}
@Override
- public void addMetricsContainers(List<MetricCount> metricsList) {
+ public void addMetricsContainers(List<MetricData> metricsList) {
super.addMetricsContainers(metricsList);
metricsList.add(writeBytesMetric);
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
index 4a408f9..c46086e 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputData.java
@@ -27,18 +27,16 @@ import org.apache.ambari.logfeeder.input.InputMarker;
* This contains the output json object and InputMarker.
*/
public class OutputData {
- Map<String, Object> jsonObj;
- InputMarker inputMarker;
+ public final Map<String, Object> jsonObj;
+ public final InputMarker inputMarker;
public OutputData(Map<String, Object> jsonObj, InputMarker inputMarker) {
- super();
this.jsonObj = jsonObj;
this.inputMarker = inputMarker;
}
@Override
public String toString() {
- return "OutputData [jsonObj=" + jsonObj + ", inputMarker="
- + inputMarker + "]";
+ return "OutputData [jsonObj=" + jsonObj + ", inputMarker=" + inputMarker + "]";
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/0a3cdccd/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
index 2d41a0b..fa4e17b 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputDevNull.java
@@ -28,16 +28,15 @@ import org.apache.log4j.Logger;
*/
public class OutputDevNull extends Output {
- private static Logger logger = Logger.getLogger(OutputDevNull.class);
+ private static final Logger LOG = Logger.getLogger(OutputDevNull.class);
@Override
public void write(String block, InputMarker inputMarker){
- logger.trace("Ignore log block: " + block);
+ LOG.trace("Ignore log block: " + block);
}
@Override
public void copyFile(File inputFile, InputMarker inputMarker) {
- throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=dev_null");
+ throw new UnsupportedOperationException("copyFile method is not yet supported for output=dev_null");
}
}