You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2021/12/03 13:34:13 UTC

[druid] branch master updated: Improve exception message when loading data from web-console (#11723)

This is an automated email from the ASF dual-hosted git repository.

frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c2cea25  Improve exception message when loading data from web-console (#11723)
c2cea25 is described below

commit c2cea25a6b4f1d1a85692d83e92d277fba79f673
Author: Frank Chen <fr...@outlook.com>
AuthorDate: Fri Dec 3 21:33:49 2021 +0800

    Improve exception message when loading data from web-console (#11723)
    
    * Improve exception handling
    
    * Revert some changes
    
    * Resolve comments
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
    
    Co-authored-by: Karan Kumar <ka...@gmail.com>
    
    * Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
    
    Co-authored-by: Karan Kumar <ka...@gmail.com>
    
    * Address review comments
    
    Co-authored-by: Karan Kumar <ka...@gmail.com>
---
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java | 58 ++++++++++++++++++++++
 .../overlord/sampler/SamplerException.java         |  3 ++
 .../overlord/sampler/SamplerExceptionMapper.java   | 10 +++-
 .../seekablestream/SeekableStreamSamplerSpec.java  | 13 ++++-
 4 files changed, 82 insertions(+), 2 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dd712d9..4420d2f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.test.TestBroker;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Arrays;
 import java.util.Iterator;
@@ -59,6 +62,9 @@ import java.util.List;
 
 public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 {
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
   private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
   private static final String TOPIC = "sampling";
   private static final DataSchema DATA_SCHEMA = new DataSchema(
@@ -288,4 +294,56 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
       throw new RuntimeException(e);
     }
   }
+
+  @Test
+  public void testInvalidKafkaConfig()
+  {
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+        null,
+        DATA_SCHEMA,
+        null,
+        new KafkaSupervisorIOConfig(
+            TOPIC,
+            new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+            null,
+            null,
+            null,
+
+            // invalid bootstrap server
+            ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
+
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null),
+        new InputSourceSampler(),
+        OBJECT_MAPPER
+    );
+
+    expectedException.expect(SamplerException.class);
+    expectedException.expectMessage("Invalid url in bootstrap.servers");
+    samplerSpec.sample();
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
index b7937b8..e1f903c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
@@ -21,6 +21,9 @@ package org.apache.druid.indexing.overlord.sampler;
 
 import org.apache.druid.java.util.common.StringUtils;
 
+/**
+ * This exception will be mapped to a JSON object that will be returned to the client by {@link SamplerExceptionMapper}
+ */
 public class SamplerException extends RuntimeException
 {
   public SamplerException(Throwable cause, String formatText, Object... arguments)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
index d35e5ab..bade3e5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord.sampler;
 
 import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.logger.Logger;
 
 import javax.ws.rs.core.Response;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -28,13 +29,20 @@ import javax.ws.rs.ext.Provider;
 @Provider
 public class SamplerExceptionMapper implements ExceptionMapper<SamplerException>
 {
+  private static final Logger LOG = new Logger(SamplerExceptionMapper.class);
+
   @Override
   public Response toResponse(SamplerException exception)
   {
+    String message = exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage();
+
+    // Logging the stack trace and returning the exception message in the response
+    LOG.error(exception, message);
+
     return Response.status(Response.Status.BAD_REQUEST)
                    .entity(ImmutableMap.of(
                        "error",
-                       exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage()
+                       message
                    ))
                    .build();
   }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 106098b..3be7a4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.seekablestream;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import org.apache.druid.client.indexing.SamplerResponse;
 import org.apache.druid.client.indexing.SamplerSpec;
 import org.apache.druid.data.input.ByteBufferInputRowParser;
@@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
 import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
 import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
@@ -92,9 +94,18 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
       );
       inputFormat = null;
     } else {
+      RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
+
+      try {
+        recordSupplier = createRecordSupplier();
+      }
+      catch (Exception e) {
+        throw new SamplerException(e, "Unable to create RecordSupplier: %s", Throwables.getRootCause(e).getMessage());
+      }
+
       inputSource = new RecordSupplierInputSource<>(
           ioConfig.getStream(),
-          createRecordSupplier(),
+          recordSupplier,
           ioConfig.isUseEarliestSequenceNumber()
       );
       inputFormat = Preconditions.checkNotNull(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org