You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by pj...@apache.org on 2021/04/01 06:02:02 UTC

[druid] branch master updated: request logs through kafka emitter (#11036)

This is an automated email from the ASF dual-hosted git repository.

pjain1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b35486f  request logs through kafka emitter (#11036)
b35486f is described below

commit b35486fa81e12dd9d1236b0f5e02ac5f8aff75fd
Author: Parag Jain <pj...@apache.org>
AuthorDate: Thu Apr 1 11:31:32 2021 +0530

    request logs through kafka emitter (#11036)
    
    * request logs through kafka emitter
    
    * travis fixes
    
    * review comments
    
    * kafka emitter unit test
    
    * new line
    
    * travis checks
    
    * checkstyle fix
    
    * count request lost when request topic is null
---
 .../extensions-contrib/kafka-emitter.md            |   1 +
 extensions-contrib/kafka-emitter/pom.xml           |   6 +
 .../apache/druid/emitter/kafka/KafkaEmitter.java   |  51 +++++++-
 .../druid/emitter/kafka/KafkaEmitterConfig.java    |  17 +++
 .../emitter/kafka/KafkaEmitterConfigTest.java      |  26 +++-
 .../druid/emitter/kafka/KafkaEmitterTest.java      | 139 +++++++++++++++++++++
 .../druid/server/log/DefaultRequestLogEvent.java   |  17 ++-
 .../server/log/DefaultRequestLogEventTest.java     |  82 ++++++++++++
 8 files changed, 327 insertions(+), 12 deletions(-)

diff --git a/docs/development/extensions-contrib/kafka-emitter.md b/docs/development/extensions-contrib/kafka-emitter.md
index 15c975b..255e83e 100644
--- a/docs/development/extensions-contrib/kafka-emitter.md
+++ b/docs/development/extensions-contrib/kafka-emitter.md
@@ -41,6 +41,7 @@ All the configuration parameters for the Kafka emitter are under `druid.emitter.
 |`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
 |`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
 |`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
+|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
 |`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
 |`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
 
diff --git a/extensions-contrib/kafka-emitter/pom.xml b/extensions-contrib/kafka-emitter/pom.xml
index 4540cba..28ae860 100644
--- a/extensions-contrib/kafka-emitter/pom.xml
+++ b/extensions-contrib/kafka-emitter/pom.xml
@@ -46,6 +46,12 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
       <scope>provided</scope>
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
index ceb21c3..09b8031 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitter.java
@@ -21,6 +21,7 @@ package org.apache.druid.emitter.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.emitter.kafka.MemoryBoundLinkedBlockingQueue.ObjectContainer;
 import org.apache.druid.java.util.common.StringUtils;
@@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.core.Emitter;
 import org.apache.druid.java.util.emitter.core.Event;
 import org.apache.druid.java.util.emitter.service.AlertEvent;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.log.RequestLogEvent;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -51,6 +53,7 @@ public class KafkaEmitter implements Emitter
   private static final int DEFAULT_RETRIES = 3;
   private final AtomicLong metricLost;
   private final AtomicLong alertLost;
+  private final AtomicLong requestLost;
   private final AtomicLong invalidLost;
 
   private final KafkaEmitterConfig config;
@@ -58,6 +61,7 @@ public class KafkaEmitter implements Emitter
   private final ObjectMapper jsonMapper;
   private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
   private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
+  private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
   private final ScheduledExecutorService scheduler;
 
   public KafkaEmitter(KafkaEmitterConfig config, ObjectMapper jsonMapper)
@@ -70,9 +74,11 @@ public class KafkaEmitter implements Emitter
                                                       .getOrDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"));
     this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
     this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
-    this.scheduler = Executors.newScheduledThreadPool(3);
+    this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(queueMemoryBound);
+    this.scheduler = Executors.newScheduledThreadPool(4);
     this.metricLost = new AtomicLong(0L);
     this.alertLost = new AtomicLong(0L);
+    this.requestLost = new AtomicLong(0L);
     this.invalidLost = new AtomicLong(0L);
   }
 
@@ -86,7 +92,8 @@ public class KafkaEmitter implements Emitter
     };
   }
 
-  private Producer<String, String> setKafkaProducer()
+  @VisibleForTesting
+  protected Producer<String, String> setKafkaProducer()
   {
     ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
     try {
@@ -111,9 +118,13 @@ public class KafkaEmitter implements Emitter
   {
     scheduler.schedule(this::sendMetricToKafka, 10, TimeUnit.SECONDS);
     scheduler.schedule(this::sendAlertToKafka, 10, TimeUnit.SECONDS);
+    if (config.getRequestTopic() != null) {
+      scheduler.schedule(this::sendRequestToKafka, 10, TimeUnit.SECONDS);
+    }
     scheduler.scheduleWithFixedDelay(() -> {
-      log.info("Message lost counter: metricLost=[%d], alertLost=[%d], invalidLost=[%d]",
-               metricLost.get(), alertLost.get(), invalidLost.get());
+      log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d]",
+          metricLost.get(), alertLost.get(), requestLost.get(), invalidLost.get()
+      );
     }, 5, 5, TimeUnit.MINUTES);
     log.info("Starting Kafka Emitter.");
   }
@@ -128,7 +139,13 @@ public class KafkaEmitter implements Emitter
     sendToKafka(config.getAlertTopic(), alertQueue, setProducerCallback(alertLost));
   }
 
-  private void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
+  private void sendRequestToKafka()
+  {
+    sendToKafka(config.getRequestTopic(), requestQueue, setProducerCallback(requestLost));
+  }
+
+  @VisibleForTesting
+  protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue, Callback callback)
   {
     ObjectContainer<String> objectToSend;
     try {
@@ -166,6 +183,10 @@ public class KafkaEmitter implements Emitter
           if (!alertQueue.offer(objectContainer)) {
             alertLost.incrementAndGet();
           }
+        } else if (event instanceof RequestLogEvent) {
+          if (config.getRequestTopic() == null || !requestQueue.offer(objectContainer)) {
+            requestLost.incrementAndGet();
+          }
         } else {
           invalidLost.incrementAndGet();
         }
@@ -189,4 +210,24 @@ public class KafkaEmitter implements Emitter
     scheduler.shutdownNow();
     producer.close();
   }
+
+  public long getMetricLostCount()
+  {
+    return metricLost.get();
+  }
+
+  public long getAlertLostCount()
+  {
+    return alertLost.get();
+  }
+
+  public long getRequestLostCount()
+  {
+    return requestLost.get();
+  }
+
+  public long getInvalidLostCount()
+  {
+    return invalidLost.get();
+  }
 }
diff --git a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
index fe71b21..ed7b9ea 100644
--- a/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
+++ b/extensions-contrib/kafka-emitter/src/main/java/org/apache/druid/emitter/kafka/KafkaEmitterConfig.java
@@ -37,6 +37,8 @@ public class KafkaEmitterConfig
   private final String metricTopic;
   @JsonProperty("alert.topic")
   private final String alertTopic;
+  @Nullable @JsonProperty("request.topic")
+  private final String requestTopic;
   @JsonProperty
   private final String clusterName;
   @JsonProperty("producer.config")
@@ -47,6 +49,7 @@ public class KafkaEmitterConfig
       @JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) String bootstrapServers,
       @JsonProperty("metric.topic") String metricTopic,
       @JsonProperty("alert.topic") String alertTopic,
+      @Nullable @JsonProperty("request.topic") String requestTopic,
       @JsonProperty("clusterName") String clusterName,
       @JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
   )
@@ -54,6 +57,7 @@ public class KafkaEmitterConfig
     this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
     this.metricTopic = Preconditions.checkNotNull(metricTopic, "metric.topic can not be null");
     this.alertTopic = Preconditions.checkNotNull(alertTopic, "alert.topic can not be null");
+    this.requestTopic = requestTopic;
     this.clusterName = clusterName;
     this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
   }
@@ -82,6 +86,12 @@ public class KafkaEmitterConfig
     return clusterName;
   }
 
+  @Nullable
+  public String getRequestTopic()
+  {
+    return requestTopic;
+  }
+
   @JsonProperty
   public Map<String, String> getKafkaProducerConfig()
   {
@@ -109,6 +119,11 @@ public class KafkaEmitterConfig
     if (!getAlertTopic().equals(that.getAlertTopic())) {
       return false;
     }
+
+    if (getRequestTopic() != null ? !getRequestTopic().equals(that.getRequestTopic()) : that.getRequestTopic() != null) {
+      return false;
+    }
+
     if (getClusterName() != null ? !getClusterName().equals(that.getClusterName()) : that.getClusterName() != null) {
       return false;
     }
@@ -121,6 +136,7 @@ public class KafkaEmitterConfig
     int result = getBootstrapServers().hashCode();
     result = 31 * result + getMetricTopic().hashCode();
     result = 31 * result + getAlertTopic().hashCode();
+    result = 31 * result + (getRequestTopic() != null ? getRequestTopic().hashCode() : 0);
     result = 31 * result + (getClusterName() != null ? getClusterName().hashCode() : 0);
     result = 31 * result + getKafkaProducerConfig().hashCode();
     return result;
@@ -133,6 +149,7 @@ public class KafkaEmitterConfig
            "bootstrap.servers='" + bootstrapServers + '\'' +
            ", metric.topic='" + metricTopic + '\'' +
            ", alert.topic='" + alertTopic + '\'' +
+           ", request.topic='" + requestTopic + '\'' +
            ", clusterName='" + clusterName + '\'' +
            ", Producer.config=" + kafkaProducerConfig +
            '}';
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
index 89e75fc..55ecdba 100644
--- a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterConfigTest.java
@@ -43,13 +43,27 @@ public class KafkaEmitterConfigTest
   public void testSerDeserKafkaEmitterConfig() throws IOException
   {
     KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
-                                                                   "alertTest", "clusterNameTest",
-                                                                   ImmutableMap.<String, String>builder()
-                                                                       .put("testKey", "testValue").build()
+        "alertTest", "requestTest",
+        "clusterNameTest", ImmutableMap.<String, String>builder()
+        .put("testKey", "testValue").build()
     );
     String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
     KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
-                                                          .readValue(kafkaEmitterConfigString);
+        .readValue(kafkaEmitterConfigString);
+    Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
+  }
+
+  @Test
+  public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
+  {
+    KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("hostname", "metricTest",
+        "alertTest", null,
+        "clusterNameTest", ImmutableMap.<String, String>builder()
+        .put("testKey", "testValue").build()
+    );
+    String kafkaEmitterConfigString = mapper.writeValueAsString(kafkaEmitterConfig);
+    KafkaEmitterConfig kafkaEmitterConfigExpected = mapper.readerFor(KafkaEmitterConfig.class)
+        .readValue(kafkaEmitterConfigString);
     Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
   }
 
@@ -57,8 +71,8 @@ public class KafkaEmitterConfigTest
   public void testSerDeNotRequiredKafkaProducerConfig()
   {
     KafkaEmitterConfig kafkaEmitterConfig = new KafkaEmitterConfig("localhost:9092", "metricTest",
-                                                                   "alertTest", "clusterNameTest",
-                                                                   null
+        "alertTest", null,
+        "clusterNameTest", null
     );
     try {
       @SuppressWarnings("unused")
diff --git a/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
new file mode 100644
index 0000000..26d9701
--- /dev/null
+++ b/extensions-contrib/kafka-emitter/src/test/java/org/apache/druid/emitter/kafka/KafkaEmitterTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.emitter.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.emitter.core.Event;
+import org.apache.druid.java.util.emitter.service.AlertEvent;
+import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.server.QueryStats;
+import org.apache.druid.server.RequestLogLine;
+import org.apache.druid.server.log.DefaultRequestLogEventBuilderFactory;
+import org.apache.druid.server.log.RequestLogEvent;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.easymock.EasyMock;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+
+@RunWith(Parameterized.class)
+public class KafkaEmitterTest
+{
+  @Parameterized.Parameter
+  public String requestTopic;
+
+  @Parameterized.Parameters(name = "{index}: requestTopic - {0}")
+  public static Object[] data()
+  {
+    return new Object[] {
+        "requests",
+        null
+    };
+  }
+
+  // there is 10 seconds wait in kafka emitter before it starts sending events to broker, so set a timeout for 15 seconds
+  @Test(timeout = 15_000)
+  public void testKafkaEmitter() throws InterruptedException
+  {
+    final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
+        ServiceMetricEvent.builder().build("m1", 1).build("service", "host")
+    );
+
+    final List<AlertEvent> alertEvents = ImmutableList.of(
+        new AlertEvent("service", "host", "description")
+    );
+
+    final List<RequestLogEvent> requestLogEvents = ImmutableList.of(
+        DefaultRequestLogEventBuilderFactory.instance().createRequestLogEventBuilder("requests",
+            RequestLogLine.forSql("", null, DateTimes.nowUtc(), null, new QueryStats(ImmutableMap.of()))
+        ).build("service", "host")
+    );
+
+    int totalEvents = serviceMetricEvents.size() + alertEvents.size() + requestLogEvents.size();
+    int totalEventsExcludingRequestLogEvents = totalEvents - requestLogEvents.size();
+
+    final CountDownLatch countDownSentEvents = new CountDownLatch(
+        requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+    final KafkaProducer<String, String> producer = EasyMock.createStrictMock(KafkaProducer.class);
+    final KafkaEmitter kafkaEmitter = new KafkaEmitter(
+        new KafkaEmitterConfig("", "metrics", "alerts", requestTopic, "test-cluster", null),
+        new ObjectMapper()
+    )
+    {
+      @Override
+      protected Producer<String, String> setKafkaProducer()
+      {
+        return producer;
+      }
+
+      @Override
+      protected void sendToKafka(final String topic, MemoryBoundLinkedBlockingQueue<String> recordQueue,
+          Callback callback
+      )
+      {
+        countDownSentEvents.countDown();
+        super.sendToKafka(topic, recordQueue, callback);
+      }
+    };
+
+    EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null)
+        .times(requestTopic == null ? totalEventsExcludingRequestLogEvents : totalEvents);
+    EasyMock.replay(producer);
+    kafkaEmitter.start();
+
+    for (Event event : serviceMetricEvents) {
+      kafkaEmitter.emit(event);
+    }
+    for (Event event : alertEvents) {
+      kafkaEmitter.emit(event);
+    }
+    for (Event event : requestLogEvents) {
+      kafkaEmitter.emit(event);
+    }
+    countDownSentEvents.await();
+
+    Assert.assertEquals(0, kafkaEmitter.getMetricLostCount());
+    Assert.assertEquals(0, kafkaEmitter.getAlertLostCount());
+    Assert.assertEquals(requestTopic == null ? requestLogEvents.size() : 0, kafkaEmitter.getRequestLostCount());
+    Assert.assertEquals(0, kafkaEmitter.getInvalidLostCount());
+
+    while (true) {
+      try {
+        EasyMock.verify(producer);
+        break;
+      }
+      catch (Throwable e) {
+        // although the latch may have count down, producer.send may not have been called yet in KafkaEmitter
+        // so wait for sometime before verifying the mock
+        Thread.sleep(100);
+        // just continue
+      }
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
index 2224e62..4744b2f 100644
--- a/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
+++ b/server/src/main/java/org/apache/druid/server/log/DefaultRequestLogEvent.java
@@ -28,6 +28,7 @@ import org.apache.druid.server.QueryStats;
 import org.apache.druid.server.RequestLogLine;
 import org.joda.time.DateTime;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -56,7 +57,21 @@ public final class DefaultRequestLogEvent implements RequestLogEvent
   @Override
   public Map<String, Object> toMap()
   {
-    return ImmutableMap.of();
+    final Map<String, Object> map = new HashMap<>();
+    map.put("feed", getFeed());
+    map.put("timestamp", getCreatedTime());
+    map.put("service", getService());
+    map.put("host", getHost());
+    if (getQuery() != null) {
+      map.put("query", getQuery());
+    }
+    if (getSql() != null) {
+      map.put("sql", getSql());
+      map.put("sqlQueryContext", getSqlQueryContext());
+    }
+    map.put("remoteAddr", getRemoteAddr());
+    map.put("queryStats", getQueryStats());
+    return map;
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
index fe78999..0a515ee 100644
--- a/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
+++ b/server/src/test/java/org/apache/druid/server/log/DefaultRequestLogEventTest.java
@@ -26,15 +26,20 @@ import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.Query;
 import org.apache.druid.query.TableDataSource;
 import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.server.QueryStats;
 import org.apache.druid.server.RequestLogLine;
+import org.joda.time.DateTime;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.Map;
+
 public class DefaultRequestLogEventTest
 {
   private ObjectMapper objectMapper = new DefaultObjectMapper();
@@ -68,4 +73,81 @@ public class DefaultRequestLogEventTest
     String expected = "{\"feed\":\"feed\",\"query\":{\"queryType\":\"timeseries\",\"dataSource\":{\"type\":\"table\",\"name\":\"dummy\"},\"intervals\":{\"type\":\"intervals\",\"intervals\":[\"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z\"]},\"descending\":true,\"virtualColumns\":[],\"filter\":null,\"granularity\":{\"type\":\"all\"},\"aggregations\":[],\"postAggregations\":[],\"limit\":5,\"context\":{\"key\":\"value\"}},\"host\":\"127.0.0.1\",\"timestamp\":\"2019-12-12T03:01:00.000Z\ [...]
     Assert.assertEquals(objectMapper.readTree(expected), objectMapper.readTree(logEventJson));
   }
+
+  @Test
+  public void testDefaultRequestLogEventToMap()
+  {
+    final String feed = "test";
+    final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+    final String service = "druid-service";
+    final String host = "127.0.0.1";
+    final Query query = new TimeseriesQuery(
+        new TableDataSource("dummy"),
+        new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("2015-01-01/2015-01-02"))),
+        true,
+        VirtualColumns.EMPTY,
+        null,
+        Granularities.ALL,
+        ImmutableList.of(),
+        ImmutableList.of(),
+        5,
+        ImmutableMap.of("key", "value"));
+    final QueryStats queryStats = new QueryStats(
+        ImmutableMap.of("query/time", 13L, "query/bytes", 10L, "success", true, "identity", "allowAll"));
+    RequestLogLine nativeLine = RequestLogLine.forNative(
+        query,
+        timestamp,
+        host,
+        queryStats
+    );
+
+    DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+        ImmutableMap.of("service", service, "host", host), feed, nativeLine
+    );
+    final Map<String, Object> expected = new HashMap<>();
+    expected.put("feed", feed);
+    expected.put("timestamp", timestamp);
+    expected.put("service", service);
+    expected.put("host", host);
+    expected.put("query", query);
+    expected.put("remoteAddr", host);
+    expected.put("queryStats", queryStats);
+
+    Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+  }
+
+  @Test
+  public void testDefaultRequestLogEventToMapSQL()
+  {
+    final String feed = "test";
+    final DateTime timestamp = DateTimes.of(2019, 12, 12, 3, 1);
+    final String service = "druid-service";
+    final String host = "127.0.0.1";
+    final String sql = "select * from 1337";
+    final QueryStats queryStats = new QueryStats(
+        ImmutableMap.of("sqlQuery/time", 13L, "sqlQuery/bytes", 10L, "success", true, "identity", "allowAll"));
+
+    RequestLogLine nativeLine = RequestLogLine.forSql(
+        sql,
+        ImmutableMap.of(),
+        timestamp,
+        host,
+        queryStats
+    );
+
+    DefaultRequestLogEvent defaultRequestLogEvent = new DefaultRequestLogEvent(
+        ImmutableMap.of("service", service, "host", host), feed, nativeLine
+    );
+    final Map<String, Object> expected = new HashMap<>();
+    expected.put("feed", feed);
+    expected.put("timestamp", timestamp);
+    expected.put("service", service);
+    expected.put("host", host);
+    expected.put("sql", sql);
+    expected.put("sqlQueryContext", ImmutableMap.of());
+    expected.put("remoteAddr", host);
+    expected.put("queryStats", queryStats);
+
+    Assert.assertEquals(expected, defaultRequestLogEvent.toMap());
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org