You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ma...@apache.org on 2021/04/14 03:19:00 UTC

[druid] branch master updated: Introduce a new configuration that skip storing audit payload if payload size exceed limit and skip storing null fields for audit payload (#11078)

This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f968400  Introduce a new configuration that skip storing audit payload if payload size exceed limit and skip storing null fields for audit payload (#11078)
f968400 is described below

commit f96840017017806d2bbe35cf344e682995e8a9c1
Author: Maytas Monsereenusorn <ma...@apache.org>
AuthorDate: Tue Apr 13 20:18:28 2021 -0700

    Introduce a new configuration that skip storing audit payload if payload size exceed limit and skip storing null fields for audit payload (#11078)
    
    * Add config to skip storing audit payload if exceed limit
    
    * fix checkstyle
    
    * change config name
    
    * skip null fields for audit payload
    
    * fix checkstyle
    
    * address comments
    
    * fix guice
    
    * fix test
    
    * add tests
    
    * address comments
    
    * address comments
    
    * address comments
    
    * fix checkstyle
    
    * address comments
    
    * fix test
    
    * fix test
    
    * address comments
    
    * Address comments
    
    Co-authored-by: Jihoon Son <ji...@apache.org>
    
    Co-authored-by: Jihoon Son <ji...@apache.org>
---
 .../java/org/apache/druid/audit/AuditManager.java  |  17 +-
 .../apache/druid/common/config/ConfigSerde.java    |  10 +-
 .../druid/common/config/JacksonConfigManager.java  |  32 +-
 .../apache/druid/guice/DruidSecondaryModule.java   |  11 +
 .../druid/guice/annotations/JsonNonNull.java       |  31 +-
 .../common/config/JacksonConfigManagerTest.java    | 178 +++++++++++
 docs/configuration/index.md                        |   2 +
 .../org/apache/druid/jackson/JacksonModule.java    |  11 +
 .../apache/druid/server/audit/SQLAuditManager.java |  27 +-
 .../druid/server/audit/SQLAuditManagerConfig.java  |  23 ++
 .../druid/server/audit/SQLAuditManagerTest.java    | 346 ++++++++++++++++-----
 11 files changed, 562 insertions(+), 126 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/audit/AuditManager.java b/core/src/main/java/org/apache/druid/audit/AuditManager.java
index 6389350..73804d7 100644
--- a/core/src/main/java/org/apache/druid/audit/AuditManager.java
+++ b/core/src/main/java/org/apache/druid/audit/AuditManager.java
@@ -20,6 +20,7 @@
 package org.apache.druid.audit;
 
 
+import org.apache.druid.common.config.ConfigSerde;
 import org.joda.time.Interval;
 import org.skife.jdbi.v2.Handle;
 
@@ -28,15 +29,25 @@ import java.util.List;
 
 public interface AuditManager
 {
+  /**
+   * This String is the default message stored instead of the actual audit payload if the audit payload size
+   * exceeds the configured maximum size limit
+   */
+  String PAYLOAD_SKIP_MSG_FORMAT = "Payload was not stored as its size exceeds the limit [%d] configured by druid.audit.manager.maxPayloadSizeBytes";
+
   String X_DRUID_AUTHOR = "X-Druid-Author";
 
   String X_DRUID_COMMENT = "X-Druid-Comment";
 
   /**
-   * inserts an audit Entry in the Audit Table
-   * @param auditEntry
+   * inserts an audit entry in the Audit Table
+   * @param key of the audit entry
+   * @param type of the audit entry
+   * @param auditInfo of the audit entry
+   * @param payload of the audit entry
+   * @param configSerde of the payload of the audit entry
    */
-  void doAudit(AuditEntry auditEntry);
+  <T> void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde<T> configSerde);
 
   /**
    * inserts an audit Entry in audit table using the handler provided
diff --git a/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java b/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java
index 119c0e5a..708d16d 100644
--- a/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java
+++ b/core/src/main/java/org/apache/druid/common/config/ConfigSerde.java
@@ -24,6 +24,14 @@ package org.apache.druid.common.config;
 public interface ConfigSerde<T>
 {
   byte[] serialize(T obj);
-  String serializeToString(T obj);
+  /**
+   * Serialize object to String
+   *
+   * @param obj to be serialized
+   * @param skipNull if true, then skip serialization of any field with null value.
+   *                 This can be used to reduce the size of the resulting String.
+   * @return String serialization of the input
+   */
+  String serializeToString(T obj, boolean skipNull);
   T deserialize(byte[] bytes);
 }
diff --git a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
index 1075e4c..c62a8b7 100644
--- a/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
+++ b/core/src/main/java/org/apache/druid/common/config/JacksonConfigManager.java
@@ -22,11 +22,13 @@ package org.apache.druid.common.config;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
-import org.apache.druid.audit.AuditEntry;
 import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.common.config.ConfigManager.SetResult;
+import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.JsonNonNull;
 import org.apache.druid.java.util.common.jackson.JacksonUtils;
 
 import java.io.IOException;
@@ -38,18 +40,21 @@ public class JacksonConfigManager
 {
   private final ConfigManager configManager;
   private final ObjectMapper jsonMapper;
+  private final ObjectMapper jsonMapperSkipNull;
   private final AuditManager auditManager;
 
   @Inject
   public JacksonConfigManager(
       ConfigManager configManager,
-      ObjectMapper jsonMapper,
+      @Json ObjectMapper jsonMapper,
+      @JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue,
       AuditManager auditManager
   )
   {
     this.configManager = configManager;
     this.jsonMapper = jsonMapper;
     this.auditManager = auditManager;
+    this.jsonMapperSkipNull = jsonMapperOnlyNonNullValue;
   }
 
   public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz)
@@ -72,18 +77,12 @@ public class JacksonConfigManager
     ConfigSerde configSerde = create(val.getClass(), null);
     // Audit and actual config change are done in separate transactions
     // there can be phantom audits and reOrdering in audit changes as well.
-    auditManager.doAudit(
-        AuditEntry.builder()
-                  .key(key)
-                  .type(key)
-                  .auditInfo(auditInfo)
-                  .payload(configSerde.serializeToString(val))
-                  .build()
-    );
+    auditManager.doAudit(key, key, auditInfo, val, configSerde);
     return configManager.set(key, configSerde, val);
   }
 
-  private <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
+  @VisibleForTesting
+  <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
   {
     return new ConfigSerde<T>()
     {
@@ -99,10 +98,10 @@ public class JacksonConfigManager
       }
 
       @Override
-      public String serializeToString(T obj)
+      public String serializeToString(T obj, boolean skipNull)
       {
         try {
-          return jsonMapper.writeValueAsString(obj);
+          return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
         }
         catch (JsonProcessingException e) {
           throw new RuntimeException(e);
@@ -121,7 +120,8 @@ public class JacksonConfigManager
     };
   }
 
-  private <T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
+  @VisibleForTesting
+  <T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
   {
     return new ConfigSerde<T>()
     {
@@ -137,10 +137,10 @@ public class JacksonConfigManager
       }
 
       @Override
-      public String serializeToString(T obj)
+      public String serializeToString(T obj, boolean skipNull)
       {
         try {
-          return jsonMapper.writeValueAsString(obj);
+          return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
         }
         catch (JsonProcessingException e) {
           throw new RuntimeException(e);
diff --git a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
index 5e4db78..bb03146 100644
--- a/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
+++ b/core/src/main/java/org/apache/druid/guice/DruidSecondaryModule.java
@@ -30,6 +30,7 @@ import com.google.inject.Key;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.JsonNonNull;
 import org.apache.druid.guice.annotations.Smile;
 import org.skife.config.ConfigurationObjectFactory;
 
@@ -41,6 +42,7 @@ public class DruidSecondaryModule implements Module
   private final Properties properties;
   private final ConfigurationObjectFactory factory;
   private final ObjectMapper jsonMapper;
+  private final ObjectMapper jsonMapperOnlyNonNullValueSerialization;
   private final ObjectMapper smileMapper;
   private final Validator validator;
 
@@ -49,6 +51,7 @@ public class DruidSecondaryModule implements Module
       Properties properties,
       ConfigurationObjectFactory factory,
       @Json ObjectMapper jsonMapper,
+      @JsonNonNull ObjectMapper jsonMapperOnlyNonNullValueSerialization,
       @Smile ObjectMapper smileMapper,
       Validator validator
   )
@@ -56,6 +59,7 @@ public class DruidSecondaryModule implements Module
     this.properties = properties;
     this.factory = factory;
     this.jsonMapper = jsonMapper;
+    this.jsonMapperOnlyNonNullValueSerialization = jsonMapperOnlyNonNullValueSerialization;
     this.smileMapper = smileMapper;
     this.validator = validator;
   }
@@ -78,6 +82,13 @@ public class DruidSecondaryModule implements Module
     return jsonMapper;
   }
 
+  @Provides @LazySingleton @JsonNonNull
+  public ObjectMapper getJsonMapperOnlyNonNullValueSerialization(final Injector injector)
+  {
+    setupJackson(injector, jsonMapperOnlyNonNullValueSerialization);
+    return jsonMapperOnlyNonNullValueSerialization;
+  }
+
   @Provides @LazySingleton @Smile
   public ObjectMapper getSmileMapper(Injector injector)
   {
diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java b/core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java
similarity index 61%
copy from server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java
copy to core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java
index 4ef45d1..ae4672f 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java
+++ b/core/src/main/java/org/apache/druid/guice/annotations/JsonNonNull.java
@@ -17,27 +17,22 @@
  * under the License.
  */
 
-package org.apache.druid.server.audit;
+package org.apache.druid.guice.annotations;
 
-import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.inject.BindingAnnotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
 
 /**
+ * The ObjectMapper of this annotation will skip serialization of any field with null value.
  */
-public class SQLAuditManagerConfig
+@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@BindingAnnotation
+@PublicApi
+public @interface JsonNonNull
 {
-  @JsonProperty
-  private long auditHistoryMillis = 7 * 24 * 60 * 60 * 1000L; // 1 WEEK
-
-  @JsonProperty
-  private boolean includePayloadAsDimensionInMetric = false;
-
-  public long getAuditHistoryMillis()
-  {
-    return auditHistoryMillis;
-  }
-
-  public boolean getIncludePayloadAsDimensionInMetric()
-  {
-    return includePayloadAsDimensionInMetric;
-  }
 }
diff --git a/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
new file mode 100644
index 0000000..25220eb
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/common/config/JacksonConfigManagerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.common.config;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.audit.AuditInfo;
+import org.apache.druid.audit.AuditManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class JacksonConfigManagerTest
+{
+  @Mock
+  private ConfigManager mockConfigManager;
+
+  @Mock
+  private AuditManager mockAuditManager;
+
+  private JacksonConfigManager jacksonConfigManager;
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  @Before
+  public void setUp()
+  {
+    jacksonConfigManager = new JacksonConfigManager(
+        mockConfigManager,
+        new ObjectMapper(),
+        new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL),
+        mockAuditManager
+    );
+  }
+
+  @Test
+  public void testSerializeToStringWithSkipNullTrue()
+  {
+    ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
+    {
+    }, null);
+    ConfigSerde<TestConfig> configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null);
+    TestConfig config = new TestConfig("version", null, 3);
+    String actual = configConfigSerdeFromTypeReference.serializeToString(config, true);
+    Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual);
+    actual = configConfigSerdeFromClass.serializeToString(config, true);
+    Assert.assertEquals("{\"version\":\"version\",\"settingInt\":3}", actual);
+  }
+
+  @Test
+  public void testSerializeToStringWithSkipNullFalse()
+  {
+    ConfigSerde<TestConfig> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<TestConfig>()
+    {
+    }, null);
+    ConfigSerde<TestConfig> configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null);
+    TestConfig config = new TestConfig("version", null, 3);
+    String actual = configConfigSerdeFromTypeReference.serializeToString(config, false);
+    Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual);
+    actual = configConfigSerdeFromClass.serializeToString(config, false);
+    Assert.assertEquals("{\"version\":\"version\",\"settingString\":null,\"settingInt\":3}", actual);
+  }
+
+  @Test
+  public void testSerializeToStringWithInvalidConfigForConfigSerdeFromTypeReference()
+  {
+    ConfigSerde<ClassThatJacksonCannotSerialize> configConfigSerdeFromTypeReference = jacksonConfigManager.create(new TypeReference<ClassThatJacksonCannotSerialize>()
+    {
+    }, null);
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("InvalidDefinitionException");
+    configConfigSerdeFromTypeReference.serializeToString(new ClassThatJacksonCannotSerialize(), false);
+  }
+
+  @Test
+  public void testSerializeToStringWithInvalidConfigForConfigSerdeFromClass()
+  {
+    ConfigSerde<ClassThatJacksonCannotSerialize> configConfigSerdeFromClass = jacksonConfigManager.create(ClassThatJacksonCannotSerialize.class, null);
+    exception.expect(RuntimeException.class);
+    exception.expectMessage("InvalidDefinitionException");
+    configConfigSerdeFromClass.serializeToString(new ClassThatJacksonCannotSerialize(), false);
+  }
+
+  @Test
+  public void testSet()
+  {
+    String key = "key";
+    TestConfig val = new TestConfig("version", "string", 3);
+    AuditInfo auditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+
+    jacksonConfigManager.set(key, val, auditInfo);
+
+    ArgumentCaptor<ConfigSerde> configSerdeCapture = ArgumentCaptor.forClass(
+        ConfigSerde.class);
+    Mockito.verify(mockAuditManager).doAudit(
+        ArgumentMatchers.eq(key),
+        ArgumentMatchers.eq(key),
+        ArgumentMatchers.eq(auditInfo),
+        ArgumentMatchers.eq(val),
+        configSerdeCapture.capture()
+    );
+    Assert.assertNotNull(configSerdeCapture.getValue());
+  }
+
+
+  static class TestConfig
+  {
+    private final String version;
+    private final String settingString;
+    private final int settingInt;
+
+    @JsonCreator
+    public TestConfig(
+        @JsonProperty("version") String version,
+        @JsonProperty("settingString") String settingString,
+        @JsonProperty("settingInt") int settingInt
+    )
+    {
+      this.version = version;
+      this.settingString = settingString;
+      this.settingInt = settingInt;
+    }
+
+    public String getVersion()
+    {
+      return version;
+    }
+
+    public String getSettingString()
+    {
+      return settingString;
+    }
+
+    public int getSettingInt()
+    {
+      return settingInt;
+    }
+  }
+
+  static class ClassThatJacksonCannotSerialize
+  {
+
+  }
+}
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 62e36bd..cd26a9e 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -338,6 +338,8 @@ Coordinator and Overlord log changes to lookups, segment load/drop rules, dynami
 |--------|-----------|-------|
 |`druid.audit.manager.auditHistoryMillis`|Default duration for querying audit history.|1 week|
 |`druid.audit.manager.includePayloadAsDimensionInMetric`|Boolean flag on whether to add `payload` column in service metric.|false|
+|`druid.audit.manager.maxPayloadSizeBytes`|The maximum size of audit payload to store in Druid's metadata store audit table. If the size of audit payload exceeds this value, the audit log would be stored with a message indicating that the payload was omitted instead. Setting `maxPayloadSizeBytes` to -1 (default value) disables this check, meaning Druid will always store audit payload regardless of its size. Setting to any negative number other than `-1` is invalid. Human-readable format [...]
+|`druid.audit.manager.skipNullField`|If true, the audit payload stored in metadata store will exclude any field with null value. |false|
 
 ### Enabling Metrics
 
diff --git a/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java b/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java
index 853a088..a7c947d 100644
--- a/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java
+++ b/processing/src/main/java/org/apache/druid/jackson/JacksonModule.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.jackson;
 
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.smile.SmileFactory;
 import com.fasterxml.jackson.dataformat.smile.SmileGenerator;
@@ -28,6 +29,7 @@ import com.google.inject.Module;
 import com.google.inject.Provides;
 import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.annotations.Json;
+import org.apache.druid.guice.annotations.JsonNonNull;
 import org.apache.druid.guice.annotations.Smile;
 
 /**
@@ -46,6 +48,15 @@ public class JacksonModule implements Module
     return new DefaultObjectMapper();
   }
 
+  /**
+   * Provides an ObjectMapper that suppresses serialization of properties with null values
+   */
+  @Provides @LazySingleton @JsonNonNull
+  public ObjectMapper jsonMapperOnlyNonNullValue()
+  {
+    return new DefaultObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
+  }
+
   @Provides @LazySingleton @Smile
   public ObjectMapper smileMapper()
   {
diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
index 9ea53c6..1302043 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
+++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManager.java
@@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
 import com.google.inject.Inject;
 import org.apache.druid.audit.AuditEntry;
+import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigSerde;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.java.util.common.DateTimes;
@@ -76,8 +78,15 @@ public class SQLAuditManager implements AuditManager
   }
 
   @Override
-  public void doAudit(final AuditEntry auditEntry)
+  public <T> void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde<T> configSerde)
   {
+    AuditEntry auditEntry = AuditEntry.builder()
+                                      .key(key)
+                                      .type(type)
+                                      .auditInfo(auditInfo)
+                                      .payload(configSerde.serializeToString(payload, config.isSkipNullField()))
+                                      .build();
+
     dbi.withHandle(
         new HandleCallback<Void>()
         {
@@ -114,6 +123,20 @@ public class SQLAuditManager implements AuditManager
   {
     emitter.emit(getAuditMetricEventBuilder(auditEntry).build("config/audit", 1));
 
+    AuditEntry auditEntryToStore = auditEntry;
+    if (config.getMaxPayloadSizeBytes() >= 0) {
+      int payloadSize = jsonMapper.writeValueAsBytes(auditEntry.getPayload()).length;
+      if (payloadSize > config.getMaxPayloadSizeBytes()) {
+        auditEntryToStore = AuditEntry.builder()
+                                      .key(auditEntry.getKey())
+                                      .type(auditEntry.getType())
+                                      .auditInfo(auditEntry.getAuditInfo())
+                                      .payload(StringUtils.format(PAYLOAD_SKIP_MSG_FORMAT, config.getMaxPayloadSizeBytes()))
+                                      .auditTime(auditEntry.getAuditTime())
+                                      .build();
+      }
+    }
+
     handle.createStatement(
         StringUtils.format(
             "INSERT INTO %s ( audit_key, type, author, comment, created_date, payload) VALUES (:audit_key, :type, :author, :comment, :created_date, :payload)",
@@ -125,7 +148,7 @@ public class SQLAuditManager implements AuditManager
           .bind("author", auditEntry.getAuditInfo().getAuthor())
           .bind("comment", auditEntry.getAuditInfo().getComment())
           .bind("created_date", auditEntry.getAuditTime().toString())
-          .bind("payload", jsonMapper.writeValueAsBytes(auditEntry))
+          .bind("payload", jsonMapper.writeValueAsBytes(auditEntryToStore))
           .execute();
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java
index 4ef45d1..8509e06 100644
--- a/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java
+++ b/server/src/main/java/org/apache/druid/server/audit/SQLAuditManagerConfig.java
@@ -20,6 +20,8 @@
 package org.apache.druid.server.audit;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.java.util.common.HumanReadableBytesRange;
 
 /**
  */
@@ -31,6 +33,16 @@ public class SQLAuditManagerConfig
   @JsonProperty
   private boolean includePayloadAsDimensionInMetric = false;
 
+  @JsonProperty
+  @HumanReadableBytesRange(
+      min = -1,
+      message = "maxPayloadSizeBytes must either be -1 (for disabling the check) or a non negative number"
+  )
+  private HumanReadableBytes maxPayloadSizeBytes = HumanReadableBytes.valueOf(-1);
+
+  @JsonProperty
+  private boolean skipNullField = false;
+
   public long getAuditHistoryMillis()
   {
     return auditHistoryMillis;
@@ -40,4 +52,15 @@ public class SQLAuditManagerConfig
   {
     return includePayloadAsDimensionInMetric;
   }
+
+  public long getMaxPayloadSizeBytes()
+  {
+    return maxPayloadSizeBytes.getBytes();
+  }
+
+  public boolean isSkipNullField()
+  {
+    return skipNullField;
+  }
+
 }
diff --git a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
index 13dc10a..429402e 100644
--- a/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
+++ b/server/src/test/java/org/apache/druid/server/audit/SQLAuditManagerTest.java
@@ -19,14 +19,17 @@
 
 package org.apache.druid.server.audit;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.audit.AuditEntry;
 import org.apache.druid.audit.AuditInfo;
 import org.apache.druid.audit.AuditManager;
+import org.apache.druid.common.config.ConfigSerde;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.jackson.JacksonUtils;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -35,12 +38,19 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
 import org.skife.jdbi.v2.Handle;
 import org.skife.jdbi.v2.tweak.HandleCallback;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+@RunWith(MockitoJUnitRunner.class)
 public class SQLAuditManagerTest
 {
   @Rule
@@ -49,6 +59,8 @@ public class SQLAuditManagerTest
   private TestDerbyConnector connector;
   private AuditManager auditManager;
   private final String PAYLOAD_DIMENSION_KEY = "payload";
+  private ConfigSerde<String> stringConfigSerde;
+
 
   private final ObjectMapper mapper = new DefaultObjectMapper();
 
@@ -64,6 +76,33 @@ public class SQLAuditManagerTest
         mapper,
         new SQLAuditManagerConfig()
     );
+    stringConfigSerde = new ConfigSerde<String>()
+    {
+      @Override
+      public byte[] serialize(String obj)
+      {
+        try {
+          return mapper.writeValueAsBytes(obj);
+        }
+        catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public String serializeToString(String obj, boolean skipNull)
+      {
+        // In our test, payload Object is already a String
+        // So to serialize to String, we just return a String
+        return obj;
+      }
+
+      @Override
+      public String deserialize(byte[] bytes)
+      {
+        return JacksonUtils.readValue(mapper, bytes, String.class);
+      }
+    };
   }
 
   @Test(timeout = 60_000L)
@@ -125,18 +164,17 @@ public class SQLAuditManagerTest
   @Test(timeout = 60_000L)
   public void testCreateAuditEntry() throws IOException
   {
-    AuditEntry entry = new AuditEntry(
-        "testKey",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-01T00:00:00Z")
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    auditManager.doAudit(entry);
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+
     byte[] payload = connector.lookup(
         derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
         "audit_key",
@@ -144,118 +182,128 @@ public class SQLAuditManagerTest
         "testKey"
     );
     AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
-    Assert.assertEquals(entry, dbEntry);
-
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
   }
 
   @Test(timeout = 60_000L)
   public void testFetchAuditHistory()
   {
-    AuditEntry entry = new AuditEntry(
-        "testKey",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-01T00:00:00Z")
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    auditManager.doAudit(entry);
-    auditManager.doAudit(entry);
+    String entry1Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+
     List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
         "testKey",
         "testType",
-        Intervals.of("2012-01-01T00:00:00Z/2013-01-03T00:00:00Z")
+        Intervals.of("2000-01-01T00:00:00Z/2100-01-03T00:00:00Z")
     );
     Assert.assertEquals(2, auditEntries.size());
-    Assert.assertEquals(entry, auditEntries.get(0));
-    Assert.assertEquals(entry, auditEntries.get(1));
+
+    Assert.assertEquals(entry1Key, auditEntries.get(0).getKey());
+    Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload());
+    Assert.assertEquals(entry1Type, auditEntries.get(0).getType());
+    Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo());
+
+    Assert.assertEquals(entry1Key, auditEntries.get(1).getKey());
+    Assert.assertEquals(entry1Payload, auditEntries.get(1).getPayload());
+    Assert.assertEquals(entry1Type, auditEntries.get(1).getType());
+    Assert.assertEquals(entry1AuditInfo, auditEntries.get(1).getAuditInfo());
   }
 
   @Test(timeout = 60_000L)
   public void testFetchAuditHistoryByKeyAndTypeWithLimit()
   {
-    AuditEntry entry1 = new AuditEntry(
-        "testKey1",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-01T00:00:00Z")
+    String entry1Key = "testKey1";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    AuditEntry entry2 = new AuditEntry(
-        "testKey2",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-02T00:00:00Z")
+    String entry1Payload = "testPayload";
+
+    String entry2Key = "testKey2";
+    String entry2Type = "testType";
+    AuditInfo entry2AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    auditManager.doAudit(entry1);
-    auditManager.doAudit(entry2);
+    String entry2Payload = "testPayload";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+    auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
     List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
         "testKey1",
         "testType",
         1
     );
     Assert.assertEquals(1, auditEntries.size());
-    Assert.assertEquals(entry1, auditEntries.get(0));
+    Assert.assertEquals(entry1Key, auditEntries.get(0).getKey());
+    Assert.assertEquals(entry1Payload, auditEntries.get(0).getPayload());
+    Assert.assertEquals(entry1Type, auditEntries.get(0).getType());
+    Assert.assertEquals(entry1AuditInfo, auditEntries.get(0).getAuditInfo());
   }
 
   @Test(timeout = 60_000L)
   public void testFetchAuditHistoryByTypeWithLimit()
   {
-    AuditEntry entry1 = new AuditEntry(
-        "testKey",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-01T00:00:00Z")
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    AuditEntry entry2 = new AuditEntry(
-        "testKey",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-02T00:00:00Z")
+    String entry1Payload = "testPayload1";
+
+    String entry2Key = "testKey";
+    String entry2Type = "testType";
+    AuditInfo entry2AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    AuditEntry entry3 = new AuditEntry(
-        "testKey",
-        "testType",
-        new AuditInfo(
-            "testAuthor",
-            "testComment",
-            "127.0.0.1"
-        ),
-        "testPayload",
-        DateTimes.of("2013-01-03T00:00:00Z")
+    String entry2Payload = "testPayload2";
+
+    String entry3Key = "testKey";
+    String entry3Type = "testType";
+    AuditInfo entry3AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
     );
-    auditManager.doAudit(entry1);
-    auditManager.doAudit(entry2);
-    auditManager.doAudit(entry3);
+    String entry3Payload = "testPayload3";
+
+    auditManager.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload, stringConfigSerde);
+    auditManager.doAudit(entry2Key, entry2Type, entry2AuditInfo, entry2Payload, stringConfigSerde);
+    auditManager.doAudit(entry3Key, entry3Type, entry3AuditInfo, entry3Payload, stringConfigSerde);
+
     List<AuditEntry> auditEntries = auditManager.fetchAuditHistory(
         "testType",
         2
     );
     Assert.assertEquals(2, auditEntries.size());
-    Assert.assertEquals(entry3, auditEntries.get(0));
-    Assert.assertEquals(entry2, auditEntries.get(1));
+    Assert.assertEquals(entry3Key, auditEntries.get(0).getKey());
+    Assert.assertEquals(entry3Payload, auditEntries.get(0).getPayload());
+    Assert.assertEquals(entry3Type, auditEntries.get(0).getType());
+    Assert.assertEquals(entry3AuditInfo, auditEntries.get(0).getAuditInfo());
+
+    Assert.assertEquals(entry2Key, auditEntries.get(1).getKey());
+    Assert.assertEquals(entry2Payload, auditEntries.get(1).getPayload());
+    Assert.assertEquals(entry2Type, auditEntries.get(1).getType());
+    Assert.assertEquals(entry2AuditInfo, auditEntries.get(1).getAuditInfo());
   }
 
   @Test(expected = IllegalArgumentException.class, timeout = 10_000L)
@@ -270,6 +318,132 @@ public class SQLAuditManagerTest
     auditManager.fetchAuditHistory("testType", 0);
   }
 
+  @Test(timeout = 60_000L)
+  public void testCreateAuditEntryWithPayloadOverSkipPayloadLimit() throws IOException
+  {
+    int maxPayloadSize = 10;
+    SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager(
+        connector,
+        derbyConnectorRule.metadataTablesConfigSupplier(),
+        new NoopServiceEmitter(),
+        mapper,
+        new SQLAuditManagerConfig()
+        {
+          @Override
+          public long getMaxPayloadSizeBytes()
+          {
+            return maxPayloadSize;
+          }
+        }
+    );
+
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "payload audit to store";
+
+    auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
+                                                stringConfigSerde
+    );
+
+    byte[] payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertNotEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(StringUtils.format(AuditManager.PAYLOAD_SKIP_MSG_FORMAT, maxPayloadSize), dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCreateAuditEntryWithPayloadUnderSkipPayloadLimit() throws IOException
+  {
+    SQLAuditManager auditManagerWithMaxPayloadSizeBytes = new SQLAuditManager(
+        connector,
+        derbyConnectorRule.metadataTablesConfigSupplier(),
+        new NoopServiceEmitter(),
+        mapper,
+        new SQLAuditManagerConfig()
+        {
+          @Override
+          public long getMaxPayloadSizeBytes()
+          {
+            return 500;
+          }
+        }
+    );
+    String entry1Key = "testKey";
+    String entry1Type = "testType";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    String entry1Payload = "payload audit to store";
+
+    auditManagerWithMaxPayloadSizeBytes.doAudit(entry1Key, entry1Type, entry1AuditInfo, entry1Payload,
+                                                stringConfigSerde
+    );
+
+    byte[] payload = connector.lookup(
+        derbyConnectorRule.metadataTablesConfigSupplier().get().getAuditTable(),
+        "audit_key",
+        "payload",
+        "testKey"
+    );
+
+    AuditEntry dbEntry = mapper.readValue(payload, AuditEntry.class);
+    Assert.assertEquals(entry1Key, dbEntry.getKey());
+    Assert.assertEquals(entry1Payload, dbEntry.getPayload());
+    Assert.assertEquals(entry1Type, dbEntry.getType());
+    Assert.assertEquals(entry1AuditInfo, dbEntry.getAuditInfo());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testCreateAuditEntryWithSkipNullConfigTrue()
+  {
+    ConfigSerde<Map<String, String>> mockConfigSerde = Mockito.mock(ConfigSerde.class);
+    SQLAuditManager auditManagerWithSkipNull = new SQLAuditManager(
+        connector,
+        derbyConnectorRule.metadataTablesConfigSupplier(),
+        new NoopServiceEmitter(),
+        mapper,
+        new SQLAuditManagerConfig()
+        {
+          @Override
+          public boolean isSkipNullField()
+          {
+            return true;
+          }
+        }
+    );
+
+    String entry1Key = "test1Key";
+    String entry1Type = "test1Type";
+    AuditInfo entry1AuditInfo = new AuditInfo(
+        "testAuthor",
+        "testComment",
+        "127.0.0.1"
+    );
+    // Entry 1's payload has a null value for one of its properties
+    Map<String, String> entryPayload1WithNull = new HashMap<>();
+    entryPayload1WithNull.put("version", "x");
+    entryPayload1WithNull.put("something", null);
+
+    auditManagerWithSkipNull.doAudit(entry1Key, entry1Type, entry1AuditInfo, entryPayload1WithNull, mockConfigSerde);
+    Mockito.verify(mockConfigSerde).serializeToString(ArgumentMatchers.eq(entryPayload1WithNull), ArgumentMatchers.eq(true));
+  }
+
   @After
   public void cleanup()
   {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org