Posted to commits@ambari.apache.org by ol...@apache.org on 2016/04/15 21:17:46 UTC
[1/2] ambari git commit: AMBARI-15887. Added Logfeeder unit tests
(Miklos Gergely via oleewere)
Repository: ambari
Updated Branches:
refs/heads/trunk a55c055a8 -> e5c13a3ad
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
new file mode 100644
index 0000000..a7db3f8
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputKafkaTest.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Future;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.ambari.logfeeder.output.OutputKafka.KafkaCallBack;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class OutputKafkaTest {
+ private static final Logger LOG = Logger.getLogger(OutputKafkaTest.class);
+
+ private static final String TEST_TOPIC = "test topic";
+
+ private OutputKafka outputKafka;
+
+ @SuppressWarnings("unchecked")
+ private KafkaProducer<String, String> mockKafkaProducer = EasyMock.strictMock(KafkaProducer.class);
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void init() {
+ outputKafka = new OutputKafka() {
+ @Override
+ protected KafkaProducer<String, String> createKafkaProducer(Properties props) {
+ return mockKafkaProducer;
+ }
+ };
+ }
+
+ @Test
+ public void testOutputKafka_uploadData() throws Exception {
+ LOG.info("testOutputKafka_uploadData()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("broker_list", "some broker list");
+ config.put("topic", TEST_TOPIC);
+
+ outputKafka.loadConfig(config);
+ outputKafka.init();
+
+ @SuppressWarnings("unchecked")
+ Future<RecordMetadata> mockFuture = EasyMock.mock(Future.class);
+
+ EasyMock.expect(mockKafkaProducer.send(new ProducerRecord<String, String>(TEST_TOPIC, "value0")))
+ .andReturn(mockFuture);
+ EasyMock.expect(mockFuture.get()).andReturn(null);
+
+ for (int i = 1; i < 10; i++)
+ EasyMock.expect(mockKafkaProducer.send(EasyMock.eq(new ProducerRecord<String, String>(TEST_TOPIC, "value" + i)),
+ EasyMock.anyObject(KafkaCallBack.class))).andReturn(null);
+
+ EasyMock.replay(mockKafkaProducer);
+
+ for (int i = 0; i < 10; i++) {
+ InputMarker inputMarker = new InputMarker();
+ inputMarker.input = EasyMock.mock(Input.class);
+ outputKafka.write("value" + i, inputMarker);
+ }
+
+ EasyMock.verify(mockKafkaProducer);
+ }
+
+ @Test
+ public void testOutputKafka_noBrokerList() throws Exception {
+ LOG.info("testOutputKafka_noBrokerList()");
+
+ expectedException.expect(Exception.class);
+ expectedException.expectMessage("For kafka output, bootstrap broker_list is needed");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("topic", TEST_TOPIC);
+
+ outputKafka.loadConfig(config);
+ outputKafka.init();
+ }
+
+ @Test
+ public void testOutputKafka_noTopic() throws Exception {
+ LOG.info("testOutputKafka_noBrokerList()");
+
+ expectedException.expect(Exception.class);
+ expectedException.expectMessage("For kafka output, topic is needed");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("broker_list", "some broker list");
+
+ outputKafka.loadConfig(config);
+ outputKafka.init();
+ }
+
+ @After
+ public void cleanUp() {
+ EasyMock.reset(mockKafkaProducer);
+ }
+}
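For readers of the test above: OutputKafka takes all of its settings from the config map passed to loadConfig(). The keys below are the ones its init()/initProperties() (shown later in this mail) actually consult; the values are illustrative only, not defaults from the commit:

    Map<String, Object> config = new HashMap<String, Object>();
    config.put("broker_list", "host1:9092,host2:9092"); // required
    config.put("topic", "logfeeder");                   // required
    config.put("is_async", true);                       // optional, default true
    config.put("batch_size", 5000);                     // optional, default 5000
    config.put("linger_ms", 1000);                      // optional, default 1000
    config.put("kafka.acks", "1"); // any "kafka."-prefixed key is forwarded to the producer
    outputKafka.loadConfig(config);
    outputKafka.init();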
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
new file mode 100644
index 0000000..afbccca
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/OutputSolrTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.output;
+
+import java.net.MalformedURLException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.response.UpdateResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.util.NamedList;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class OutputSolrTest {
+ private static final Logger LOG = Logger.getLogger(OutputSolrTest.class);
+
+ private OutputSolr outputSolr;
+ private Map<Integer, SolrInputDocument> receivedDocs = new ConcurrentHashMap<>();
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void init() throws Exception {
+ outputSolr = new OutputSolr() {
+ @Override
+ SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
+ return new CloudSolrClient(null) {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public UpdateResponse add(Collection<SolrInputDocument> docs) {
+ for (SolrInputDocument doc : docs) {
+ receivedDocs.put((Integer) doc.getField("id").getValue(), doc);
+ }
+
+ UpdateResponse response = new UpdateResponse();
+ response.setResponse(new NamedList<Object>());
+ return response;
+ }
+ };
+ }
+ };
+ }
+
+ @Test
+ public void testOutputToSolr_uploadData() throws Exception {
+ LOG.info("testOutputToSolr_uploadData()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("url", "some url");
+ config.put("workers", "3");
+
+ outputSolr.loadConfig(config);
+ outputSolr.init();
+
+ Map<Integer, SolrInputDocument> expectedDocs = new HashMap<>();
+
+ int count = 0;
+ for (int i = 0; i < 10; i++) {
+ Map<String, Object> jsonObj = new HashMap<>();
+ for (int j = 0; j < 3; j++)
+ jsonObj.put("name" + ++count, "value" + ++count);
+ jsonObj.put("id", ++count);
+
+ InputMarker inputMarker = new InputMarker();
+ inputMarker.input = EasyMock.mock(Input.class);
+ outputSolr.write(jsonObj, inputMarker);
+
+ SolrInputDocument doc = new SolrInputDocument();
+ for (Map.Entry<String, Object> e : jsonObj.entrySet())
+ doc.addField(e.getKey(), e.getValue());
+
+ expectedDocs.put(count, doc);
+ }
+
+ Thread.sleep(100);
+ while (outputSolr.getPendingCount() > 0)
+ Thread.sleep(100);
+
+ int waitToFinish = 0;
+ while (receivedDocs.size() < 10 && waitToFinish < 10) {
+ Thread.sleep(100);
+ waitToFinish++;
+ }
+
+ Set<Integer> ids = new HashSet<>();
+ ids.addAll(receivedDocs.keySet());
+ ids.addAll(expectedDocs.keySet());
+ for (Integer id : ids) {
+ SolrInputDocument receivedDoc = receivedDocs.get(id);
+ SolrInputDocument expectedDoc = expectedDocs.get(id);
+
+ assertNotNull("No document received for id: " + id, receivedDoc);
+ assertNotNull("No document expected for id: " + id, expectedDoc);
+
+ Set<String> fieldNames = new HashSet<>();
+ fieldNames.addAll(receivedDoc.getFieldNames());
+ fieldNames.addAll(expectedDoc.getFieldNames());
+
+ for (String fieldName : fieldNames) {
+ Object receivedValue = receivedDoc.getFieldValue(fieldName);
+ Object expectedValue = expectedDoc.getFieldValue(fieldName);
+
+ assertNotNull("No received document field found for id: " + id + ", fieldName: " + fieldName, receivedValue);
+ assertNotNull("No expected document field found for id: " + id + ", fieldName: " + fieldName, expectedValue);
+
+ assertEquals("Field value not matching for id: " + id + ", fieldName: " + fieldName, receivedValue,
+ expectedValue);
+ }
+ }
+ }
+
+ @Test
+ public void testOutputToSolr_noUrlOrZKHost() throws Exception {
+ LOG.info("testOutputToSolr_noUrlOrZKHost()");
+
+ expectedException.expect(Exception.class);
+ expectedException.expectMessage("For solr output, either url or zk_hosts property needs to be set");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("workers", "3");
+
+ outputSolr.loadConfig(config);
+ outputSolr.init();
+ }
+
+ @After
+ public void cleanUp() {
+ receivedDocs.clear();
+ }
+}
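For orientation, these are the config keys consulted by OutputSolr (see initParams() and createSolrWorkers() later in this mail), sketched with illustrative values:

    Map<String, Object> config = new HashMap<String, Object>();
    config.put("url", "http://solr1:8983/solr,http://solr2:8983/solr"); // either this...
    config.put("zk_hosts", "zk1:2181,zk2:2181");   // ...or this must be set
    config.put("collection", "logs");              // mandatory when zk_hosts is used
    config.put("workers", "3");                    // default 1
    config.put("flush_size", 5000);                // max buffer size, default 5000
    config.put("idle_flush_time_ms", 3000);        // default 3000
    config.put("splits_interval_mins", "15");      // "none" (default) disables splitting
    config.put("split_interval_mins", 30);         // default 30
    config.put("number_of_shards", 8);             // default 1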
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
new file mode 100644
index 0000000..e641018
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/resources/log4j.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+ <appender name="console" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d [%t] %-5p %C{6} (%F:%L) - %m%n" />
+ <!-- <param name="ConversionPattern" value="%d [%t] %-5p %c %x - %m%n"/> -->
+ </layout>
+ </appender>
+
+ <!-- Logs to suppress BEGIN -->
+ <category name="org.apache.solr.common.cloud.ZkStateReader" additivity="false">
+ <priority value="error" />
+ <appender-ref ref="console" />
+ </category>
+
+ <category name="apache.solr.client.solrj.impl.CloudSolrClient" additivity="false">
+ <priority value="fatal" />
+ <appender-ref ref="console" />
+ </category>
+ <!-- Logs to suppress END -->
+
+ <category name="org.apache.ambari.logfeeder" additivity="false">
+ <priority value="info" />
+ <appender-ref ref="console" />
+ <!-- <appender-ref ref="daily_rolling_file" /> -->
+ </category>
+
+ <root>
+ <priority value="warn" />
+ <!-- <appender-ref ref="console" /> -->
+ <!-- <appender-ref ref="daily_rolling_file" /> -->
+ </root>
+
+</log4j:configuration>
[2/2] ambari git commit: AMBARI-15887. Added Logfeeder unit tests
(Miklos Gergely via oleewere)
Posted by ol...@apache.org.
AMBARI-15887. Added Logfeeder unit tests (Miklos Gergely via oleewere)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/e5c13a3a
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/e5c13a3a
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/e5c13a3a
Branch: refs/heads/trunk
Commit: e5c13a3ada85ffd0ffc8d05171c934eabe6243d8
Parents: a55c055
Author: miki.gergely <mg...@hortonworks.com>
Authored: Fri Apr 15 18:14:53 2016 +0200
Committer: oleewere <ol...@gmail.com>
Committed: Fri Apr 15 21:12:13 2016 +0200
----------------------------------------------------------------------
.../ambari-logsearch-appender/pom.xml | 3 +-
.../ambari-logsearch-logfeeder/pom.xml | 10 +-
.../ambari/logfeeder/output/OutputKafka.java | 283 +++++-----
.../ambari/logfeeder/output/OutputSolr.java | 566 +++++++++----------
.../ambari/logfeeder/filter/FilterGrokTest.java | 157 +++++
.../logfeeder/filter/FilterKeyValueTest.java | 125 ++++
.../logfeeder/filter/JSONFilterCodeTest.java | 124 ++++
.../ambari/logfeeder/input/InputFileTest.java | 195 +++++++
.../ambari/logfeeder/mapper/MapperDateTest.java | 145 +++++
.../logfeeder/mapper/MapperFieldNameTest.java | 71 +++
.../logfeeder/mapper/MapperFieldValueTest.java | 90 +++
.../logfeeder/output/OutputKafkaTest.java | 128 +++++
.../ambari/logfeeder/output/OutputSolrTest.java | 165 ++++++
.../src/test/resources/log4j.xml | 53 ++
14 files changed, 1657 insertions(+), 458 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-appender/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-appender/pom.xml b/ambari-logsearch/ambari-logsearch-appender/pom.xml
index 85852f6..39f250a 100644
--- a/ambari-logsearch/ambari-logsearch-appender/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-appender/pom.xml
@@ -77,8 +77,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
-</project>
\ No newline at end of file
+</project>
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
index a1443c9..0888010 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/pom.xml
@@ -41,7 +41,12 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
<scope>test</scope>
</dependency>
<dependency>
@@ -74,7 +79,6 @@
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
-
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -105,8 +109,8 @@
<artifactId>jackson-xc</artifactId>
<version>1.9.13</version>
</dependency>
-
</dependencies>
+
<build>
<finalName>LogFeeder</finalName>
<pluginManagement>
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
index c594dd4..cd4f951 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputKafka.java
@@ -19,15 +19,12 @@
package org.apache.ambari.logfeeder.output;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
import org.apache.ambari.logfeeder.LogFeederUtil;
-import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.input.InputMarker;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Callback;
@@ -39,58 +36,51 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
public class OutputKafka extends Output {
- static private Logger logger = Logger.getLogger(OutputKafka.class);
+ private static final Logger LOG = Logger.getLogger(OutputKafka.class);
- String brokerList = null;
- String topic = null;
- boolean isAsync = true;
- long messageCount = 0;
- int batchSize = 5000;
- int lingerMS = 1000;
+ private static final int FAILED_RETRY_INTERVAL = 30;
+ private static final int CATCHUP_RETRY_INTERVAL = 5;
+
+ private static final int DEFAULT_BATCH_SIZE = 5000;
+ private static final int DEFAULT_LINGER_MS = 1000;
+
+ private String topic = null;
+ private boolean isAsync = true;
+ private long messageCount = 0;
private KafkaProducer<String, String> producer = null;
- BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
+ private BlockingQueue<KafkaCallBack> failedMessages = new LinkedTransferQueue<KafkaCallBack>();
// Let's start with the assumption Kafka is down
- boolean isKafkaBrokerUp = false;
-
- static final int FAILED_RETRY_INTERVAL = 30;
- static final int CATCHUP_RETRY_INTERVAL = 5;
+ private boolean isKafkaBrokerUp = false;
@Override
public void init() throws Exception {
super.init();
- statMetric.metricsName = "output.kafka.write_logs";
- writeBytesMetric.metricsName = "output.kafka.write_bytes";
+ Properties props = initProperties();
- brokerList = getStringValue("broker_list");
- topic = getStringValue("topic");
- isAsync = getBooleanValue("is_async", true);
- batchSize = getIntValue("batch_size", batchSize);
- lingerMS = getIntValue("linger_ms", lingerMS);
+ producer = createKafkaProducer(props);
+ createKafkaRetryThread();
+ }
- Map<String, Object> kafkaCustomProperties = new HashMap<String, Object>();
- // Get all kafka custom properties
- for (String key : configs.keySet()) {
- if (key.startsWith("kafka.")) {
- Object value = configs.get(key);
- if (value == null || value.toString().length() == 0) {
- continue;
- }
- String kafkaKey = key.substring("kafka.".length());
- kafkaCustomProperties.put(kafkaKey, value);
- }
- }
+ private Properties initProperties() throws Exception {
+ statMetric.metricsName = "output.kafka.write_logs";
+ writeBytesMetric.metricsName = "output.kafka.write_bytes";
+ String brokerList = getStringValue("broker_list");
if (StringUtils.isEmpty(brokerList)) {
- throw new Exception(
- "For kafka output, bootstrap broker_list is needed");
+ throw new Exception("For kafka output, bootstrap broker_list is needed");
}
+ topic = getStringValue("topic");
if (StringUtils.isEmpty(topic)) {
throw new Exception("For kafka output, topic is needed");
}
+ isAsync = getBooleanValue("is_async", true);
+ int batchSize = getIntValue("batch_size", DEFAULT_BATCH_SIZE);
+ int lingerMS = getIntValue("linger_ms", DEFAULT_LINGER_MS);
+
Properties props = new Properties();
// 0.9.0
props.put("bootstrap.servers", brokerList);
@@ -101,50 +91,56 @@ public class OutputKafka extends Output {
// props.put("retries", "3");
props.put("batch.size", batchSize);
props.put("linger.ms", lingerMS);
+ // props.put("metadata.broker.list", brokerList);
- for (String kafkaKey : kafkaCustomProperties.keySet()) {
- logger.info("Adding custom Kafka property. " + kafkaKey + "="
- + kafkaCustomProperties.get(kafkaKey));
- props.put(kafkaKey, kafkaCustomProperties.get(kafkaKey));
+ // Get all kafka custom properties
+ for (String key : configs.keySet()) {
+ if (key.startsWith("kafka.")) {
+ Object value = configs.get(key);
+ if (value == null || value.toString().length() == 0) {
+ continue;
+ }
+ String kafkaKey = key.substring("kafka.".length());
+ LOG.info("Adding custom Kafka property. " + kafkaKey + "=" + value);
+ props.put(kafkaKey, value);
+ }
}
- // props.put("metadata.broker.list", brokerList);
+ return props;
+ }
+
+ protected KafkaProducer<String, String> createKafkaProducer(Properties props) {
+ return new KafkaProducer<String, String>(props);
+ }
- producer = new KafkaProducer<String, String>(props);
+ private void createKafkaRetryThread() {
Thread retryThread = new Thread("kafka-writer-retry,topic=" + topic) {
@Override
public void run() {
KafkaCallBack kafkaCallBack = null;
- logger.info("Started thread to monitor failed messsages. "
- + getShortDescription());
+ LOG.info("Started thread to monitor failed messsages. " + getShortDescription());
while (true) {
try {
if (kafkaCallBack == null) {
kafkaCallBack = failedMessages.take();
}
- if (publishMessage(kafkaCallBack.message,
- kafkaCallBack.inputMarker)) {
- // logger.info("Sent message. count="
- // + kafkaCallBack.thisMessageNumber);
+ if (publishMessage(kafkaCallBack.message, kafkaCallBack.inputMarker)) {
+ // logger.info("Sent message. count=" +
+ // kafkaCallBack.thisMessageNumber);
kafkaCallBack = null;
} else {
// Should wait for sometime
- logger.error("Kafka is down. messageNumber="
- + kafkaCallBack.thisMessageNumber
- + ". Going to sleep for "
- + FAILED_RETRY_INTERVAL + " seconds");
+ LOG.error("Kafka is down. messageNumber=" + kafkaCallBack.thisMessageNumber + ". Going to sleep for "
+ + FAILED_RETRY_INTERVAL + " seconds");
Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
}
} catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error sending message to Kafka during retry. message="
- + (kafkaCallBack == null ? null
- : kafkaCallBack.message), t,
- logger, Level.ERROR);
+ String logMessageKey = this.getClass().getSimpleName() + "_KAFKA_RETRY_WRITE_ERROR";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+ "Error sending message to Kafka during retry. message="
+ + (kafkaCallBack == null ? null : kafkaCallBack.message),
+ t, LOG, Level.ERROR);
}
}
@@ -155,6 +151,33 @@ public class OutputKafka extends Output {
}
@Override
+ public synchronized void write(String block, InputMarker inputMarker) throws Exception {
+ while (!isDrain() && !inputMarker.input.isDrain()) {
+ try {
+ if (failedMessages.size() == 0) {
+ if (publishMessage(block, inputMarker)) {
+ break;
+ }
+ }
+ if (isDrain() || inputMarker.input.isDrain()) {
+ break;
+ }
+ if (!isKafkaBrokerUp) {
+ LOG.error("Kafka is down. Going to sleep for " + FAILED_RETRY_INTERVAL + " seconds");
+ Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
+ } else {
+ LOG.warn("Kafka is still catching up from previous failed messages. outstanding messages="
+ + failedMessages.size() + " Going to sleep for " + CATCHUP_RETRY_INTERVAL + " seconds");
+ Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
+ }
+ } catch (Throwable t) {
+ // ignore
+ break;
+ }
+ }
+ }
+
+ @Override
public void setDrain(boolean drain) {
super.setDrain(drain);
}
@@ -163,151 +186,101 @@ public class OutputKafka extends Output {
* Flush document buffer
*/
public void flush() {
- logger.info("Flush called...");
+ LOG.info("Flush called...");
setDrain(true);
}
@Override
public void close() {
- logger.info("Closing Kafka client...");
+ LOG.info("Closing Kafka client...");
flush();
if (producer != null) {
try {
producer.close();
} catch (Throwable t) {
- logger.error("Error closing Kafka topic. topic=" + topic);
+ LOG.error("Error closing Kafka topic. topic=" + topic);
}
}
- logger.info("Closed Kafka client");
+ LOG.info("Closed Kafka client");
super.close();
}
- @Override
- synchronized public void write(String block, InputMarker inputMarker) throws Exception {
- while (!isDrain() && !inputMarker.input.isDrain()) {
- try {
- if (failedMessages.size() == 0) {
- if (publishMessage(block, inputMarker)) {
- break;
- }
- }
- if (isDrain() || inputMarker.input.isDrain()) {
- break;
- }
- if (!isKafkaBrokerUp) {
- logger.error("Kafka is down. Going to sleep for "
- + FAILED_RETRY_INTERVAL + " seconds");
- Thread.sleep(FAILED_RETRY_INTERVAL * 1000);
-
- } else {
- logger.warn("Kafka is still catching up from previous failed messages. outstanding messages="
- + failedMessages.size()
- + " Going to sleep for "
- + CATCHUP_RETRY_INTERVAL + " seconds");
- Thread.sleep(CATCHUP_RETRY_INTERVAL * 1000);
- }
- } catch (Throwable t) {
- // ignore
- break;
- }
- }
- }
-
private boolean publishMessage(String block, InputMarker inputMarker) {
if (isAsync && isKafkaBrokerUp) { // Send asynchronously
producer.send(new ProducerRecord<String, String>(topic, block),
- new KafkaCallBack(this, block, inputMarker, ++messageCount));
+ new KafkaCallBack(this, block, inputMarker, ++messageCount));
return true;
} else { // Send synchronously
try {
// Not using key. Let it round robin
- RecordMetadata metadata = producer.send(
- new ProducerRecord<String, String>(topic, block)).get();
+ RecordMetadata metadata = producer.send(new ProducerRecord<String, String>(topic, block)).get();
if (metadata != null) {
statMetric.count++;
writeBytesMetric.count += block.length();
}
if (!isKafkaBrokerUp) {
- logger.info("Started writing to kafka. "
- + getShortDescription());
+ LOG.info("Started writing to kafka. " + getShortDescription());
isKafkaBrokerUp = true;
}
return true;
} catch (InterruptedException e) {
isKafkaBrokerUp = false;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_KAFKA_INTERRUPT";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "InterruptedException-Error sending message to Kafka",
- e, logger, Level.ERROR);
+ String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_INTERRUPT";
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "InterruptedException-Error sending message to Kafka", e,
+ LOG, Level.ERROR);
} catch (ExecutionException e) {
isKafkaBrokerUp = false;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_KAFKA_EXECUTION";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "ExecutionException-Error sending message to Kafka", e,
- logger, Level.ERROR);
+ String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_EXECUTION";
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "ExecutionException-Error sending message to Kafka", e,
+ LOG, Level.ERROR);
} catch (Throwable t) {
isKafkaBrokerUp = false;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_KAFKA_WRITE_ERROR";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "GenericException-Error sending message to Kafka", t,
- logger, Level.ERROR);
+ String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_WRITE_ERROR";
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "GenericException-Error sending message to Kafka", t,
+ LOG, Level.ERROR);
}
}
return false;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
- */
@Override
public String getShortDescription() {
return "output:destination=kafka,topic=" + topic;
}
-}
-
-class KafkaCallBack implements Callback {
- static private Logger logger = Logger.getLogger(KafkaCallBack.class);
+ class KafkaCallBack implements Callback {
- long thisMessageNumber;
- OutputKafka output = null;
- String message;
- InputMarker inputMarker;
+ private long thisMessageNumber;
+ private OutputKafka output = null;
+ private String message;
+ private InputMarker inputMarker;
- public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker,
- long messageCount) {
- this.thisMessageNumber = messageCount;
- this.output = output;
- this.inputMarker = inputMarker;
- this.message = message;
- }
+ public KafkaCallBack(OutputKafka output, String message, InputMarker inputMarker, long messageCount) {
+ this.thisMessageNumber = messageCount;
+ this.output = output;
+ this.inputMarker = inputMarker;
+ this.message = message;
+ }
- public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (metadata != null) {
- if (!output.isKafkaBrokerUp) {
- logger.info("Started writing to kafka. "
- + output.getShortDescription());
- output.isKafkaBrokerUp = true;
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (metadata != null) {
+ if (!output.isKafkaBrokerUp) {
+ LOG.info("Started writing to kafka. " + output.getShortDescription());
+ output.isKafkaBrokerUp = true;
+ }
+ output.incrementStat(1);
+ output.writeBytesMetric.count += message.length();
+
+ // metadata.partition();
+ // metadata.offset();
+ } else {
+ output.isKafkaBrokerUp = false;
+ String logKeyMessage = this.getClass().getSimpleName() + "_KAFKA_ASYNC_ERROR";
+ LogFeederUtil.logErrorMessageByInterval(logKeyMessage, "Error sending message to Kafka. Async Callback",
+ exception, LOG, Level.ERROR);
+
+ output.failedMessages.add(this);
}
- output.incrementStat(1);
- output.writeBytesMetric.count += message.length();
-
- // metadata.partition();
- // metadata.offset();
- } else {
- output.isKafkaBrokerUp = false;
- final String LOG_MESSAGE_KEY = this.getClass().getSimpleName()
- + "_KAFKA_ASYNC_ERROR";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Error sending message to Kafka. Async Callback",
- exception, logger, Level.ERROR);
-
- output.failedMessages.add(this);
}
}
}
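A short sketch of the "kafka." passthrough in initProperties() above; the property name is an example, not taken from the commit:

    Map<String, Object> configs = new HashMap<String, Object>();
    configs.put("kafka.compression.type", "snappy");

    Properties props = new Properties();
    for (String key : configs.keySet()) {
      if (key.startsWith("kafka.")) {
        // prefix stripped, value forwarded to the KafkaProducer untouched
        props.put(key.substring("kafka.".length()), configs.get(key));
      }
    }
    // props now contains compression.type=snappy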
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
index 7cd911d..215f691 100644
--- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/output/OutputSolr.java
@@ -20,6 +20,7 @@
package org.apache.ambari.logfeeder.output;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
@@ -37,6 +38,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.impl.LBHttpSolrClient;
@@ -45,154 +47,171 @@ import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.common.SolrInputDocument;
public class OutputSolr extends Output {
- static private Logger logger = Logger.getLogger(OutputSolr.class);
-
- private static final String ROUTER_FIELD = "_router_field_";
-
- String solrUrl = null;
- String zkHosts = null;
- String collection = null;
- String splitMode = "none";
- int splitInterval = 0;
- int numberOfShards = 1;
- boolean isComputeCurrentCollection = false;
-
- int maxBufferSize = 5000;
- int maxIntervalMS = 3000;
- int workers = 1;
-
- BlockingQueue<OutputData> outgoingBuffer = null;
- List<SolrWorkerThread> writerThreadList = new ArrayList<SolrWorkerThread>();
- private static final int RETRY_INTERVAL = 30;
-
- int lastSlotByMin = -1;
+ private static final Logger LOG = Logger.getLogger(OutputSolr.class);
+
+ private static final int DEFAULT_MAX_BUFFER_SIZE = 5000;
+ private static final int DEFAULT_MAX_INTERVAL_MS = 3000;
+ private static final int DEFAULT_NUMBER_OF_SHARDS = 1;
+ private static final int DEFAULT_SPLIT_INTERVAL = 30;
+ private static final int DEFAULT_NUMBER_OF_WORKERS = 1;
+
+ private String collection;
+ private String splitMode;
+ private int splitInterval;
+ private int numberOfShards;
+ private int maxIntervalMS;
+ private int workers;
+ private int maxBufferSize;
+ private boolean isComputeCurrentCollection = false;
+ private int lastSlotByMin = -1;
+
+ private BlockingQueue<OutputData> outgoingBuffer = null;
+ private List<SolrWorkerThread> workerThreadList = new ArrayList<>();
@Override
public void init() throws Exception {
super.init();
+ initParams();
+ createOutgoingBuffer();
+ createSolrWorkers();
+ }
+
+ private void initParams() {
statMetric.metricsName = "output.solr.write_logs";
writeBytesMetric.metricsName = "output.solr.write_bytes";
- solrUrl = getStringValue("url");
- zkHosts = getStringValue("zk_hosts");
- splitMode = getStringValue("splits_interval_mins", splitMode);
+ splitMode = getStringValue("splits_interval_mins", "none");
if (!splitMode.equalsIgnoreCase("none")) {
- splitInterval = getIntValue("split_interval_mins", 30);
+ splitInterval = getIntValue("split_interval_mins", DEFAULT_SPLIT_INTERVAL);
}
- numberOfShards = getIntValue("number_of_shards", numberOfShards);
+ numberOfShards = getIntValue("number_of_shards", DEFAULT_NUMBER_OF_SHARDS);
- maxBufferSize = getIntValue("flush_size", maxBufferSize);
+ maxIntervalMS = getIntValue("idle_flush_time_ms", DEFAULT_MAX_INTERVAL_MS);
+ workers = getIntValue("workers", DEFAULT_NUMBER_OF_WORKERS);
+
+ maxBufferSize = getIntValue("flush_size", DEFAULT_MAX_BUFFER_SIZE);
if (maxBufferSize < 1) {
- logger.warn("maxBufferSize is less than 1. Making it 1");
+ LOG.warn("maxBufferSize is less than 1. Making it 1");
+ maxBufferSize = 1;
}
- maxIntervalMS = getIntValue("idle_flush_time_ms", maxIntervalMS);
- workers = getIntValue("workers", workers);
-
- logger.info("Config: Number of workers=" + workers + ", splitMode="
- + splitMode + ", splitInterval=" + splitInterval
- + ", numberOfShards=" + numberOfShards + ". "
- + getShortDescription());
- if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
- throw new Exception(
- "For solr output, either url or zk_hosts property need to be set");
- }
+ LOG.info(String.format("Config: Number of workers=%d, splitMode=%s, splitInterval=%d, numberOfShards=%d. %s",
+ workers, splitMode, splitInterval, numberOfShards, getShortDescription()));
+ }
+ private void createOutgoingBuffer() {
int bufferSize = maxBufferSize * (workers + 3);
- logger.info("Creating blocking queue with bufferSize=" + bufferSize);
- // outgoingBuffer = new ArrayBlockingQueue<OutputData>(bufferSize);
+ LOG.info("Creating blocking queue with bufferSize=" + bufferSize);
outgoingBuffer = new LinkedBlockingQueue<OutputData>(bufferSize);
+ }
+
+ private void createSolrWorkers() throws Exception, MalformedURLException {
+ String solrUrl = getStringValue("url");
+ String zkHosts = getStringValue("zk_hosts");
+ if (StringUtils.isEmpty(solrUrl) && StringUtils.isEmpty(zkHosts)) {
+ throw new Exception("For solr output, either url or zk_hosts property need to be set");
+ }
for (int count = 0; count < workers; count++) {
- SolrClient solrClient = null;
- CloudSolrClient solrClouldClient = null;
- if (zkHosts != null) {
- logger.info("Using zookeepr. zkHosts=" + zkHosts);
- collection = getStringValue("collection");
- if (StringUtils.isEmpty(collection)) {
- throw new Exception(
- "For solr cloud property collection is mandatory");
- }
- logger.info("Using collection=" + collection);
- solrClouldClient = new CloudSolrClient(zkHosts);
- solrClouldClient.setDefaultCollection(collection);
- solrClient = solrClouldClient;
- if (splitMode.equalsIgnoreCase("none")) {
- isComputeCurrentCollection = false;
- } else {
- isComputeCurrentCollection = true;
- }
- } else {
- String[] solrUrls = StringUtils.split(solrUrl, ",");
- if (solrUrls.length == 1) {
- logger.info("Using SolrURL=" + solrUrl);
- solrClient = new HttpSolrClient(solrUrl);
- } else {
- logger.info("Using load balance solr client. solrUrls="
- + solrUrl);
- logger.info("Initial URL for LB solr=" + solrUrls[0]);
- @SuppressWarnings("resource")
- LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
- solrUrls[0]);
- for (int i = 1; i < solrUrls.length; i++) {
- logger.info("Adding URL for LB solr=" + solrUrls[i]);
- lbSolrClient.addSolrServer(solrUrls[i]);
- }
- solrClient = lbSolrClient;
- }
- }
- try {
- logger.info("Pinging Solr server. zkHosts=" + zkHosts
- + ", urls=" + solrUrl);
- SolrPingResponse response = solrClient.ping();
- if (response.getStatus() == 0) {
- logger.info("Ping to Solr server is successful for writer="
- + count);
- } else {
- logger.warn("Ping to Solr server failed. It would check again. writer="
- + count
- + ", solrUrl="
- + solrUrl
- + ", zkHosts="
- + zkHosts
- + ", collection="
- + collection
- + ", response=" + response);
- }
- } catch (Throwable t) {
- logger.warn(
- "Ping to Solr server failed. It would check again. writer="
- + count + ", solrUrl=" + solrUrl + ", zkHosts="
- + zkHosts + ", collection=" + collection, t);
+ SolrClient solrClient = getSolrClient(solrUrl, zkHosts, count);
+ createSolrWorkerThread(count, solrClient);
+ }
+ }
+
+ SolrClient getSolrClient(String solrUrl, String zkHosts, int count) throws Exception, MalformedURLException {
+ SolrClient solrClient = null;
+
+ if (zkHosts != null) {
+ solrClient = createCloudSolrClient(zkHosts);
+ } else {
+ solrClient = createHttpSolrClient(solrUrl);
+ }
+
+ pingSolr(solrUrl, zkHosts, count, solrClient);
+
+ return solrClient;
+ }
+
+ private SolrClient createCloudSolrClient(String zkHosts) throws Exception {
+ LOG.info("Using zookeepr. zkHosts=" + zkHosts);
+ collection = getStringValue("collection");
+ if (StringUtils.isEmpty(collection)) {
+ throw new Exception("For solr cloud property collection is mandatory");
+ }
+ LOG.info("Using collection=" + collection);
+
+ CloudSolrClient solrClient = new CloudSolrClient(zkHosts);
+ solrClient.setDefaultCollection(collection);
+ isComputeCurrentCollection = !splitMode.equalsIgnoreCase("none");
+ return solrClient;
+ }
+
+ private SolrClient createHttpSolrClient(String solrUrl) throws MalformedURLException {
+ String[] solrUrls = StringUtils.split(solrUrl, ",");
+ if (solrUrls.length == 1) {
+ LOG.info("Using SolrURL=" + solrUrl);
+ return new HttpSolrClient(solrUrl);
+ } else {
+ LOG.info("Using load balance solr client. solrUrls=" + solrUrl);
+ LOG.info("Initial URL for LB solr=" + solrUrls[0]);
+ LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(solrUrls[0]);
+ for (int i = 1; i < solrUrls.length; i++) {
+ LOG.info("Adding URL for LB solr=" + solrUrls[i]);
+ lbSolrClient.addSolrServer(solrUrls[i]);
}
+ return lbSolrClient;
+ }
+ }
- // Let's start the thread
- SolrWorkerThread solrWriterThread = new SolrWorkerThread(solrClient);
- solrWriterThread.setName(getNameForThread() + "," + collection
- + ",writer=" + count);
- solrWriterThread.setDaemon(true);
- solrWriterThread.start();
- writerThreadList.add(solrWriterThread);
+ private void pingSolr(String solrUrl, String zkHosts, int count, SolrClient solrClient) {
+ try {
+ LOG.info("Pinging Solr server. zkHosts=" + zkHosts + ", urls=" + solrUrl);
+ SolrPingResponse response = solrClient.ping();
+ if (response.getStatus() == 0) {
+ LOG.info("Ping to Solr server is successful for worker=" + count);
+ } else {
+ LOG.warn(
+ String.format(
+ "Ping to Solr server failed. It would check again. worker=%d, "
+ + "solrUrl=%s, zkHosts=%s, collection=%s, response=%s",
+ count, solrUrl, zkHosts, collection, response));
+ }
+ } catch (Throwable t) {
+ LOG.warn(String.format(
+ "Ping to Solr server failed. It would check again. worker=%d, " + "solrUrl=%s, zkHosts=%s, collection=%s",
+ count, solrUrl, zkHosts, collection), t);
}
}
+ private void createSolrWorkerThread(int count, SolrClient solrClient) {
+ SolrWorkerThread solrWorkerThread = new SolrWorkerThread(solrClient);
+ solrWorkerThread.setName(getNameForThread() + "," + collection + ",worker=" + count);
+ solrWorkerThread.setDaemon(true);
+ solrWorkerThread.start();
+ workerThreadList.add(solrWorkerThread);
+ }
+
@Override
- public void setDrain(boolean drain) {
- super.setDrain(drain);
+ public void write(Map<String, Object> jsonObj, InputMarker inputMarker) throws Exception {
+ try {
+ outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
/**
* Flush document buffer
*/
public void flush() {
- logger.info("Flush called...");
+ LOG.info("Flush called...");
setDrain(true);
int wrapUpTimeSecs = 30;
// Give wrapUpTimeSecs seconds to wrap up
boolean isPending = false;
for (int i = 0; i < wrapUpTimeSecs; i++) {
- for (SolrWorkerThread solrWorkerThread : writerThreadList) {
+ for (SolrWorkerThread solrWorkerThread : workerThreadList) {
if (solrWorkerThread.isDone()) {
try {
solrWorkerThread.interrupt();
@@ -205,8 +224,7 @@ public class OutputSolr extends Output {
}
if (isPending) {
try {
- logger.info("Will give " + (wrapUpTimeSecs - i)
- + " seconds to wrap up");
+ LOG.info("Will give " + (wrapUpTimeSecs - i) + " seconds to wrap up");
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
@@ -217,238 +235,194 @@ public class OutputSolr extends Output {
}
@Override
+ public void setDrain(boolean drain) {
+ super.setDrain(drain);
+ }
+
+ @Override
public long getPendingCount() {
- long totalCount = 0;
- for (SolrWorkerThread solrWorkerThread : writerThreadList) {
- totalCount += solrWorkerThread.localBuffer.size();
+ long pendingCount = 0;
+ for (SolrWorkerThread solrWorkerThread : workerThreadList) {
+ pendingCount += solrWorkerThread.localBuffer.size();
}
- return totalCount;
+ return pendingCount;
}
@Override
public void close() {
- logger.info("Closing Solr client...");
+ LOG.info("Closing Solr client...");
flush();
- logger.info("Closed Solr client");
+ LOG.info("Closed Solr client");
super.close();
}
@Override
- public void write(Map<String, Object> jsonObj, InputMarker inputMarker)
- throws Exception {
- try {
- outgoingBuffer.put(new OutputData(jsonObj, inputMarker));
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- /*
- * (non-Javadoc)
- *
- * @see org.apache.ambari.logfeeder.ConfigBlock#getShortDescription()
- */
- @Override
public String getShortDescription() {
return "output:destination=solr,collection=" + collection;
}
class SolrWorkerThread extends Thread {
- /**
- *
- */
- SolrClient solrClient = null;
- Collection<SolrInputDocument> localBuffer = new ArrayList<SolrInputDocument>();
- long localBufferBytesSize = 0;
- Map<String, InputMarker> latestInputMarkerList = new HashMap<String, InputMarker>();
+ private static final String ROUTER_FIELD = "_router_field_";
+ private static final int RETRY_INTERVAL = 30;
+
+ private final SolrClient solrClient;
+ private final Collection<SolrInputDocument> localBuffer = new ArrayList<>();
+ private final Map<String, InputMarker> latestInputMarkerList = new HashMap<>();
+
+ private long localBufferBytesSize = 0;
- /**
- *
- */
public SolrWorkerThread(SolrClient solrClient) {
this.solrClient = solrClient;
}
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Runnable#run()
- */
@Override
public void run() {
- logger.info("SolrWriter thread started");
+ LOG.info("SolrWorker thread started");
long lastDispatchTime = System.currentTimeMillis();
- //long totalWaitTimeMS = 0;
while (true) {
long currTimeMS = System.currentTimeMillis();
OutputData outputData = null;
try {
- long nextDispatchDuration = maxIntervalMS
- - (currTimeMS - lastDispatchTime);
- outputData = outgoingBuffer.poll();
- if (outputData == null && !isDrain()
- && nextDispatchDuration > 0) {
- outputData = outgoingBuffer.poll(nextDispatchDuration,
- TimeUnit.MILLISECONDS);
-// long diffTimeMS = System.currentTimeMillis()
-// - currTimeMS;
- // logger.info("Waited for " + diffTimeMS +
- // " ms, planned for "
- // + nextDispatchDuration + " ms, localBuffer.size="
- // + localBuffer.size() + ", timedOut="
- // + (outputData == null ? "true" : "false"));
- }
+ long nextDispatchDuration = maxIntervalMS - (currTimeMS - lastDispatchTime);
+ outputData = getOutputData(nextDispatchDuration);
- if (isDrain() && outputData == null
- && outgoingBuffer.size() == 0) {
- break;
- }
if (outputData != null) {
- if (outputData.jsonObj.get("id") == null) {
- outputData.jsonObj.put("id", UUID.randomUUID()
- .toString());
- }
- SolrInputDocument document = new SolrInputDocument();
- for (String name : outputData.jsonObj.keySet()) {
- Object obj = outputData.jsonObj.get(name);
- document.addField(name, obj);
- try {
- localBufferBytesSize += obj.toString().length();
- } catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_BYTE_COUNT_ERROR";
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error calculating byte size. object="
- + obj, t, logger, Level.ERROR);
-
- }
+ createSolrDocument(outputData);
+ } else {
+ if (isDrain() && outgoingBuffer.size() == 0) {
+ break;
}
- latestInputMarkerList.put(
- outputData.inputMarker.base64FileKey,
- outputData.inputMarker);
- localBuffer.add(document);
}
- if (localBuffer.size() > 0
- && ((outputData == null && isDrain()) || (nextDispatchDuration <= 0 || localBuffer
- .size() >= maxBufferSize))) {
+ if (localBuffer.size() > 0 && ((outputData == null && isDrain())
+ || (nextDispatchDuration <= 0 || localBuffer.size() >= maxBufferSize))) {
try {
if (isComputeCurrentCollection) {
// Compute the current router value
-
- int weekDay = Calendar.getInstance().get(
- Calendar.DAY_OF_WEEK);
- int currHour = Calendar.getInstance().get(
- Calendar.HOUR_OF_DAY);
- int currMin = Calendar.getInstance().get(
- Calendar.MINUTE);
-
- int minOfWeek = (weekDay - 1) * 24 * 60
- + currHour * 60 + currMin;
- int slotByMin = minOfWeek / splitInterval
- % numberOfShards;
-
- String shard = "shard" + slotByMin;
-
- if (lastSlotByMin != slotByMin) {
- logger.info("Switching to shard " + shard
- + ", output="
- + getShortDescription());
- lastSlotByMin = slotByMin;
- }
-
- for (SolrInputDocument solrInputDocument : localBuffer) {
- solrInputDocument.addField(ROUTER_FIELD,
- shard);
- }
+ addRouterField();
}
-// long beginTime = System.currentTimeMillis();
- UpdateResponse response = solrClient
- .add(localBuffer);
-// long endTime = System.currentTimeMillis();
-// logger.info("Adding to Solr. Document count="
-// + localBuffer.size() + ". Took "
-// + (endTime - beginTime) + " ms");
-
- if (response.getStatus() != 0) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_SOLR_UPDATE_ERROR";
- LogFeederUtil
- .logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error writing to Solr. response="
- + response.toString()
- + ", log="
- + (outputData == null ? null
- : outputData
- .toString()),
- null, logger, Level.ERROR);
- }
- statMetric.count += localBuffer.size();
- writeBytesMetric.count += localBufferBytesSize;
- for (InputMarker inputMarker : latestInputMarkerList
- .values()) {
- inputMarker.input.checkIn(inputMarker);
- }
+ addToSolr(outputData);
resetLocalBuffer();
lastDispatchTime = System.currentTimeMillis();
} catch (IOException ioException) {
// Transient error, lets block till it is available
- while (!isDrain()) {
- try {
- logger.warn("Solr is down. Going to sleep for "
- + RETRY_INTERVAL
- + " seconds. output="
- + getShortDescription());
- Thread.sleep(RETRY_INTERVAL * 1000);
- } catch (Throwable t) {
- // ignore
- break;
- }
- if (isDrain()) {
- break;
- }
- try {
- SolrPingResponse pingResponse = solrClient
- .ping();
- if (pingResponse.getStatus() == 0) {
- logger.info("Solr seems to be up now. Resuming... output="
- + getShortDescription());
- break;
- }
- } catch (Throwable t) {
- // Ignore
- }
- }
+ waitForSolr();
} catch (Throwable serverException) {
// Clear the buffer
resetLocalBuffer();
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(
- LOG_MESSAGE_KEY,
- "Error sending log message to server. "
- + (outputData == null ? null
- : outputData.toString()),
- serverException, logger, Level.ERROR);
+ String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+ "Error sending log message to server. " + outputData, serverException, LOG, Level.ERROR);
}
}
} catch (InterruptedException e) {
// Handle thread exiting
} catch (Throwable t) {
- final String LOG_MESSAGE_KEY = this.getClass()
- .getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
- LogFeederUtil.logErrorMessageByInterval(LOG_MESSAGE_KEY,
- "Caught exception in main loop. " + outputData, t,
- logger, Level.ERROR);
+ String logMessageKey = this.getClass().getSimpleName() + "_SOLR_MAINLOOP_EXCEPTION";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Caught exception in main loop. " + outputData, t, LOG,
+ Level.ERROR);
+ }
+ }
+
+ closeSolrClient();
+
+ resetLocalBuffer();
+ LOG.info("Exiting Solr worker thread. output=" + getShortDescription());
+ }
+
+ private OutputData getOutputData(long nextDispatchDuration) throws InterruptedException {
+ OutputData outputData = outgoingBuffer.poll();
+ if (outputData == null && !isDrain() && nextDispatchDuration > 0) {
+ outputData = outgoingBuffer.poll(nextDispatchDuration, TimeUnit.MILLISECONDS);
+ }
+ if (outputData != null && outputData.jsonObj.get("id") == null) {
+ outputData.jsonObj.put("id", UUID.randomUUID().toString());
+ }
+ return outputData;
+ }
+
+ private void createSolrDocument(OutputData outputData) {
+ SolrInputDocument document = new SolrInputDocument();
+ for (String name : outputData.jsonObj.keySet()) {
+ Object obj = outputData.jsonObj.get(name);
+ document.addField(name, obj);
+ try {
+ localBufferBytesSize += obj.toString().length();
+ } catch (Throwable t) {
+ String logMessageKey = this.getClass().getSimpleName() + "_BYTE_COUNT_ERROR";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey, "Error calculating byte size. object=" + obj, t, LOG,
+ Level.ERROR);
+ }
+ }
+ latestInputMarkerList.put(outputData.inputMarker.base64FileKey, outputData.inputMarker);
+ localBuffer.add(document);
+ }
+
+ private void addRouterField() {
+ Calendar cal = Calendar.getInstance();
+ int weekDay = cal.get(Calendar.DAY_OF_WEEK);
+ int currHour = cal.get(Calendar.HOUR_OF_DAY);
+ int currMin = cal.get(Calendar.MINUTE);
+
+ int minOfWeek = (weekDay - 1) * 24 * 60 + currHour * 60 + currMin;
+ int slotByMin = minOfWeek / splitInterval % numberOfShards;
+
+ String shard = "shard" + slotByMin;
+
+ if (lastSlotByMin != slotByMin) {
+ LOG.info("Switching to shard " + shard + ", output=" + getShortDescription());
+ lastSlotByMin = slotByMin;
+ }
+
+ for (SolrInputDocument solrInputDocument : localBuffer) {
+ solrInputDocument.addField(ROUTER_FIELD, shard);
+ }
+ }
+
+ private void addToSolr(OutputData outputData) throws SolrServerException, IOException {
+ UpdateResponse response = solrClient.add(localBuffer);
+ if (response.getStatus() != 0) {
+ String logMessageKey = this.getClass().getSimpleName() + "_SOLR_UPDATE_ERROR";
+ LogFeederUtil.logErrorMessageByInterval(logMessageKey,
+ String.format("Error writing to Solr. response=%s, log=%s", response, outputData), null, LOG, Level.ERROR);
+ }
+ statMetric.count += localBuffer.size();
+ writeBytesMetric.count += localBufferBytesSize;
+ for (InputMarker inputMarker : latestInputMarkerList.values()) {
+ inputMarker.input.checkIn(inputMarker);
+ }
+ }
+
+ private void waitForSolr() {
+ while (!isDrain()) {
+ try {
+ LOG.warn(
+ "Solr is down. Going to sleep for " + RETRY_INTERVAL + " seconds. " + "output=" + getShortDescription());
+ Thread.sleep(RETRY_INTERVAL * 1000);
+ } catch (Throwable t) {
+ // ignore
+ break;
+ }
+ if (isDrain()) {
+ break;
+ }
+ try {
+ SolrPingResponse pingResponse = solrClient.ping();
+ if (pingResponse.getStatus() == 0) {
+ LOG.info("Solr seems to be up now. Resuming... output=" + getShortDescription());
+ break;
+ }
+ } catch (Throwable t) {
+ // Ignore
}
}
+ }
+ private void closeSolrClient() {
if (solrClient != null) {
try {
solrClient.close();
@@ -456,14 +430,6 @@ public class OutputSolr extends Output {
// Ignore
}
}
-
- resetLocalBuffer();
- logger.info("Exiting Solr writer thread. output="
- + getShortDescription());
- }
-
- public boolean isDone() {
- return localBuffer.size() == 0;
}
public void resetLocalBuffer() {
@@ -471,5 +437,9 @@ public class OutputSolr extends Output {
localBufferBytesSize = 0;
latestInputMarkerList.clear();
}
+
+ public boolean isDone() {
+ return localBuffer.isEmpty();
+ }
}
}
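A worked example of the addRouterField() slot computation above, with illustrative numbers (Wednesday 10:17, splitInterval=30, numberOfShards=8):

    int splitInterval = 30;
    int numberOfShards = 8;
    int weekDay = 4;                                            // Calendar.DAY_OF_WEEK, Sunday = 1
    int minOfWeek = (weekDay - 1) * 24 * 60 + 10 * 60 + 17;     // 4937
    int slotByMin = minOfWeek / splitInterval % numberOfShards; // 4937 / 30 % 8 = 4
    String shard = "shard" + slotByMin;                         // "shard4"

Each buffered document gets _router_field_ = "shard4" until the computed slot changes, so documents rotate across shards on a minute-of-week schedule.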
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
new file mode 100644
index 0000000..9f943ec
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.Input;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class FilterGrokTest {
+ private static final Logger LOG = Logger.getLogger(FilterGrokTest.class);
+
+ private FilterGrok filterGrok;
+ private OutputMgr mockOutputMgr;
+ private Capture<Map<String, Object>> capture;
+
+ public void init(Map<String, Object> config) throws Exception {
+ mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ capture = EasyMock.newCapture(CaptureType.LAST);
+
+ filterGrok = new FilterGrok();
+ filterGrok.loadConfig(config);
+ filterGrok.setOutputMgr(mockOutputMgr);
+ filterGrok.setInput(EasyMock.mock(Input.class));
+ filterGrok.init();
+ }
+
+ @Test
+ public void testFilterGrok_parseMessage() throws Exception {
+ LOG.info("testFilterGrok_parseMessage()");
+
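+ // Pattern: ISO-8601 timestamp, log level, then the rest of the line as log_message;
+ // the multiline pattern marks a line starting with a timestamp as a new event.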
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("message_pattern",
+ "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+ init(config);
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertNotNull(jsonParams);
+ assertEquals("Incorrect parsing: log time", "2016-04-08 15:55:23,548", jsonParams.remove("logtime"));
+ assertEquals("Incorrect parsing: log level", "INFO", jsonParams.remove("level"));
+ assertEquals("Incorrect parsing: log message", "This is a test", jsonParams.remove("log_message"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @Test
+ public void testFilterGrok_parseMultiLineMessage() throws Exception {
+ LOG.info("testFilterGrok_parseMultiLineMessage()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("message_pattern",
+ "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+ init(config);
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
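+ // Only the first line carries a timestamp; the following lines are appended
+ // to the same event, which is emitted by flush().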
+ String multiLineMessage = "This is a multiline test message\r\n" + "having multiple lines\r\n"
+ + "as one may expect";
+ String[] messageLines = multiLineMessage.split("\r\n");
+ for (int i = 0; i < messageLines.length; i++) {
+ filterGrok.apply((i == 0 ? "2016-04-08 15:55:23,548 INFO " : "") + messageLines[i], new InputMarker());
+ }
+ filterGrok.flush();
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertNotNull(jsonParams);
+ assertEquals("Incorrect parsing: log time", "2016-04-08 15:55:23,548", jsonParams.remove("logtime"));
+ assertEquals("Incorrect parsing: log level", "INFO", jsonParams.remove("level"));
+ assertEquals("Incorrect parsing: log message", multiLineMessage, jsonParams.remove("log_message"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @Test
+ public void testFilterGrok_notMatchingMessagePattern() throws Exception {
+ LOG.info("testFilterGrok_notMatchingMessagePattern()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("message_pattern",
+ "(?m)^%{TIMESTAMP_ISO8601:logtime}%{SPACE}%{LOGLEVEL:level}%{SPACE}%{GREEDYDATA:log_message}");
+ config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+ init(config);
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(mockOutputMgr);
+
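+ // MM/dd/yyyy dates do not match TIMESTAMP_ISO8601, so no event should be written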
+ filterGrok.apply("04/08/2016 15:55:23,548 INFO This is a test", new InputMarker());
+ filterGrok.apply("04/08/2016 15:55:24,548 WARN Next message", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ assertFalse("Something was captured!", capture.hasCaptured());
+ }
+
+ @Test
+ public void testFilterGrok_noMessagePattern() throws Exception {
+ LOG.info("testFilterGrok_noMessagePattern()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("multiline_pattern", "^(%{TIMESTAMP_ISO8601:logtime})");
+ init(config);
+
+ EasyMock.replay(mockOutputMgr);
+
+ filterGrok.apply("2016-04-08 15:55:23,548 INFO This is a test", new InputMarker());
+ filterGrok.apply("2016-04-08 15:55:24,548 WARN Next message", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ assertFalse("Something was captured", capture.hasCaptured());
+ }
+
+ @After
+ public void cleanUp() {
+ capture.reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
new file mode 100644
index 0000000..58db8f2
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class FilterKeyValueTest {
+ private static final Logger LOG = Logger.getLogger(FilterKeyValueTest.class);
+
+ private FilterKeyValue filterKeyValue;
+ private OutputMgr mockOutputMgr;
+ private Capture<Map<String, Object>> capture;
+
+ public void init(Map<String, Object> config) throws Exception {
+ mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ capture = EasyMock.newCapture(CaptureType.LAST);
+
+ filterKeyValue = new FilterKeyValue();
+ filterKeyValue.loadConfig(config);
+ filterKeyValue.setOutputMgr(mockOutputMgr);
+ filterKeyValue.init();
+ }
+
+ @Test
+ public void testFilterKeyValue_extraction() throws Exception {
+ LOG.info("testFilterKeyValue_extraction()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("source_field", "keyValueField");
+ config.put("field_split", "&");
+ // using default value split: =
+ init(config);
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
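+ // "name1=value1&name2=value2" should be split on '&' into two key-value pairs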
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("keyValueField"));
+ assertEquals("Incorrect extraction: name1", "value1", jsonParams.remove("name1"));
+ assertEquals("Incorrect extraction: name2", "value2", jsonParams.remove("name2"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @Test
+ public void testFilterKeyValue_missingSourceField() throws Exception {
+ LOG.info("testFilterKeyValue_missingSourceField()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("field_split", "&");
+ // using default value split: =
+ init(config);
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(mockOutputMgr);
+
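+ // without a source_field the filter has nothing to split, so no write is expected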
+ filterKeyValue.apply("{ keyValueField: 'name1=value1&name2=value2' }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ assertFalse("Something was captured!", capture.hasCaptured());
+ }
+
+ @Test
+ public void testFilterKeyValue_noSourceFieldPresent() throws Exception {
+ LOG.info("testFilterKeyValue_noSourceFieldPresent()");
+
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("source_field", "keyValueField");
+ config.put("field_split", "&");
+ init(config);
+
+ // using default value split: =
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(mockOutputMgr);
+
+ filterKeyValue.apply("{ otherField: 'name1=value1&name2=value2' }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertEquals("Original missing!", "name1=value1&name2=value2", jsonParams.remove("otherField"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @After
+ public void cleanUp() {
+ capture.reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
new file mode 100644
index 0000000..ebfd0f5
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/JSONFilterCodeTest.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.filter;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.ambari.logfeeder.OutputMgr;
+import org.apache.ambari.logfeeder.input.InputMarker;
+import org.apache.log4j.Logger;
+import org.easymock.Capture;
+import org.easymock.CaptureType;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class JSONFilterCodeTest {
+ private static final Logger LOG = Logger.getLogger(JSONFilterCodeTest.class);
+
+ private JSONFilterCode jsonFilterCode;
+ private OutputMgr mockOutputMgr;
+ private Capture<Map<String, Object>> capture;
+
+ public void init(Map<String, Object> params) throws Exception {
+ mockOutputMgr = EasyMock.strictMock(OutputMgr.class);
+ capture = EasyMock.newCapture(CaptureType.LAST);
+
+ jsonFilterCode = new JSONFilterCode();
+ jsonFilterCode.loadConfig(params);
+ jsonFilterCode.setOutputMgr(mockOutputMgr);
+ jsonFilterCode.init();
+ }
+
+ @Test
+ public void testJSONFilterCode_convertFields() throws Exception {
+ LOG.info("testJSONFilterCode_convertFields()");
+
+ init(new HashMap<String, Object>());
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
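+ // epoch millis in 'logtime' should be converted to a formatted date string,
+ // and the numeric line_number should be parsed as a Long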
+ Date d = new Date();
+ String dateString = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).format(d);
+ jsonFilterCode.apply("{ logtime: '" + d.getTime() + "', line_number: 100 }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+ assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @Test
+ public void testJSONFilterCode_logTimeOnly() throws Exception {
+ LOG.info("testJSONFilterCode_logTimeOnly()");
+
+ init(new HashMap<String, Object>());
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
+ Date d = new Date();
+ String dateString = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).format(d);
+ jsonFilterCode.apply("{ logtime: '" + d.getTime() + "', some_field: 'abc' }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertEquals("Incorrect decoding: log time", dateString, jsonParams.remove("logtime"));
+ assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @Test
+ public void testJSONFilterCode_lineNumberOnly() throws Exception {
+ LOG.info("testJSONFilterCode_lineNumberOnly()");
+
+ init(new HashMap<String, Object>());
+
+ mockOutputMgr.write(EasyMock.capture(capture), EasyMock.anyObject(InputMarker.class));
+ EasyMock.expectLastCall();
+ EasyMock.replay(mockOutputMgr);
+
+ jsonFilterCode.apply("{ line_number: 100, some_field: 'abc' }", new InputMarker());
+
+ EasyMock.verify(mockOutputMgr);
+ Map<String, Object> jsonParams = capture.getValue();
+
+ assertEquals("Incorrect decoding: line number", 100l, jsonParams.remove("line_number"));
+ assertEquals("Incorrect decoding: some field", "abc", jsonParams.remove("some_field"));
+ assertTrue("jsonParams are not empty!", jsonParams.isEmpty());
+ }
+
+ @After
+ public void cleanUp() {
+ capture.reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
new file mode 100644
index 0000000..2242a83
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.input;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.InputMgr;
+import org.apache.ambari.logfeeder.filter.Filter;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class InputFileTest {
+ private static final Logger LOG = Logger.getLogger(InputFileTest.class);
+
+ private static final String TEST_DIR_NAME = "/logfeeder_test_dir/";
+ private static final File TEST_DIR = new File(FileUtils.getTempDirectoryPath() + TEST_DIR_NAME);
+
+ private static final String TEST_LOG_FILE_CONTENT = "2016-03-10 14:09:38,278 INFO datanode.DataNode (DataNode.java:<init>(418)) - File descriptor passing is enabled.\n"
+ + "2016-03-10 14:09:38,278 INFO datanode.DataNode (DataNode.java:<init>(429)) - Configured hostname is c6401.ambari.apache.org\n"
+ + "2016-03-10 14:09:38,294 INFO datanode.DataNode (DataNode.java:startDataNode(1127)) - Starting DataNode with maxLockedMemory = 0\n"
+ + "2016-03-10 14:09:38,340 INFO datanode.DataNode (DataNode.java:initDataXceiver(921)) - Opened streaming server at /0.0.0.0:50010\n"
+ + "2016-03-10 14:09:38,343 INFO datanode.DataNode (DataXceiverServer.java:<init>(76)) - Balancing bandwith is 6250000 bytes/s\n"
+ + "2016-03-10 14:09:38,343 INFO datanode.DataNode (DataXceiverServer.java:<init>(77)) - Number threads for balancing is 5\n"
+ + "2016-03-10 14:09:38,345 INFO datanode.DataNode (DataXceiverServer.java:<init>(76)) - Balancing bandwith is 6250000 bytes/s\n"
+ + "2016-03-10 14:09:38,346 INFO datanode.DataNode (DataXceiverServer.java:<init>(77)) - Number threads for balancing is 5\n";
+
+ private static final String[] TEST_LOG_FILE_ROWS = TEST_LOG_FILE_CONTENT.split("\n");
+ private InputFile inputFile;
+ private List<String> rows = new ArrayList<>();
+
+ private InputMarker testInputMarker;
+
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @BeforeClass
+ public static void initDir() throws IOException {
+ if (!TEST_DIR.exists()) {
+ TEST_DIR.mkdir();
+ }
+ FileUtils.cleanDirectory(TEST_DIR);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ }
+
+ public void init(String path) throws Exception {
+ Map<String, Object> config = new HashMap<String, Object>();
+ config.put("source", "file");
+ config.put("tail", "true");
+ config.put("gen_event_md5", "true");
+ config.put("start_position", "beginning");
+
+ config.put("type", "hdfs_datanode");
+ config.put("rowtype", "service");
+ config.put("path", path);
+
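+ // test filter: collects the emitted rows and drains the input after every
+ // third row so that processing stops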
+ Filter capture = new Filter() {
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public void apply(String inputStr, InputMarker inputMarker) {
+ rows.add(inputStr);
+ if (rows.size() % 3 == 0) {
+ inputFile.setDrain(true);
+ }
+
+ testInputMarker = inputMarker;
+ }
+ };
+
+ inputFile = new InputFile();
+ inputFile.loadConfig(config);
+ inputFile.setFirstFilter(capture);
+ inputFile.init();
+ }
+
+ @Test
+ public void testInputFile_process3Rows() throws Exception {
+ LOG.info("testInputFile_process3Rows()");
+
+ File checkPointDir = createCheckpointDir("process3_checkpoint");
+ File testFile = createFile("process3.log");
+
+ init(testFile.getAbsolutePath());
+
+ InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
+ EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir);
+ EasyMock.replay(inputMgr);
+ inputFile.setInputMgr(inputMgr);
+
+ inputFile.isReady();
+ inputFile.start();
+
+ assertEquals("Amount of the rows is incorrect", rows.size(), 3);
+ for (int row = 0; row < 3; row++)
+ assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
+
+ EasyMock.verify(inputMgr);
+ }
+
+ @Test
+ public void testInputFile_process6RowsInterrupted() throws Exception {
+ LOG.info("testInputFile_process6RowsInterrupted()");
+
+ File checkPointDir = createCheckpointDir("process6_checkpoint");
+ File testFile = createFile("process6.log");
+ init(testFile.getAbsolutePath());
+
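+ // the first start() drains after three rows; checkIn saves the position
+ // reached, so the second start() should resume and read rows 4-6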
+ InputMgr inputMgr = EasyMock.createStrictMock(InputMgr.class);
+ EasyMock.expect(inputMgr.getCheckPointFolderFile()).andReturn(checkPointDir).times(2);
+ EasyMock.replay(inputMgr);
+ inputFile.setInputMgr(inputMgr);
+
+ inputFile.isReady();
+ inputFile.start();
+ inputFile.checkIn(testInputMarker);
+ inputFile.setDrain(false);
+ inputFile.start();
+
+ assertEquals("Amount of the rows is incorrect", rows.size(), 6);
+ for (int row = 0; row < 6; row++)
+ assertEquals("Row #" + (row + 1) + " not correct", TEST_LOG_FILE_ROWS[row], rows.get(row));
+
+ EasyMock.verify(inputMgr);
+ }
+
+ @Test
+ public void testInputFile_noLogPath() throws Exception {
+ LOG.info("testInputFile_noLogPath()");
+
+ expectedException.expect(NullPointerException.class);
+
+ init(null);
+ inputFile.isReady();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ rows.clear();
+ }
+
+ @AfterClass
+ public static void cleanUp() throws Exception {
+ FileUtils.deleteDirectory(TEST_DIR);
+ }
+
+ private File createFile(String filename) throws IOException {
+ File newFile = new File(FileUtils.getTempDirectoryPath() + TEST_DIR_NAME + filename);
+ FileUtils.writeStringToFile(newFile, TEST_LOG_FILE_CONTENT);
+ return newFile;
+ }
+
+ private File createCheckpointDir(String dirname) {
+ File newDir = new File(TEST_DIR + "/" + dirname);
+ if (!newDir.exists()) {
+ newDir.mkdir();
+ }
+ return newDir;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
new file mode 100644
index 0000000..2df03bd
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.ambari.logfeeder.LogFeederUtil;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MapperDateTest {
+ private static final Logger LOG = Logger.getLogger(MapperDateTest.class);
+
+ @Test
+ public void testMapperDate_epoch() {
+ LOG.info("testMapperDate_epoch()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("date_pattern", "epoch");
+
+ MapperDate mapperDate = new MapperDate();
+ assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+
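+ // the epoch pattern parses whole seconds, so compare against the current
+ // time truncated to second precision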
+ Date d = DateUtils.truncate(new Date(), Calendar.SECOND);
+ Object mappedValue = mapperDate.apply(jsonObj, Long.toString(d.getTime() / 1000));
+
+ assertEquals("Value wasn't matched properly", d, mappedValue);
+ assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+
+ @Test
+ public void testMapperDate_pattern() throws Exception {
+ LOG.info("testMapperDate_pattern()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("date_pattern", LogFeederUtil.DATE_FORMAT);
+
+ MapperDate mapperDate = new MapperDate();
+ assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+ String dateString = "2016-04-08 15:55:23.548";
+ Object mappedValue = mapperDate.apply(jsonObj, dateString);
+
+ Date d = new SimpleDateFormat(LogFeederUtil.DATE_FORMAT).parse(dateString);
+
+ assertEquals("Value wasn't matched properly", d, mappedValue);
+ assertEquals("Value wasn't put into jsonObj", d, jsonObj.remove("someField"));
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+
+ @Test
+ public void testMapperDate_configNotMap() {
+ LOG.info("testMapperDate_configNotMap()");
+
+ MapperDate mapperDate = new MapperDate();
+ assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, ""));
+ }
+
+ @Test
+ public void testMapperDate_noDatePattern() {
+ LOG.info("testMapperDate_noDatePattern()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("some_param", "some_value");
+
+ MapperDate mapperDate = new MapperDate();
+ assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+ }
+
+ @Test
+ public void testMapperDate_notParsableDatePattern() {
+ LOG.info("testMapperDate_notParsableDatePattern()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("date_pattern", "not_parsable_content");
+
+ MapperDate mapperDate = new MapperDate();
+ assertFalse("Was able to initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+ }
+
+ @Test
+ public void testMapperDate_invalidEpochValue() {
+ LOG.info("testMapperDate_invalidEpochValue()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("date_pattern", "epoch");
+
+ MapperDate mapperDate = new MapperDate();
+ assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+
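+ // an unparsable value should be returned unchanged and not put into jsonObj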
+ Map<String, Object> jsonObj = new HashMap<>();
+ String invalidValue = "abc";
+ Object mappedValue = mapperDate.apply(jsonObj, invalidValue);
+
+ assertEquals("Invalid value wasn't returned as it is", invalidValue, mappedValue);
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+
+ @Test
+ public void testMapperDate_invalidDateStringValue() {
+ LOG.info("testMapperDate_invalidDateStringValue()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("date_pattern", LogFeederUtil.DATE_FORMAT);
+
+ MapperDate mapperDate = new MapperDate();
+ assertTrue("Could not initialize!", mapperDate.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+ String invalidValue = "abc";
+ Object mappedValue = mapperDate.apply(jsonObj, invalidValue);
+
+ assertEquals("Invalid value wasn't returned as it is", invalidValue, mappedValue);
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
new file mode 100644
index 0000000..6edf766
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldNameTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MapperFieldNameTest {
+ private static final Logger LOG = Logger.getLogger(MapperFieldNameTest.class);
+
+ @Test
+ public void testMapperFieldName_replaceField() {
+ LOG.info("testMapperFieldName_replaceField()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("new_fieldname", "someOtherField");
+
+ MapperFieldName mapperFieldName = new MapperFieldName();
+ assertTrue("Could not initialize!", mapperFieldName.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+ jsonObj.put("someField", "someValue");
+
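+ // the mapper should move the value from 'someField' to 'someOtherField'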
+ mapperFieldName.apply(jsonObj, "someOtherValue");
+
+ assertFalse("Old field name wasn't removed", jsonObj.containsKey("someField"));
+ assertEquals("New field wasn't put", "someOtherValue", jsonObj.remove("someOtherField"));
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+
+ @Test
+ public void testMapperFieldName_configNotMap() {
+ LOG.info("testMapperFieldName_configNotMap()");
+
+ MapperFieldName mapperFieldName = new MapperFieldName();
+ assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, ""));
+ }
+
+ @Test
+ public void testMapperFieldName_noNewFieldName() {
+ LOG.info("testMapperFieldName_noNewFieldName()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+
+ MapperFieldName mapperFieldName = new MapperFieldName();
+ assertFalse("Was able to initialize!", mapperFieldName.init(null, "someField", null, mapConfigs));
+ }
+}
http://git-wip-us.apache.org/repos/asf/ambari/blob/e5c13a3a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
----------------------------------------------------------------------
diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
new file mode 100644
index 0000000..df84b8e
--- /dev/null
+++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperFieldValueTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ambari.logfeeder.mapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class MapperFieldValueTest {
+ private static final Logger LOG = Logger.getLogger(MapperFieldValueTest.class);
+
+ @Test
+ public void testMapperFieldValue_replaceValue() {
+ LOG.info("testMapperFieldValue_replaceValue()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("pre_value", "someValue");
+ mapConfigs.put("post_value", "someOtherValue");
+
+ MapperFieldValue mapperFieldValue = new MapperFieldValue();
+ assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+
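+ // pre_value matches the applied value, so it should be replaced by post_value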
+ Object mappedValue = mapperFieldValue.apply(jsonObj, "someValue");
+
+ assertEquals("Value wasn't mapped", "someOtherValue", mappedValue);
+ assertEquals("New field wasn't put into jsonObj", "someOtherValue", jsonObj.remove("someField"));
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+
+ @Test
+ public void testMapperFieldValue_configNotMap() {
+ LOG.info("testMapperFieldValue_configNotMap()");
+
+ MapperFieldValue mapperFieldValue = new MapperFieldValue();
+ assertFalse("Was able to initialize!", mapperFieldValue.init(null, "someField", null, ""));
+ }
+
+ @Test
+ public void testMapperFieldValue_noPostValue() {
+ LOG.info("testMapperFieldValue_noPostValue()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+
+ MapperFieldValue mapperFieldValue = new MapperFieldValue();
+ assertFalse("Was able to initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+ }
+
+ @Test
+ public void testMapperFieldValue_noPreValueFound() {
+ LOG.info("testMapperFieldValue_noPreValueFound()");
+
+ Map<String, Object> mapConfigs = new HashMap<>();
+ mapConfigs.put("pre_value", "someValue");
+ mapConfigs.put("post_value", "someOtherValue");
+
+ MapperFieldValue mapperFieldValue = new MapperFieldValue();
+ assertTrue("Could not initialize!", mapperFieldValue.init(null, "someField", null, mapConfigs));
+
+ Map<String, Object> jsonObj = new HashMap<>();
+
+ Object mappedValue = mapperFieldValue.apply(jsonObj, "yetAnotherValue");
+
+ assertEquals("Value was mapped", "yetAnotherValue", mappedValue);
+ assertTrue("jsonObj is not empty", jsonObj.isEmpty());
+ }
+}