You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pj...@apache.org on 2021/04/01 06:02:02 UTC
[druid] branch master updated: request logs through kafka emitter
(#11036)
This is an automated email from the ASF dual-hosted git repository.
pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new b35486f request logs through kafka emitter (#11036)
b35486f is described below
commit b35486fa81e12dd9d1236b0f5e02ac5f8aff75fd
Author: Parag Jain <pj...@apache.org>
AuthorDate: Thu Apr 1 11:31:32 2021 +0530
request logs through kafka emitter (#11036)
* request logs through kafka emitter
* travis fixes
* review comments
* kafka emitter unit test
* new line
* travis checks
* checkstyle fix
* count request lost when request topic is null
---
.../extensions-contrib/kafka-emitter.md | 1 +
extensions-contrib/kafka-emitter/pom.xml | 6 +
.../apache/druid/emitter/kafka/KafkaEmitter.java | 51 +++++++-
.../druid/emitter/kafka/KafkaEmitterConfig.java | 17 +++
.../emitter/kafka/KafkaEmitterConfigTest.java | 26 +++-
.../druid/emitter/kafka/KafkaEmitterTest.java | 139 +++++++++++++++++++++
.../druid/server/log/DefaultRequestLogEvent.java | 17 ++-
.../server/log/DefaultRequestLogEventTest.java | 82 ++++++++++++
8 files changed, 327 insertions(+), 12 deletions(-)
diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md
index 15c975b..255e83e 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
+|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml
index 4540cba..28ae860 100644
--- a/extensions-contrib/kafka-emitter/pom.xml
+++ b/extensions-contrib/kafka-emitter/pom.xml
@@ -46,6 +46,12 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-server</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index ceb21c3..09b8031 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -21,6 +21,7 @@ package org.apache.druid.emitter.kafka;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -51,6 +53,7 @@ public class KafkaEmitter implements Emitter
private static final int DEFAULT_RETRIES = 3;
private final AtomicLong metricLost;
private final AtomicLong alertLost;
+ private final AtomicLong requestLost;
private final AtomicLong invalidLost;
private final KafkaEmitterConfig config;
@@ -58,6 +61,7 @@ public class KafkaEmitter implements Emitter
private final ObjectMapper jsonMapper;
private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
+ private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
private final ScheduledExecutorService scheduler;
public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
@@ -70,9 +74,11 @@ public class KafkaEmitter implements Emitter
.getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
- this.scheduler = Executors.newScheduledThreadPool(3);
+ this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+ this.scheduler = Executors.newScheduledThreadPool(4);
this.metricLost = new AtomicLong(0L);
this.alertLost = new AtomicLong(0L);
+ this.requestLost = new AtomicLong(0L);
this.invalidLost = new AtomicLong(0L);
}
@@ -86,7 +92,8 @@ public class KafkaEmitter implements Emitter
};
}
- private Producer<String, String> setKafkaProducer()
+ @VisibleForTesting
+ protected Producer<String, String> setKafkaProducer()
{
ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
try {
@@ -111,9 +118,13 @@ public class KafkaEmitter implements Emitter
{
scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
+ if (config.getRequestTopic() != null) {
+ scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS);
+ }
scheduler.scheduleWithFixedDelay(() -> {
- log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
- metricLost.get(), alertLost.get(), invalidLost.get());
+ log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
+ metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get()
+ );
}, 5, 5, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
}
@@ -128,7 +139,13 @@ public class KafkaEmitter implements Emitter
sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
}
- private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
+ private void sendRequestToKafka()
+ {
+ sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
+ }
+
+ @VisibleForTesting
+ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
{
ObjectContainer<String> objectToSend;
try {
@@ -166,6 +183,10 @@ public class KafkaEmitter implements Emitter
if (!alertQueue.offer(objectContainer)) {
alertLost.incrementAndGet();
}
+ } else if (event instanceof RequestLogEvent) {
+ if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+ requestLost.incrementAndGet();
+ }
} else {
invalidLost.incrementAndGet();
}
@@ -189,4 +210,24 @@ public class KafkaEmitter implements Emitter
scheduler.shutdownNow();
producer.close();
}
+
+ public long getMetricLostCount()
+ {
+ return metricLost.get();
+ }
+
+ public long getAlertLostCount()
+ {
+ return alertLost.get();
+ }
+
+ public long getRequestLostCount()
+ {
+ return requestLost.get();
+ }
+
+ public long getInvalidLostCount()
+ {
+ return invalidLost.get();
+ }
}
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index fe71b21..ed7b9ea 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -37,6 +37,8 @@ public class KafkaEmitterConfig
private final String metricTopic;
@JsonProperty("alert.topic")
private final String alertTopic;
+ @Nullable @JsonProperty("request.topic")
+ private final String requestTopic;
@JsonProperty
private final String clusterName;
@JsonProperty("producer.config")
@@ -47,6 +49,7 @@ public class KafkaEmitterConfig
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
@JsonProperty("metric.topic") String metricTopic,
@JsonProperty("alert.topic") String alertTopic,
+ @Nullable @JsonProperty("request.topic") String requestTopic,
@JsonProperty("clusterName") String clusterName,
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
)
@@ -54,6 +57,7 @@ public class KafkaEmitterConfig
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
+ this.requestTopic = requestTopic;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}
@@ -82,6 +86,12 @@ public class KafkaEmitterConfig
return clusterName;
}
+ @Nullable
+ public String getRequestTopic()
+ {
+ return requestTopic;
+ }
+
@JsonProperty
public Map<String, String> getKafkaProducerConfig()
{
@@ -109,6 +119,11 @@ public class KafkaEmitterConfig
if (!getAlertTopic().equals(that.getAlertTopic())) {
return false;
}
+
+ if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) {
+ return false;
+ }
+
if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
return false;
}
@@ -121,6 +136,7 @@ public class KafkaEmitterConfig
int result = getBootstrapServers().hashCode();
result = 31 * result + getMetricTopic().hashCode();
result = 31 * result + getAlertTopic().hashCode();
+ result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
result = 31 * result + getKafkaProducerConfig().hashCode();
return result;
@@ -133,6 +149,7 @@ public class KafkaEmitterConfig
"bootstrap.servers='" + bootstrapServers + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
+ ", request.topic='" + requestTopic + '\'' +
", clusterName='" + clusterName + '\'' +
", Producer.config=" + kafkaProducerConfig +
'}';
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 89e75fc..55ecdba 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -43,13 +43,27 @@ public class KafkaEmitterConfigTest
public void testSerDeserKafkaEmitterConfig() throws IOException
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
- "alertTest", "clusterNameTest",
- ImmutableMap.<String, String>builder()
- .put("testKey", "testValue").build()
+ "alertTest", "requestTest",
+ "clusterNameTest", ImmutableMap.<String, String>builder()
+ .put("testKey", "testValue").build()
);
String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
- .readValue(kafkaEmitterConfigString);
+ .readValue(kafkaEmitterConfigString);
+ Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+ }
+
+ @Test
+ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
+ {
+ KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
+ "alertTest", null,
+ "clusterNameTest", ImmutableMap.<String, String>builder()
+ .put("testKey", "testValue").build()
+ );
+ String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
+ KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+ .readValue(kafkaEmitterConfigString);
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}
@@ -57,8 +71,8 @@ public class KafkaEmitterConfigTest
public void testSerDeNotRequiredKafkaProducerConfig()
{
KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
- "alertTest", "clusterNameTest",
- null
+ "alertTest", null,
+ "clusterNameTest", null
);
try {
@SuppressWarnings("unused")
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
new file mode 100644
index 0000000..26d9701
--- /dev/null
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
+import org.apache.druid.server.log.RequestLogEvent;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@RunWith(Parameterized.class)
+public class KafkaEmitterTest
+{
+ @Parameterized.Parameter
+ public String requestTopic;
+
+ @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+ public static Object[] data()
+ {
+ return new Object[] {
+ "requests",
+ null
+ };
+ }
+
+ // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds
+ @Test(timeout = 15_000)
+ public void testKafkaEmitter() throws InterruptedException
+ {
+ final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
+ ServiceMetricEvent.builder().build("m1", 1).build("service", "host")
+ );
+
+ final List<AlertEvent> alertEvents = ImmutableList.of(
+ new AlertEvent("service", "host", "description")
+ );
+
+ final List<RequestLogEvent> requestLogEvents = ImmutableList.of(
+ DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
+ RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
+ ).build("service", "host")
+ );
+
+ int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
+ int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
+
+ final CountDownLatch countDownSentEvents = new CountDownLatch(
+ requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+ final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class);
+ final KafkaEmitter kafkaEmitter = new KafkaEmitter(
+ new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
+ new ObjectMapper()
+ )
+ {
+ @Override
+ protected Producer<String, String> setKafkaProducer()
+ {
+ return producer;
+ }
+
+ @Override
+ protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue,
+ Callback callback
+ )
+ {
+ countDownSentEvents.countDown();
+ super.sendToKafka(topic, recordQueue, callback);
+ }
+ };
+
+ EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null)
+ .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+ EasyMock.replay(producer);
+ kafkaEmitter.start();
+
+ for (Event event : serviceMetricEvents) {
+ kafkaEmitter.emit(event);
+ }
+ for (Event event : alertEvents) {
+ kafkaEmitter.emit(event);
+ }
+ for (Event event : requestLogEvents) {
+ kafkaEmitter.emit(event);
+ }
+ countDownSentEvents.await();
+
+ Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+ Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
+ Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
+
+ while (true) {
+ try {
+ EasyMock.verify(producer);
+ break;
+ }
+ catch (Throwable e) {
+ // although the latch may have count down, producer.send may not have been called yet in KafkaEmitter
+ // so wait for sometime before verifying the mock
+ Thread.sleep(100);
+ // just continue
+ }
+ }
+ }
+}
diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
index 2224e62..4744b2f 100644
--- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
+++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
@@ -28,6 +28,7 @@ import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.joda.time.DateTime;
+import java.util.HashMap;
import java.util.Map;
/**
@@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent
@Override
public Map<String, Object> toMap()
{
- return ImmutableMap.of();
+ final Map<String, Object> map = new HashMap<>();
+ map.put("feed", getFeed());
+ map.put("timestamp", getCreatedTime());
+ map.put("service", getService());
+ map.put("host", getHost());
+ if (getQuery() != null) {
+ map.put("query", getQuery());
+ }
+ if (getSql() != null) {
+ map.put("sql", getSql());
+ map.put("sqlQueryContext", getSqlQueryContext());
+ }
+ map.put("remoteAddr", getRemoteAddr());
+ map.put("queryStats", getQueryStats());
+ return map;
}
@Override
diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
index fe78999..0a515ee 100644
--- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
+++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
@@ -26,15 +26,20 @@ import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Query;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.query.timeseries.TimeseriesQuery;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
+import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.Map;
+
public class DefaultRequestLogEventTest
{
private ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -68,4 +73,81 @@ public class DefaultRequestLogEventTest
String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\ [...]
Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson));
}
+
+ @Test
+ public void testDefaultRequestLogEventToMap()
+ {
+ final String feed = "test";
+ final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+ final String service = "druid-service";
+ final String host = "127.0.0.1";
+ final Query query = new TimeseriesQuery(
+ new TableDataSource("dummy"),
+ new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
+ true,
+ VirtualColumns.EMPTY,
+ null,
+ Granularities.ALL,
+ ImmutableList.of(),
+ ImmutableList.of(),
+ 5,
+ ImmutableMap.of("key", "value"));
+ final QueryStats queryStats = new QueryStats(
+ ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll"));
+ RequestLogLine nativeLine = RequestLogLine.forNative(
+ query,
+ timestamp,
+ host,
+ queryStats
+ );
+
+ DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+ ImmutableMap.of("service", service, "host", host), feed, nativeLine
+ );
+ final Map<String, Object> expected = new HashMap<>();
+ expected.put("feed", feed);
+ expected.put("timestamp", timestamp);
+ expected.put("service", service);
+ expected.put("host", host);
+ expected.put("query", query);
+ expected.put("remoteAddr", host);
+ expected.put("queryStats", queryStats);
+
+ Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+ }
+
+ @Test
+ public void testDefaultRequestLogEventToMapSQL()
+ {
+ final String feed = "test";
+ final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+ final String service = "druid-service";
+ final String host = "127.0.0.1";
+ final String sql = "select * from 1337";
+ final QueryStats queryStats = new QueryStats(
+ ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll"));
+
+ RequestLogLine nativeLine = RequestLogLine.forSql(
+ sql,
+ ImmutableMap.of(),
+ timestamp,
+ host,
+ queryStats
+ );
+
+ DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+ ImmutableMap.of("service", service, "host", host), feed, nativeLine
+ );
+ final Map<String, Object> expected = new HashMap<>();
+ expected.put("feed", feed);
+ expected.put("timestamp", timestamp);
+ expected.put("service", service);
+ expected.put("host", host);
+ expected.put("sql", sql);
+ expected.put("sqlQueryContext", ImmutableMap.of());
+ expected.put("remoteAddr", host);
+ expected.put("queryStats", queryStats);
+
+ Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org