You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ambari.apache.org by ol...@apache.org on 2016/09/07 23:38:37 UTC
[47/50] [abbrv] ambari git commit: AMBARI-18246. Clean up Log Feeder
(Miklos Gergely via oleewere)
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
index 18a5a54..e1a0bb9 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputFile.java
@@ -30,26 +30,27 @@ import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
public class OutputFile extends Output {
- static Logger logger = Logger.getLogger(OutputFile.class);
+ private static final Logger LOG = Logger.getLogger(OutputFile.class);
- PrintWriter outWriter = null;
- String filePath = null;
- String codec;
+ private PrintWriter outWriter;
+ private String filePath = null;
+ private String codec;
@Override
public void init() throws Exception {
super.init();
filePath = getStringValue("path");
- if (filePath == null || filePath.isEmpty()) {
- logger.error("Filepath config property <path> is not set in config file.");
+ if (StringUtils.isEmpty(filePath)) {
+ LOG.error("Filepath config property <path> is not set in config file.");
return;
}
codec = getStringValue("codec");
- if (codec == null || codec.trim().isEmpty()) {
+ if (StringUtils.isBlank(codec)) {
codec = "json";
} else {
if (codec.trim().equalsIgnoreCase("csv")) {
@@ -57,12 +58,11 @@ public class OutputFile extends Output {
} else if (codec.trim().equalsIgnoreCase("json")) {
codec = "csv";
} else {
- logger.error("Unsupported codec type. codec=" + codec
- + ", will use json");
+ LOG.error("Unsupported codec type. codec=" + codec + ", will use json");
codec = "json";
}
}
- logger.info("Out filePath=" + filePath + ", codec=" + codec);
+ LOG.info("Out filePath=" + filePath + ", codec=" + codec);
File outFile = new File(filePath);
if (outFile.getParentFile() != null) {
File parentDir = outFile.getParentFile();
@@ -71,16 +71,14 @@ public class OutputFile extends Output {
}
}
- outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile,
- true)));
+ outWriter = new PrintWriter(new BufferedWriter(new FileWriter(outFile, true)));
- logger.info("init() is successfull. filePath="
- + outFile.getAbsolutePath());
+ LOG.info("init() is successfull. filePath=" + outFile.getAbsolutePath());
}
@Override
public void close() {
- logger.info("Closing file." + getShortDescription());
+ LOG.info("Closing file." + getShortDescription());
if (outWriter != null) {
try {
outWriter.close();
@@ -92,8 +90,7 @@ public class OutputFile extends Output {
}
@Override
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
- throws Exception {
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
String outStr = null;
CSVPrinter csvPrinter = null;
try {
@@ -104,7 +101,7 @@ public class OutputFile extends Output {
outStr = LogFeederUtil.getGson().toJson(jsonObj);
}
if (outWriter != null && outStr != null) {
- statMetric.count++;
+ statMetric.value++;
outWriter.println(outStr);
outWriter.flush();
@@ -122,7 +119,7 @@ public class OutputFile extends Output {
@Override
synchronized public void write(String block, InputMarker inputMarker) throws Exception {
if (outWriter != null && block != null) {
- statMetric.count++;
+ statMetric.value++;
outWriter.println(block);
outWriter.flush();
@@ -135,10 +132,7 @@ public class OutputFile extends Output {
}
@Override
- public void copyFile(File inputFile, InputMarker inputMarker)
- throws UnsupportedOperationException {
- throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=file");
+ public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("copyFile method is not yet supported for output=file");
}
-
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
index a360215..8f4b0b1 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputHDFSFile.java
@@ -43,7 +43,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* The events are spooled on the local file system and uploaded in batches asynchronously.
*/
public class OutputHDFSFile extends Output implements RolloverHandler, RolloverCondition {
- private final static Logger logger = Logger.getLogger(OutputHDFSFile.class);
+ private static final Logger LOG = Logger.getLogger(OutputHDFSFile.class);
+
private static final long DEFAULT_ROLLOVER_THRESHOLD_TIME_SECONDS = 5 * 60L;// 5 min by default
private ConcurrentLinkedQueue<File> localReadyFiles = new ConcurrentLinkedQueue<File>();
@@ -72,23 +73,20 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
rolloverThresholdTimeMillis = rolloverThresholdTimeSeconds * 1000L;
filenamePrefix = getStringValue("file_name_prefix", filenamePrefix);
if (StringUtils.isEmpty(hdfsOutDir)) {
- logger
- .error("HDFS config property <hdfs_out_dir> is not set in config file.");
+ LOG.error("HDFS config property <hdfs_out_dir> is not set in config file.");
return;
}
if (StringUtils.isEmpty(hdfsHost)) {
- logger
- .error("HDFS config property <hdfs_host> is not set in config file.");
+ LOG.error("HDFS config property <hdfs_host> is not set in config file.");
return;
}
if (StringUtils.isEmpty(hdfsPort)) {
- logger
- .error("HDFS config property <hdfs_port> is not set in config file.");
+ LOG.error("HDFS config property <hdfs_port> is not set in config file.");
return;
}
HashMap<String, String> contextParam = buildContextParam();
hdfsOutDir = PlaceholderUtil.replaceVariables(hdfsOutDir, contextParam);
- logger.info("hdfs Output dir=" + hdfsOutDir);
+ LOG.info("hdfs Output dir=" + hdfsOutDir);
String localFileDir = LogFeederUtil.getLogfeederTempDir() + "hdfs/service/";
logSpooler = new LogSpooler(localFileDir, filenamePrefix, this, this);
this.startHDFSCopyThread();
@@ -96,18 +94,17 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
@Override
public void close() {
- logger.info("Closing file." + getShortDescription());
+ LOG.info("Closing file." + getShortDescription());
logSpooler.rollover();
this.stopHDFSCopyThread();
isClosed = true;
}
@Override
- synchronized public void write(String block, InputMarker inputMarker)
- throws Exception {
+ public synchronized void write(String block, InputMarker inputMarker) throws Exception {
if (block != null) {
logSpooler.add(block);
- statMetric.count++;
+ statMetric.value++;
}
}
@@ -127,24 +124,19 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
Iterator<File> localFileIterator = localReadyFiles.iterator();
while (localFileIterator.hasNext()) {
File localFile = localFileIterator.next();
- fileSystem = LogfeederHDFSUtil.INSTANCE.buildFileSystem(hdfsHost,
- hdfsPort);
+ fileSystem = LogfeederHDFSUtil.buildFileSystem(hdfsHost, hdfsPort);
if (fileSystem != null && localFile.exists()) {
String destFilePath = hdfsOutDir + "/" + localFile.getName();
String localPath = localFile.getAbsolutePath();
boolean overWrite = true;
boolean delSrc = true;
- boolean isCopied = LogfeederHDFSUtil.INSTANCE.copyFromLocal(
- localFile.getAbsolutePath(), destFilePath, fileSystem,
+ boolean isCopied = LogfeederHDFSUtil.copyFromLocal(localFile.getAbsolutePath(), destFilePath, fileSystem,
overWrite, delSrc);
if (isCopied) {
- logger.debug("File copy to hdfs hdfspath :" + destFilePath
- + " and deleted local file :" + localPath);
+ LOG.debug("File copy to hdfs hdfspath :" + destFilePath + " and deleted local file :" + localPath);
} else {
- // TODO Need to write retry logic, in next release we can
- // handle it
- logger.error("Hdfs file copy failed for hdfspath :"
- + destFilePath + " and localpath :" + localPath);
+ // TODO Need to write retry logic, in next release we can handle it
+ LOG.error("Hdfs file copy failed for hdfspath :" + destFilePath + " and localpath :" + localPath);
}
}
localFileIterator.remove();
@@ -157,14 +149,11 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
}
}
} catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage(),e);
+ LOG.error(e.getLocalizedMessage(),e);
}
}
} catch (Exception e) {
- logger
- .error(
- "Exception in hdfsCopyThread errorMsg:"
- + e.getLocalizedMessage(), e);
+ LOG.error("Exception in hdfsCopyThread errorMsg:" + e.getLocalizedMessage(), e);
}
}
};
@@ -174,24 +163,23 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
private void stopHDFSCopyThread() {
if (hdfsCopyThread != null) {
- logger.info("waiting till copy all local files to hdfs.......");
+ LOG.info("waiting till copy all local files to hdfs.......");
while (!localReadyFiles.isEmpty()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- logger.error(e.getLocalizedMessage(), e);
+ LOG.error(e.getLocalizedMessage(), e);
}
- logger.debug("still waiting to copy all local files to hdfs.......");
+ LOG.debug("still waiting to copy all local files to hdfs.......");
}
- logger.info("calling interrupt method for hdfsCopyThread to stop it.");
+ LOG.info("calling interrupt method for hdfsCopyThread to stop it.");
try {
hdfsCopyThread.interrupt();
} catch (SecurityException exception) {
- logger.error(" Current thread : '" + Thread.currentThread().getName()
- + "' does not have permission to interrupt the Thread: '"
- + hdfsCopyThread.getName() + "'");
+ LOG.error(" Current thread : '" + Thread.currentThread().getName() +
+ "' does not have permission to interrupt the Thread: '" + hdfsCopyThread.getName() + "'");
}
- LogfeederHDFSUtil.INSTANCE.closeFileSystem(fileSystem);
+ LogfeederHDFSUtil.closeFileSystem(fileSystem);
}
}
@@ -208,15 +196,13 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
readyMonitor.notifyAll();
}
} catch (Exception e) {
- logger.error(e.getLocalizedMessage(),e);
+ LOG.error(e.getLocalizedMessage(),e);
}
}
@Override
- public void copyFile(File inputFile, InputMarker inputMarker)
- throws UnsupportedOperationException {
- throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=hdfs");
+ public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("copyFile method is not yet supported for output=hdfs");
}
/**
@@ -242,8 +228,8 @@ public class OutputHDFSFile extends Output implements RolloverHandler, RolloverC
long timeSinceCreation = new Date().getTime() - currentSpoolerContext.getActiveLogCreationTime().getTime();
boolean shouldRollover = timeSinceCreation > rolloverThresholdTimeMillis;
if (shouldRollover) {
- logger.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
- " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
+ LOG.info("Detecting that time since file creation time " + currentSpoolerContext.getActiveLogCreationTime() +
+ " has crossed threshold (msecs) " + rolloverThresholdTimeMillis);
}
return shouldRollover;
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index 2595d87..52fc6f8 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -56,6 +56,16 @@ public class OutputKafka extends Output {
private boolean isKafkaBrokerUp = false;
@Override
+ protected String getStatMetricName() {
+ return "output.kafka.write_logs";
+ }
+
+ @Override
+ protected String getWriteBytesMetricName() {
+ return "output.kafka.write_bytes";
+ }
+
+ @Override
public void init() throws Exception {
super.init();
Properties props = initProperties();
@@ -65,9 +75,6 @@ public class OutputKafka extends Output {
}
private Properties initProperties() throws Exception {
- statMetric.metricsName = "output.kafka.write_logs";
- writeBytesMetric.metricsName = "output.kafka.write_bytes";
-
String brokerList = getStringValue("broker_list");
if (StringUtils.isEmpty(brokerList)) {
throw new Exception("For kafka output, bootstrap broker_list is needed");
@@ -124,17 +131,15 @@ public class OutputKafka extends Output {
if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
kafkaCallBack = null;
} else {
- LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for "
- + FAILED_RETRY_INTERVAL + " seconds");
+ LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for " +
+ FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
}
} catch (Throwable t) {
String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey,
- "Error sending message to Kafka during retry. message="
- + (kafkaCallBack == null ? null : kafkaCallBack.message),
- t, LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending message to Kafka during retry. message=" +
+ (kafkaCallBack == null ? null : kafkaCallBack.message), t, LOG, Level.ERROR);
}
}
@@ -160,8 +165,8 @@ public class OutputKafka extends Output {
LOG.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
} else {
- LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages="
- + failedMessages.size() + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
+ LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages=" + failedMessages.size() +
+ " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
}
} catch (Throwable t) {
@@ -198,16 +203,15 @@ public class OutputKafka extends Output {
private boolean publishMessage(String block, InputMarker inputMarker) {
if (isAsync && isKafkaBrokerUp) { // Send asynchronously
- producer.send(new ProducerRecord<String, String>(topic, block),
- new KafkaCallBack(this, block, inputMarker, ++messageCount));
+ producer.send(new ProducerRecord<String, String>(topic, block), new KafkaCallBack(this, block, inputMarker, ++messageCount));
return true;
} else { // Send synchronously
try {
// Not using key. Let it round robin
RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get();
if (metadata != null) {
- statMetric.count++;
- writeBytesMetric.count += block.length();
+ statMetric.value++;
+ writeBytesMetric.value += block.length();
}
if (!isKafkaBrokerUp) {
LOG.info("Started writing to kafka. " + getShortDescription());
@@ -217,18 +221,18 @@ public class OutputKafka extends Output {
} catch (InterruptedException e) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT";
- LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e,
- LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e, LOG,
+ Level.ERROR);
} catch (ExecutionException e) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION";
- LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e,
- LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e, LOG,
+ Level.ERROR);
} catch (Throwable t) {
isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR";
- LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t,
- LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t, LOG,
+ Level.ERROR);
}
}
return false;
@@ -260,12 +264,12 @@ public class OutputKafka extends Output {
output.isKafkaBrokerUp = true;
}
output.incrementStat(1);
- output.writeBytesMetric.count += message.length();
+ output.writeBytesMetric.value += message.length();
} else {
output.isKafkaBrokerUp = false;
String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
- LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback",
- exception, LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback", exception, LOG,
+ Level.ERROR);
output.failedMessages.add(this);
}
@@ -273,9 +277,7 @@ public class OutputKafka extends Output {
}
@Override
- public void copyFile(File inputFile, InputMarker inputMarker)
- throws UnsupportedOperationException {
- throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=kafka");
+ public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("copyFile method is not yet supported for output=kafka");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
new file mode 100644
index 0000000..2c81c19
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputManager.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CodingErrorAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.logconfig.FilterLogData;
+import org.apache.ambari.logfeeder.metrics.MetricData;
+import org.apache.ambari.logfeeder.util.LogFeederUtil;
+import org.apache.ambari.logfeeder.util.MurmurHash;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+public class OutputManager {
+ private static final Logger LOG = Logger.getLogger(OutputManager.class);
+
+ private static final int HASH_SEED = 31174077;
+ private static final int MAX_OUTPUT_SIZE = 32765; // 32766-1
+
+ private List<Output> outputs = new ArrayList<Output>();
+
+ private boolean addMessageMD5 = true;
+
+ private static long docCounter = 0;
+ private MetricData messageTruncateMetric = new MetricData(null, false);
+
+ public List<Output> getOutputs() {
+ return outputs;
+ }
+
+ public void add(Output output) {
+ this.outputs.add(output);
+ }
+
+ public void retainUsedOutputs(Collection<Output> usedOutputs) {
+ outputs.retainAll(usedOutputs);
+ }
+
+ public void init() throws Exception {
+ for (Output output : outputs) {
+ output.init();
+ }
+ }
+
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+
+ // Update the block with the context fields
+ for (Map.Entry<String, String> entry : input.getContextFields().entrySet()) {
+ if (jsonObj.get(entry.getKey()) == null) {
+ jsonObj.put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ // TODO: Ideally most of the overrides should be configurable
+
+ if (jsonObj.get("type") == null) {
+ jsonObj.put("type", input.getStringValue("type"));
+ }
+ if (jsonObj.get("path") == null && input.getFilePath() != null) {
+ jsonObj.put("path", input.getFilePath());
+ }
+ if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
+ jsonObj.put("path", input.getStringValue("path"));
+ }
+ if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
+ jsonObj.put("host", LogFeederUtil.hostName);
+ }
+ if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
+ jsonObj.put("ip", LogFeederUtil.ipAddress);
+ }
+ if (jsonObj.get("level") == null) {
+ jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
+ }
+
+ if (input.isUseEventMD5() || input.isGenEventMD5()) {
+ String prefix = "";
+ Object logtimeObj = jsonObj.get("logtime");
+ if (logtimeObj != null) {
+ if (logtimeObj instanceof Date) {
+ prefix = "" + ((Date) logtimeObj).getTime();
+ } else {
+ prefix = logtimeObj.toString();
+ }
+ }
+
+ Long eventMD5 = MurmurHash.hash64A(LogFeederUtil.getGson().toJson(jsonObj).getBytes(), HASH_SEED);
+ if (input.isGenEventMD5()) {
+ jsonObj.put("event_md5", prefix + eventMD5.toString());
+ }
+ if (input.isUseEventMD5()) {
+ jsonObj.put("id", prefix + eventMD5.toString());
+ }
+ }
+
+ jsonObj.put("seq_num", new Long(docCounter++));
+ if (jsonObj.get("id") == null) {
+ jsonObj.put("id", UUID.randomUUID().toString());
+ }
+ if (jsonObj.get("event_count") == null) {
+ jsonObj.put("event_count", new Integer(1));
+ }
+ if (inputMarker.lineNumber > 0) {
+ jsonObj.put("logfile_line_number", new Integer(inputMarker.lineNumber));
+ }
+ if (jsonObj.containsKey("log_message")) {
+ // TODO: Let's check size only for log_message for now
+ String logMessage = (String) jsonObj.get("log_message");
+ logMessage = truncateLongLogMessage(jsonObj, input, logMessage);
+ if (addMessageMD5) {
+ jsonObj.put("message_md5", "" + MurmurHash.hash64A(logMessage.getBytes(), 31174077));
+ }
+ }
+
+ if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
+ for (Output output : input.getOutputList()) {
+ try {
+ output.write(jsonObj, inputMarker);
+ } catch (Exception e) {
+ LOG.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private String truncateLongLogMessage(Map<String, Object> jsonObj, Input input, String logMessage) {
+ if (logMessage != null && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
+ messageTruncateMetric.value++;
+ String logMessageKey = this.getClass().getSimpleName() + "_MESSAGESIZE";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Message is too big. size=" + logMessage.getBytes().length +
+ ", input=" + input.getShortDescription() + ". Truncating to " + MAX_OUTPUT_SIZE + ", first upto 100 characters=" +
+ StringUtils.abbreviate(logMessage, 100), null, LOG, Level.WARN);
+ logMessage = new String(logMessage.getBytes(), 0, MAX_OUTPUT_SIZE);
+ jsonObj.put("log_message", logMessage);
+ List<String> tagsList = (List<String>) jsonObj.get("tags");
+ if (tagsList == null) {
+ tagsList = new ArrayList<String>();
+ jsonObj.put("tags", tagsList);
+ }
+ tagsList.add("error_message_truncated");
+ }
+ return logMessage;
+ }
+
+ public void write(String jsonBlock, InputMarker inputMarker) {
+ if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
+ for (Output output : inputMarker.input.getOutputList()) {
+ try {
+ output.write(jsonBlock, inputMarker);
+ } catch (Exception e) {
+ LOG.error("Error writing. to " + output.getShortDescription(), e);
+ }
+ }
+ }
+ }
+
+ public void copyFile(File inputFile, InputMarker inputMarker) {
+ Input input = inputMarker.input;
+ for (Output output : input.getOutputList()) {
+ try {
+ output.copyFile(inputFile, inputMarker);
+ }catch (Exception e) {
+ LOG.error("Error coyping file . to " + output.getShortDescription(), e);
+ }
+ }
+ }
+
+ public void logStats() {
+ for (Output output : outputs) {
+ output.logStat();
+ }
+ LogFeederUtil.logStatForMetric(messageTruncateMetric, "Stat: Messages Truncated", "");
+ }
+
+ public void addMetricsContainers(List<MetricData> metricsList) {
+ metricsList.add(messageTruncateMetric);
+ for (Output output : outputs) {
+ output.addMetricsContainers(metricsList);
+ }
+ }
+
+ public void close() {
+ LOG.info("Close called for outputs ...");
+ for (Output output : outputs) {
+ try {
+ output.setDrain(true);
+ output.close();
+ } catch (Exception e) {
+ // Ignore
+ }
+ }
+
+ // Need to get this value from property
+ int iterations = 30;
+ int waitTimeMS = 1000;
+ for (int i = 0; i < iterations; i++) {
+ boolean allClosed = true;
+ for (Output output : outputs) {
+ if (!output.isClosed()) {
+ try {
+ allClosed = false;
+ LOG.warn("Waiting for output to close. " + output.getShortDescription() + ", " + (iterations - i) + " more seconds");
+ Thread.sleep(waitTimeMS);
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+ if (allClosed) {
+ LOG.info("All outputs are closed. Iterations=" + i);
+ return;
+ }
+ }
+
+ LOG.warn("Some outpus were not closed after " + iterations + " iterations");
+ for (Output output : outputs) {
+ if (!output.isClosed()) {
+ LOG.warn("Output not closed. Will ignore it." + output.getShortDescription() + ", pendingCound=" + output.getPendingCount());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
deleted file mode 100644
index 0a6b7fa..0000000
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputMgr.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ambari.logfeeder.output;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import org.apache.ambari.logfeeder.input.Input;
-import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
-import org.apache.ambari.logfeeder.logconfig.filter.FilterLogData;
-import org.apache.ambari.logfeeder.metrics.MetricCount;
-import org.apache.ambari.logfeeder.util.LogFeederUtil;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-
-public class OutputMgr {
- private static final Logger logger = Logger.getLogger(OutputMgr.class);
-
- private Collection<Output> outputList = new ArrayList<Output>();
-
- private boolean addMessageMD5 = true;
-
- private int MAX_OUTPUT_SIZE = 32765; // 32766-1
- private static long doc_counter = 0;
- private MetricCount messageTruncateMetric = new MetricCount();
-
-
- public Collection<Output> getOutputList() {
- return outputList;
- }
-
- public void setOutputList(Collection<Output> outputList) {
- this.outputList = outputList;
- }
-
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker) {
- Input input = inputMarker.input;
-
- // Update the block with the context fields
- for (Map.Entry<String, String> entry : input.getContextFields()
- .entrySet()) {
- if (jsonObj.get(entry.getKey()) == null) {
- jsonObj.put(entry.getKey(), entry.getValue());
- }
- }
-
- // TODO: Ideally most of the overrides should be configurable
-
- // Add the input type
- if (jsonObj.get("type") == null) {
- jsonObj.put("type", input.getStringValue("type"));
- }
- if (jsonObj.get("path") == null && input.getFilePath() != null) {
- jsonObj.put("path", input.getFilePath());
- }
- if (jsonObj.get("path") == null && input.getStringValue("path") != null) {
- jsonObj.put("path", input.getStringValue("path"));
- }
-
- // Add host if required
- if (jsonObj.get("host") == null && LogFeederUtil.hostName != null) {
- jsonObj.put("host", LogFeederUtil.hostName);
- }
- // Add IP if required
- if (jsonObj.get("ip") == null && LogFeederUtil.ipAddress != null) {
- jsonObj.put("ip", LogFeederUtil.ipAddress);
- }
-
- //Add level
- if (jsonObj.get("level") == null) {
- jsonObj.put("level", LogFeederConstants.LOG_LEVEL_UNKNOWN);
- }
- if (input.isUseEventMD5() || input.isGenEventMD5()) {
- String prefix = "";
- Object logtimeObj = jsonObj.get("logtime");
- if (logtimeObj != null) {
- if (logtimeObj instanceof Date) {
- prefix = "" + ((Date) logtimeObj).getTime();
- } else {
- prefix = logtimeObj.toString();
- }
- }
- Long eventMD5 = LogFeederUtil.genHash(LogFeederUtil.getGson()
- .toJson(jsonObj));
- if (input.isGenEventMD5()) {
- jsonObj.put("event_md5", prefix + eventMD5.toString());
- }
- if (input.isUseEventMD5()) {
- jsonObj.put("id", prefix + eventMD5.toString());
- }
- }
-
- // jsonObj.put("@timestamp", new Date());
- jsonObj.put("seq_num", new Long(doc_counter++));
- if (jsonObj.get("id") == null) {
- jsonObj.put("id", UUID.randomUUID().toString());
- }
- if (jsonObj.get("event_count") == null) {
- jsonObj.put("event_count", new Integer(1));
- }
- if (inputMarker.lineNumber > 0) {
- jsonObj.put("logfile_line_number", new Integer(
- inputMarker.lineNumber));
- }
- if (jsonObj.containsKey("log_message")) {
- // TODO: Let's check size only for log_message for now
- String logMessage = (String) jsonObj.get("log_message");
- if (logMessage != null
- && logMessage.getBytes().length > MAX_OUTPUT_SIZE) {
- messageTruncateMetric.count++;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_MESSAGESIZE";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Message is too big. size="
- + logMessage.getBytes().length + ", input="
- + input.getShortDescription()
- + ". Truncating to " + MAX_OUTPUT_SIZE
- + ", first upto 100 characters="
- + LogFeederUtil.subString(logMessage, 100),
- null, logger, Level.WARN);
- logMessage = new String(logMessage.getBytes(), 0,
- MAX_OUTPUT_SIZE);
- jsonObj.put("log_message", logMessage);
- // Add error tags
- @SuppressWarnings("unchecked")
- List<String> tagsList = (List<String>) jsonObj.get("tags");
- if (tagsList == null) {
- tagsList = new ArrayList<String>();
- jsonObj.put("tags", tagsList);
- }
- tagsList.add("error_message_truncated");
-
- }
- if (addMessageMD5) {
- jsonObj.put("message_md5",
- "" + LogFeederUtil.genHash(logMessage));
- }
- }
- //check log is allowed to send output
- if (FilterLogData.INSTANCE.isAllowed(jsonObj)) {
- for (Output output : input.getOutputList()) {
- try {
- output.write(jsonObj, inputMarker);
- } catch (Exception e) {
- logger.error("Error writing. to " + output.getShortDescription(), e);
- }
- }
- }
- }
-
- public void write(String jsonBlock, InputMarker inputMarker) {
- //check log is allowed to send output
- if (FilterLogData.INSTANCE.isAllowed(jsonBlock)) {
- for (Output output : inputMarker.input.getOutputList()) {
- try {
- output.write(jsonBlock, inputMarker);
- } catch (Exception e) {
- logger.error("Error writing. to " + output.getShortDescription(), e);
- }
- }
- }
- }
-
- public void close() {
- logger.info("Close called for outputs ...");
- for (Output output : outputList) {
- try {
- output.setDrain(true);
- output.close();
- } catch (Exception e) {
- // Ignore
- }
- }
- // Need to get this value from property
- int iterations = 30;
- int waitTimeMS = 1000;
- int i;
- boolean allClosed = true;
- for (i = 0; i < iterations; i++) {
- allClosed = true;
- for (Output output : outputList) {
- if (!output.isClosed()) {
- try {
- allClosed = false;
- logger.warn("Waiting for output to close. "
- + output.getShortDescription() + ", "
- + (iterations - i) + " more seconds");
- Thread.sleep(waitTimeMS);
- } catch (Throwable t) {
- // Ignore
- }
- }
- }
- if (allClosed) {
- break;
- }
- }
-
- if (!allClosed) {
- logger.warn("Some outpus were not closed. Iterations=" + i);
- for (Output output : outputList) {
- if (!output.isClosed()) {
- logger.warn("Output not closed. Will ignore it."
- + output.getShortDescription() + ", pendingCound="
- + output.getPendingCount());
- }
- }
- } else {
- logger.info("All outputs are closed. Iterations=" + i);
- }
- }
-
- public void logStats() {
- for (Output output : outputList) {
- output.logStat();
- }
- LogFeederUtil.logStatForMetric(messageTruncateMetric,
- "Stat: Messages Truncated", null);
- }
-
- public void addMetricsContainers(List<MetricCount> metricsList) {
- metricsList.add(messageTruncateMetric);
- for (Output output : outputList) {
- output.addMetricsContainers(metricsList);
- }
- }
-
-
- public void copyFile(File inputFile, InputMarker inputMarker) {
- Input input = inputMarker.input;
- for (Output output : input.getOutputList()) {
- try {
- output.copyFile(inputFile, inputMarker);
- }catch (Exception e) {
- logger.error("Error coyping file . to " + output.getShortDescription(),
- e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
index e95f8df..26f1ddb 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputS3File.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.ambari.logfeeder.LogFeeder;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.ambari.logfeeder.output.spool.LogSpooler;
@@ -47,10 +48,10 @@ import java.util.Map.Entry;
* </ul>
*/
public class OutputS3File extends Output implements RolloverCondition, RolloverHandler {
+ private static final Logger LOG = Logger.getLogger(OutputS3File.class);
public static final String INPUT_ATTRIBUTE_TYPE = "type";
public static final String GLOBAL_CONFIG_S3_PATH_SUFFIX = "global.config.json";
- static private Logger logger = Logger.getLogger(OutputS3File.class);
private LogSpooler logSpooler;
private S3OutputConfiguration s3OutputConfiguration;
@@ -72,23 +73,21 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
@Override
public void copyFile(File inputFile, InputMarker inputMarker) {
String type = inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE);
- S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration,
- S3Util.INSTANCE, false, type);
- String resolvedPath = s3Uploader.uploadFile(inputFile,
- inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
+ S3Uploader s3Uploader = new S3Uploader(s3OutputConfiguration, false, type);
+ String resolvedPath = s3Uploader.uploadFile(inputFile, inputMarker.input.getStringValue(INPUT_ATTRIBUTE_TYPE));
uploadConfig(inputMarker, type, s3OutputConfiguration, resolvedPath);
}
- private void uploadConfig(InputMarker inputMarker, String type,
- S3OutputConfiguration s3OutputConfiguration, String resolvedPath) {
+ private void uploadConfig(InputMarker inputMarker, String type, S3OutputConfiguration s3OutputConfiguration,
+ String resolvedPath) {
ArrayList<Map<String, Object>> filters = new ArrayList<>();
addFilters(filters, inputMarker.input.getFirstFilter());
Map<String, Object> inputConfig = new HashMap<>();
inputConfig.putAll(inputMarker.input.getConfigs());
- String s3CompletePath = S3Util.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName()
- + S3Util.S3_PATH_SEPARATOR + resolvedPath;
+ String s3CompletePath = LogFeederConstants.S3_PATH_START_WITH + s3OutputConfiguration.getS3BucketName() +
+ LogFeederConstants.S3_PATH_SEPARATOR + resolvedPath;
inputConfig.put("path", s3CompletePath);
ArrayList<Map<String, Object>> inputConfigList = new ArrayList<>();
@@ -117,17 +116,15 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
}
}
- private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix,
- S3OutputConfiguration s3OutputConfiguration) {
+ private void writeConfigToS3(Map<String, Object> configToWrite, String s3KeySuffix, S3OutputConfiguration s3OutputConfiguration) {
Gson gson = new GsonBuilder().setPrettyPrinting().create();
String configJson = gson.toJson(configToWrite);
- String s3ResolvedKey = new S3LogPathResolver().
- getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix, s3OutputConfiguration.getCluster());
+ String s3ResolvedKey = new S3LogPathResolver().getResolvedPath(getStringValue("s3_config_dir"), s3KeySuffix,
+ s3OutputConfiguration.getCluster());
- S3Util.INSTANCE.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(),
- s3ResolvedKey, s3OutputConfiguration.getS3AccessKey(),
- s3OutputConfiguration.getS3SecretKey());
+ S3Util.writeIntoS3File(configJson, s3OutputConfiguration.getS3BucketName(), s3ResolvedKey,
+ s3OutputConfiguration.getS3AccessKey(), s3OutputConfiguration.getS3SecretKey());
}
private String getComponentConfigFileName(String componentName) {
@@ -136,7 +133,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
private Map<String, Object> getGlobalConfig() {
- Map<String, Object> globalConfig = LogFeeder.globalMap;
+ Map<String, Object> globalConfig = LogFeeder.globalConfigs;
if (globalConfig == null) {
globalConfig = new HashMap<>();
}
@@ -173,8 +170,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
globalConfig.put("copy_file", false);
globalConfig.put("process_file", true);
globalConfig.put("tail", false);
- Map<String, Object> addFields = (Map<String, Object>) globalConfig
- .get("add_fields");
+ Map<String, Object> addFields = (Map<String, Object>) globalConfig.get("add_fields");
if (addFields == null) {
addFields = new HashMap<>();
}
@@ -216,7 +212,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
@VisibleForTesting
protected S3Uploader createUploader(String logType) {
- S3Uploader uploader = new S3Uploader(s3OutputConfiguration, S3Util.INSTANCE, true, logType);
+ S3Uploader uploader = new S3Uploader(s3OutputConfiguration, true, logType);
uploader.startUploaderThread();
return uploader;
}
@@ -224,8 +220,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
@VisibleForTesting
protected LogSpooler createSpooler(String filePath) {
String spoolDirectory = LogFeederUtil.getLogfeederTempDir() + "/s3/service";
- logger.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s",
- spoolDirectory, filePath));
+ LOG.info(String.format("Creating spooler with spoolDirectory=%s, filePath=%s", spoolDirectory, filePath));
return new LogSpooler(spoolDirectory, new File(filePath).getName()+"-", this, this,
s3OutputConfiguration.getRolloverTimeThresholdSecs());
}
@@ -244,7 +239,7 @@ public class OutputS3File extends Output implements RolloverCondition, RolloverH
long currentSize = spoolFile.length();
boolean result = (currentSize >= s3OutputConfiguration.getRolloverSizeThresholdBytes());
if (result) {
- logger.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
+ LOG.info(String.format("Rolling over %s, current size %d, threshold size %d", spoolFile, currentSize,
s3OutputConfiguration.getRolloverSizeThresholdBytes()));
}
return result;
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index cd9ce4d..47f139d 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -34,7 +34,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.ambari.logfeeder.input.InputMarker;
-import org.apache.ambari.logfeeder.logconfig.FetchConfigFromSolr;
+import org.apache.ambari.logfeeder.logconfig.LogConfigHandler;
+import org.apache.ambari.logfeeder.util.DateUtil;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
@@ -76,7 +77,17 @@ public class OutputSolr extends Output {
private BlockingQueue<OutputData> outgoingBuffer = null;
private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
-
+
+ /**
+  * Supplies the metric name used for the Solr output's written-log counter.
+  * NOTE(review): presumably the base {@code Output} class uses this to name
+  * {@code statMetric} - confirm against Output.
+  */
+ @Override
+ protected String getStatMetricName() {
+ return "output.solr.write_logs";
+ }
+
+ /**
+  * Supplies the metric name used for the Solr output's written-bytes counter.
+  * NOTE(review): presumably the base {@code Output} class uses this to name
+  * {@code writeBytesMetric} - confirm against Output.
+  */
+ @Override
+ protected String getWriteBytesMetricName() {
+ return "output.solr.write_bytes";
+ }
+
@Override
public void init() throws Exception {
super.init();
@@ -87,9 +98,6 @@ public class OutputSolr extends Output {
}
private void initParams() throws Exception {
- statMetric.metricsName = "output.solr.write_logs";
- writeBytesMetric.metricsName = "output.solr.write_bytes";
-
splitMode = getStringValue("splits_interval_mins", "none");
if (!splitMode.equalsIgnoreCase("none")) {
splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
@@ -204,10 +212,8 @@ public class OutputSolr extends Output {
LOG.info("Ping to Solr server is successful for worker=" + count);
} else {
LOG.warn(
- String.format(
- "Ping to Solr server failed. It would check again. worker=%d, "
- + "solrUrl=%s, zkConnectString=%s, collection=%s, response=%s",
- count, solrUrl, zkConnectString, collection, response));
+ String.format("Ping to Solr server failed. It would check again. worker=%d, solrUrl=%s, zkConnectString=%s, " +
+ "collection=%s, response=%s", count, solrUrl, zkConnectString, collection, response));
}
} catch (Throwable t) {
LOG.warn(String.format(
@@ -223,7 +229,7 @@ public class OutputSolr extends Output {
while (true) {
LOG.info("Checking if config is available");
- if (FetchConfigFromSolr.isFilterAvailable()) {
+ if (LogConfigHandler.isFilterAvailable()) {
LOG.info("Config is available");
return;
}
@@ -256,7 +262,7 @@ public class OutputSolr extends Output {
private void useActualDateIfNeeded(Map<String, Object> jsonObj) {
if (skipLogtime) {
- jsonObj.put("logtime", LogFeederUtil.getActualDateStr());
+ jsonObj.put("logtime", DateUtil.getActualDateStr());
}
}
@@ -324,7 +330,7 @@ public class OutputSolr extends Output {
private final SolrClient solrClient;
private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
- private final Map<String, InputMarker> latestInputMarkerList = new HashMap<>();
+ private final Map<String, InputMarker> latestInputMarkers = new HashMap<>();
private long localBufferBytesSize = 0;
@@ -352,17 +358,16 @@ public class OutputSolr extends Output {
}
}
- if (localBuffer.size() > 0 && ((outputData == null && isDrain())
- || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
+ if (localBuffer.size() > 0 && ((outputData == null && isDrain()) ||
+ (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
boolean response = sendToSolr(outputData);
if( isDrain() && !response) {
//Since sending to Solr response failed and it is in draining mode, let's break;
- LOG.warn("In drain mode and sending to Solr failed. So exiting. output="
- + getShortDescription());
+ LOG.warn("In drain mode and sending to Solr failed. So exiting. output=" + getShortDescription());
break;
}
}
- if( localBuffer.size() == 0 ) {
+ if (localBuffer.size() == 0) {
//If localBuffer is empty, then reset the timer
lastDispatchTime = currTimeMS;
}
@@ -403,8 +408,7 @@ public class OutputSolr extends Output {
} catch (IOException | SolrException exception) {
// Transient error, lets block till it is available
try {
- LOG.warn("Solr is not reachable. Going to retry after "
- + RETRY_INTERVAL + " seconds. " + "output="
+ LOG.warn("Solr is not reachable. Going to retry after " + RETRY_INTERVAL + " seconds. " + "output="
+ getShortDescription(), exception);
Thread.sleep(RETRY_INTERVAL * 1000);
} catch (Throwable t) {
@@ -414,8 +418,8 @@ public class OutputSolr extends Output {
// Something unknown happened. Let's not block because of this error.
// Clear the buffer
String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(logMessageKey,
- "Error sending log message to server. Dropping logs", serverException, LOG, Level.ERROR);
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error sending log message to server. Dropping logs",
+ serverException, LOG, Level.ERROR);
resetLocalBuffer();
break;
}
@@ -447,7 +451,7 @@ public class OutputSolr extends Output {
Level.ERROR);
}
}
- latestInputMarkerList.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
+ latestInputMarkers.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
localBuffer.add(document);
}
@@ -479,9 +483,9 @@ public class OutputSolr extends Output {
LogFeederUtil.logErrorMessageByInterval(logMessageKey,
String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, LOG, Level.ERROR);
}
- statMetric.count += localBuffer.size();
- writeBytesMetric.count += localBufferBytesSize;
- for (InputMarker inputMarker : latestInputMarkerList.values()) {
+ statMetric.value += localBuffer.size();
+ writeBytesMetric.value += localBufferBytesSize;
+ for (InputMarker inputMarker : latestInputMarkers.values()) {
inputMarker.input.checkIn(inputMarker);
}
}
@@ -499,7 +503,7 @@ public class OutputSolr extends Output {
public void resetLocalBuffer() {
localBuffer.clear();
localBufferBytesSize = 0;
- latestInputMarkerList.clear();
+ latestInputMarkers.clear();
}
public boolean isDone() {
@@ -512,9 +516,7 @@ public class OutputSolr extends Output {
}
@Override
- public void copyFile(File inputFile, InputMarker inputMarker)
- throws UnsupportedOperationException {
- throw new UnsupportedOperationException(
- "copyFile method is not yet supported for output=solr");
+ public void copyFile(File inputFile, InputMarker inputMarker) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("copyFile method is not yet supported for output=solr");
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
index 58282e0..8c544cf 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3LogPathResolver.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,9 +18,9 @@
package org.apache.ambari.logfeeder.output;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.LogFeederUtil;
import org.apache.ambari.logfeeder.util.PlaceholderUtil;
-import org.apache.ambari.logfeeder.util.S3Util;
import java.util.HashMap;
@@ -40,7 +40,7 @@ public class S3LogPathResolver {
public String getResolvedPath(String baseKeyPrefix, String keySuffix, String cluster) {
HashMap<String, String> contextParam = buildContextParam(cluster);
String resolvedKeyPrefix = PlaceholderUtil.replaceVariables(baseKeyPrefix, contextParam);
- return resolvedKeyPrefix + S3Util.S3_PATH_SEPARATOR + keySuffix;
+ return resolvedKeyPrefix + LogFeederConstants.S3_PATH_SEPARATOR + keySuffix;
}
private HashMap<String, String> buildContextParam(String cluster) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
index 485b0d4..e5974c5 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3OutputConfiguration.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -103,8 +103,7 @@ public class S3OutputConfiguration {
};
for (int i = 0; i < longValuedKeysToCopy.length; i++) {
- configs.put(longValuedKeysToCopy[i],
- configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
+ configs.put(longValuedKeysToCopy[i], configBlock.getLongValue(longValuedKeysToCopy[i], defaultValuesForLongValuedKeys[i]));
}
configs.put(ADDITIONAL_FIELDS_KEY, configBlock.getNVList(ADDITIONAL_FIELDS_KEY));
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
index fd59c51..e95a663 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/S3Uploader.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,8 +18,12 @@
package org.apache.ambari.logfeeder.output;
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.Upload;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.ambari.logfeeder.common.LogFeederConstants;
import org.apache.ambari.logfeeder.util.CompressionUtil;
import org.apache.ambari.logfeeder.util.S3Util;
import org.apache.log4j.Logger;
@@ -39,20 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
* {@link org.apache.ambari.logfeeder.input.InputFile}.
*/
public class S3Uploader implements Runnable {
+ private static final Logger LOG = Logger.getLogger(S3Uploader.class);
+
public static final String POISON_PILL = "POISON-PILL";
- private static Logger logger = Logger.getLogger(S3Uploader.class);
private final S3OutputConfiguration s3OutputConfiguration;
- private final S3Util s3UtilInstance;
private final boolean deleteOnEnd;
- private String logType;
+ private final String logType;
private final BlockingQueue<String> fileContextsToUpload;
- private AtomicBoolean stopRunningThread = new AtomicBoolean(false);
+ private final AtomicBoolean stopRunningThread = new AtomicBoolean(false);
- public S3Uploader(S3OutputConfiguration s3OutputConfiguration, S3Util s3UtilInstance, boolean deleteOnEnd,
- String logType) {
+ public S3Uploader(S3OutputConfiguration s3OutputConfiguration, boolean deleteOnEnd, String logType) {
this.s3OutputConfiguration = s3OutputConfiguration;
- this.s3UtilInstance = s3UtilInstance;
this.deleteOnEnd = deleteOnEnd;
this.logType = logType;
this.fileContextsToUpload = new LinkedBlockingQueue<>();
@@ -81,7 +83,7 @@ public class S3Uploader implements Runnable {
stopRunningThread.set(true);
boolean offerStatus = fileContextsToUpload.offer(POISON_PILL);
if (!offerStatus) {
- logger.warn("Could not add poison pill to interrupt uploader thread.");
+ LOG.warn("Could not add poison pill to interrupt uploader thread.");
}
}
@@ -92,7 +94,7 @@ public class S3Uploader implements Runnable {
void addFileForUpload(String fileToUpload) {
boolean offerStatus = fileContextsToUpload.offer(fileToUpload);
if (!offerStatus) {
- logger.error("Could not add file " + fileToUpload + " for upload.");
+ LOG.error("Could not add file " + fileToUpload + " for upload.");
}
}
@@ -102,12 +104,12 @@ public class S3Uploader implements Runnable {
try {
String fileNameToUpload = fileContextsToUpload.take();
if (POISON_PILL.equals(fileNameToUpload)) {
- logger.warn("Found poison pill while waiting for files to upload, exiting");
+ LOG.warn("Found poison pill while waiting for files to upload, exiting");
return;
}
uploadFile(new File(fileNameToUpload), logType);
} catch (InterruptedException e) {
- logger.error("Interrupted while waiting for elements from fileContextsToUpload", e);
+ LOG.error("Interrupted while waiting for elements from fileContextsToUpload", e);
return;
}
}
@@ -130,34 +132,44 @@ public class S3Uploader implements Runnable {
String compressionAlgo = s3OutputConfiguration.getCompressionAlgo();
String keySuffix = fileToUpload.getName() + "." + compressionAlgo;
- String s3Path = new S3LogPathResolver().
- getResolvedPath(s3OutputConfiguration.getS3Path()+S3Util.S3_PATH_SEPARATOR+logType,
- keySuffix, s3OutputConfiguration.getCluster());
- logger.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s",
- s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
+ String s3Path = new S3LogPathResolver().getResolvedPath(
+ s3OutputConfiguration.getS3Path() + LogFeederConstants.S3_PATH_SEPARATOR + logType, keySuffix,
+ s3OutputConfiguration.getCluster());
+ LOG.info(String.format("keyPrefix=%s, keySuffix=%s, s3Path=%s", s3OutputConfiguration.getS3Path(), keySuffix, s3Path));
File sourceFile = createCompressedFileForUpload(fileToUpload, compressionAlgo);
- logger.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
- s3UtilInstance.uploadFileTos3(bucketName, s3Path, sourceFile, s3AccessKey,
- s3SecretKey);
+ LOG.info("Starting S3 upload " + sourceFile + " -> " + bucketName + ", " + s3Path);
+ uploadFileToS3(bucketName, s3Path, sourceFile, s3AccessKey, s3SecretKey);
// delete local compressed file
sourceFile.delete();
if (deleteOnEnd) {
- logger.info("Deleting input file as required");
+ LOG.info("Deleting input file as required");
if (!fileToUpload.delete()) {
- logger.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
+ LOG.error("Could not delete file " + fileToUpload.getAbsolutePath() + " after upload to S3");
}
}
return s3Path;
}
@VisibleForTesting
+ protected void uploadFileToS3(String bucketName, String s3Key, File localFile, String accessKey, String secretKey) {
+ TransferManager transferManager = S3Util.getTransferManager(accessKey, secretKey);
+ try {
+ Upload upload = transferManager.upload(bucketName, s3Key, localFile);
+ upload.waitForUploadResult();
+ } catch (AmazonClientException | InterruptedException e) {
+ LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), e);
+ } finally {
+ S3Util.shutdownTransferManager(transferManager);
+ }
+ }
+
+ @VisibleForTesting
protected File createCompressedFileForUpload(File fileToUpload, String compressionAlgo) {
- File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_"
- + new Date().getTime() + "." + compressionAlgo);
- outputFile = CompressionUtil.compressFile(fileToUpload, outputFile,
- compressionAlgo);
+ File outputFile = new File(fileToUpload.getParent(), fileToUpload.getName() + "_" + new Date().getTime() +
+ "." + compressionAlgo);
+ outputFile = CompressionUtil.compressFile(fileToUpload, outputFile, compressionAlgo);
return outputFile;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
index fb263ba..1f13357 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpooler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -39,8 +39,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
* {@link RolloverHandler} to trigger the handling of the rolled over file.
*/
public class LogSpooler {
+
+ private static final Logger LOG = Logger.getLogger(LogSpooler.class);
public static final long TIME_BASED_ROLLOVER_DISABLED_THRESHOLD = 0;
- static private Logger logger = Logger.getLogger(LogSpooler.class);
static final String fileDateFormat = "yyyy-MM-dd-HH-mm-ss";
private String spoolDirectory;
@@ -98,7 +99,7 @@ public class LogSpooler {
private void initializeSpoolDirectory() {
File spoolDir = new File(spoolDirectory);
if (!spoolDir.exists()) {
- logger.info("Creating spool directory: " + spoolDir);
+ LOG.info("Creating spool directory: " + spoolDir);
boolean result = spoolDir.mkdirs();
if (!result) {
throw new LogSpoolerException("Could not create spool directory: " + spoolDirectory);
@@ -116,7 +117,7 @@ public class LogSpooler {
+ ", error message: " + e.getLocalizedMessage(), e);
}
currentSpoolerContext = new LogSpoolerContext(currentSpoolFile);
- logger.info("Initialized spool file at path: " + currentSpoolFile);
+ LOG.info("Initialized spool file at path: " + currentSpoolFile);
}
@VisibleForTesting
@@ -141,7 +142,7 @@ public class LogSpooler {
currentSpoolBufferedWriter.println(logEvent);
currentSpoolerContext.logEventSpooled();
if (rolloverCondition.shouldRollover(currentSpoolerContext)) {
- logger.info("Trying to rollover based on rollover condition");
+ LOG.info("Trying to rollover based on rollover condition");
tryRollover();
}
}
@@ -154,19 +155,19 @@ public class LogSpooler {
* rolled over file.
*/
public void rollover() {
- logger.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
+ LOG.info("Rollover condition detected, rolling over file: " + currentSpoolFile);
currentSpoolBufferedWriter.flush();
if (currentSpoolFile.length()==0) {
- logger.info("No data in file " + currentSpoolFile + ", not doing rollover");
+ LOG.info("No data in file " + currentSpoolFile + ", not doing rollover");
} else {
currentSpoolBufferedWriter.close();
rolloverHandler.handleRollover(currentSpoolFile);
- logger.info("Invoked rollover handler with file: " + currentSpoolFile);
+ LOG.info("Invoked rollover handler with file: " + currentSpoolFile);
initializeSpoolState();
}
boolean status = rolloverInProgress.compareAndSet(true, false);
if (!status) {
- logger.error("Should have reset rollover flag!!");
+ LOG.error("Should have reset rollover flag!!");
}
}
@@ -174,7 +175,7 @@ public class LogSpooler {
if (rolloverInProgress.compareAndSet(false, true)) {
rollover();
} else {
- logger.warn("Ignoring rollover call as rollover already in progress for file " +
+ LOG.warn("Ignoring rollover call as rollover already in progress for file " +
currentSpoolFile);
}
}
@@ -197,7 +198,7 @@ public class LogSpooler {
private class LogSpoolerRolloverTimerTask extends TimerTask {
@Override
public void run() {
- logger.info("Trying rollover based on time");
+ LOG.info("Trying rollover based on time");
tryRollover();
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
index 084d6a2..616300f 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerContext.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
index 1e12fb7..14bb139 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/LogSpoolerException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
index 8279645..48ace11 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverCondition.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
index 11308e4..2ec2708 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/spool/RolloverHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
index 15f7594..f814a92 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AWSUtil.java
@@ -20,62 +20,20 @@ package org.apache.ambari.logfeeder.util;
import org.apache.log4j.Logger;
-import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClient;
-public enum AWSUtil {
- INSTANCE;
+public class AWSUtil {
private static final Logger LOG = Logger.getLogger(AWSUtil.class);
- public String getAwsUserName(String accessKey, String secretKey) {
- String username = null;
- AWSCredentials awsCredentials = createAWSCredentials(accessKey, secretKey);
- AmazonIdentityManagementClient amazonIdentityManagementClient;
- if (awsCredentials != null) {
- amazonIdentityManagementClient = new AmazonIdentityManagementClient(
- awsCredentials);
- } else {
- // create default client
- amazonIdentityManagementClient = new AmazonIdentityManagementClient();
- }
- try {
- username = amazonIdentityManagementClient.getUser().getUser()
- .getUserName();
- } catch (AmazonServiceException e) {
- if (e.getErrorCode().compareTo("AccessDenied") == 0) {
- String arn = null;
- String msg = e.getMessage();
- int arnIdx = msg.indexOf("arn:aws");
- if (arnIdx != -1) {
- int arnSpace = msg.indexOf(" ", arnIdx);
- // should be similar to "arn:aws:iam::111111111111:user/username"
- arn = msg.substring(arnIdx, arnSpace);
- }
- if (arn != null) {
- String[] arnParts = arn.split(":");
- if (arnParts != null && arnParts.length > 5) {
- username = arnParts[5];
- if (username != null) {
- username = username.replace("user/", "");
- }
- }
- }
- }
- } catch (Exception exception) {
- LOG.error(
- "Error in getting username :" + exception.getLocalizedMessage(),
- exception.getCause());
- }
- return username;
+ private AWSUtil() {
+ throw new UnsupportedOperationException();
}
- public AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
+ public static AWSCredentials createAWSCredentials(String accessKey, String secretKey) {
if (accessKey != null && secretKey != null) {
LOG.debug("Creating aws client as per new accesskey and secretkey");
- AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey,
- secretKey);
+ AWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return awsCredentials;
} else {
return null;
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
index a92ba29..5049b62 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/AliasUtil.java
@@ -21,69 +21,90 @@ package org.apache.ambari.logfeeder.util;
import java.io.File;
import java.util.HashMap;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.mapper.Mapper;
+import org.apache.ambari.logfeeder.output.Output;
+import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
public class AliasUtil {
- private static Logger logger = Logger.getLogger(AliasUtil.class);
+ private static final Logger LOG = Logger.getLogger(AliasUtil.class);
- private static AliasUtil instance = null;
+ private static final String ALIAS_CONFIG_JSON = "alias_config.json";
+ private static HashMap<String, Object> aliasMap = null;
- private static String aliasConfigJson = "alias_config.json";
-
- private HashMap<String, Object> aliasMap = null;
-
- public static enum ALIAS_TYPE {
- INPUT, FILTER, MAPPER, OUTPUT
+ static {
+ File jsonFile = FileUtil.getFileFromClasspath(ALIAS_CONFIG_JSON);
+ if (jsonFile != null) {
+ aliasMap = FileUtil.readJsonFromFile(jsonFile);
+ }
}
- public static enum ALIAS_PARAM {
- KLASS
+ public static enum AliasType {
+ INPUT, FILTER, MAPPER, OUTPUT
}
private AliasUtil() {
- init();
+ throw new UnsupportedOperationException();
}
- public static AliasUtil getInstance() {
- if (instance == null) {
- synchronized (AliasUtil.class) {
- if (instance == null) {
- instance = new AliasUtil();
- }
- }
+ public static Object getClassInstance(String key, AliasType aliasType) {
+ String classFullName = getClassFullName(key, aliasType);
+
+ Object instance = null;
+ try {
+ instance = (Object) Class.forName(classFullName).getConstructor().newInstance();
+ } catch (Exception exception) {
+ LOG.error("Unsupported class = " + classFullName, exception.getCause());
}
- return instance;
- }
- /**
- */
- private void init() {
- File jsonFile = LogFeederUtil.getFileFromClasspath(aliasConfigJson);
- if (jsonFile != null) {
- this.aliasMap = LogFeederUtil.readJsonFromFile(jsonFile);
+ if (instance != null) {
+ boolean isValid = false;
+ switch (aliasType) {
+ case FILTER:
+ isValid = Filter.class.isAssignableFrom(instance.getClass());
+ break;
+ case INPUT:
+ isValid = Input.class.isAssignableFrom(instance.getClass());
+ break;
+ case OUTPUT:
+ isValid = Output.class.isAssignableFrom(instance.getClass());
+ break;
+ case MAPPER:
+ isValid = Mapper.class.isAssignableFrom(instance.getClass());
+ break;
+ default:
+ LOG.warn("Unhandled aliasType: " + aliasType);
+ isValid = true;
+ }
+ if (!isValid) {
+ LOG.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
+ }
}
-
+ return instance;
}
-
- public String readAlias(String key, ALIAS_TYPE aliastype, ALIAS_PARAM aliasParam) {
- String result = key;// key as a default value;
+ private static String getClassFullName(String key, AliasType aliastype) {
+ String className = null;// key as a default value;
+
HashMap<String, String> aliasInfo = getAliasInfo(key, aliastype);
- String value = aliasInfo.get(aliasParam.name().toLowerCase());
- if (value != null && !value.isEmpty()) {
- result = value;
- logger.debug("Alias found for key :" + key + ", param :" + aliasParam.name().toLowerCase() + ", value :"
- + value + " aliastype:" + aliastype.name());
+ String value = aliasInfo.get("klass");
+ if (!StringUtils.isEmpty(value)) {
+ className = value;
+ LOG.debug("Class name found for key :" + key + ", class name :" + className + " aliastype:" + aliastype.name());
} else {
- logger.debug("Alias not found for key :" + key + ", param :" + aliasParam.name().toLowerCase());
+ LOG.debug("Class name not found for key :" + key + " aliastype:" + aliastype.name());
}
- return result;
+
+ return className;
}
@SuppressWarnings("unchecked")
- private HashMap<String, String> getAliasInfo(String key, ALIAS_TYPE aliastype) {
- HashMap<String, String> aliasInfo = null;
+ private static HashMap<String, String> getAliasInfo(String key, AliasType aliastype) {
+ HashMap<String, String> aliasInfo = new HashMap<String, String>();
+
if (aliasMap != null) {
String typeKey = aliastype.name().toLowerCase();
HashMap<String, Object> typeJson = (HashMap<String, Object>) aliasMap.get(typeKey);
@@ -91,9 +112,7 @@ public class AliasUtil {
aliasInfo = (HashMap<String, String>) typeJson.get(key);
}
}
- if (aliasInfo == null) {
- aliasInfo = new HashMap<String, String>();
- }
+
return aliasInfo;
}
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
index c2addbd..c460ab3 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/CompressionUtil.java
@@ -37,25 +37,20 @@ public class CompressionUtil {
FileInputStream ios = null;
try {
if (!inputFile.exists()) {
- throw new IllegalArgumentException("Input File:"
- + inputFile.getAbsolutePath() + " is not exist.");
+ throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is not exist.");
}
if (inputFile.isDirectory()) {
- throw new IllegalArgumentException("Input File:"
- + inputFile.getAbsolutePath() + " is a directory.");
+ throw new IllegalArgumentException("Input File:" + inputFile.getAbsolutePath() + " is a directory.");
}
File parent = outputFile.getParentFile();
if (parent != null && !parent.exists()) {
boolean isParentCreated = parent.mkdirs();
if (!isParentCreated) {
- throw new IllegalAccessException(
- "User does not have permission to create parent directory :"
- + parent.getAbsolutePath());
+ throw new IllegalAccessException( "User does not have permission to create parent directory :" + parent.getAbsolutePath());
}
}
- final OutputStream out = new FileOutputStream(outputFile);
- cos = new CompressorStreamFactory().createCompressorOutputStream(
- algoName, out);
+ OutputStream out = new FileOutputStream(outputFile);
+ cos = new CompressorStreamFactory().createCompressorOutputStream(algoName, out);
ios = new FileInputStream(inputFile);
IOUtils.copy(ios, cos);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/ambari/blob/bc7e0aa7/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
index 2ca9353..6321e17 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/DateUtil.java
@@ -20,12 +20,17 @@ package org.apache.ambari.logfeeder.util;
import java.text.SimpleDateFormat;
import java.util.Date;
+import java.util.TimeZone;
import org.apache.log4j.Logger;
public class DateUtil {
- private static final Logger logger = Logger.getLogger(DateUtil.class);
-
+ private static final Logger LOG = Logger.getLogger(DateUtil.class);
+
+ private DateUtil() {
+ throw new UnsupportedOperationException();
+ }
+
public static String dateToString(Date date, String dateFormat) {
if (date == null || dateFormat == null || dateFormat.isEmpty()) {
return "";
@@ -34,8 +39,36 @@ public class DateUtil {
SimpleDateFormat formatter = new SimpleDateFormat(dateFormat);
return formatter.format(date).toString();
} catch (Exception e) {
- logger.error("Error in coverting dateToString format :" + dateFormat, e);
+ LOG.error("Error in coverting dateToString format :" + dateFormat, e);
}
return "";
}
+
+ private final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
+ private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
+ sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
+ return sdf;
+ }
+ };
+
+ public static String getDate(String timeStampStr) {
+ try {
+ return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
+ } catch (Exception ex) {
+ LOG.error(ex);
+ return null;
+ }
+ }
+
+ public static String getActualDateStr() {
+ try {
+ return dateFormatter.get().format(new Date());
+ } catch (Exception ex) {
+ LOG.error(ex);
+ return null;
+ }
+ }
}