You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by fr...@apache.org on 2021/12/03 13:34:13 UTC
[druid] branch master updated: Improve exception message when loading data from web-console (#11723)
This is an automated email from the ASF dual-hosted git repository.
frankchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c2cea25 Improve exception message when loading data from web-console (#11723)
c2cea25 is described below
commit c2cea25a6b4f1d1a85692d83e92d277fba79f673
Author: Frank Chen <fr...@outlook.com>
AuthorDate: Fri Dec 3 21:33:49 2021 +0800
Improve exception message when loading data from web-console (#11723)
* Improve exception handling
* Revert some changes
* Resolve comments
* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
Co-authored-by: Karan Kumar <ka...@gmail.com>
* Update indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
Co-authored-by: Karan Kumar <ka...@gmail.com>
* Address review comments
Co-authored-by: Karan Kumar <ka...@gmail.com>
---
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 58 ++++++++++++++++++++++
.../overlord/sampler/SamplerException.java | 3 ++
.../overlord/sampler/SamplerExceptionMapper.java | 10 +++-
.../seekablestream/SeekableStreamSamplerSpec.java | 13 ++++-
4 files changed, 82 insertions(+), 2 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index dd712d9..4420d2f 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -35,6 +35,7 @@ import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@@ -51,7 +52,9 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Arrays;
import java.util.Iterator;
@@ -59,6 +62,9 @@ import java.util.List;
public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
{
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private static final String TOPIC = "sampling";
private static final DataSchema DATA_SCHEMA = new DataSchema(
@@ -288,4 +294,56 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
throw new RuntimeException(e);
}
}
+
+ @Test
+ public void testInvalidKafkaConfig()
+ {
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+ null,
+ DATA_SCHEMA,
+ null,
+ new KafkaSupervisorIOConfig(
+ TOPIC,
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
+ null,
+ null,
+ null,
+
+ // invalid bootstrap server
+ ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
+
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+ supervisorSpec,
+ new SamplerConfig(5, null),
+ new InputSourceSampler(),
+ OBJECT_MAPPER
+ );
+
+ expectedException.expect(SamplerException.class);
+ expectedException.expectMessage("Invalid url in bootstrap.servers");
+ samplerSpec.sample();
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
index b7937b8..e1f903c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerException.java
@@ -21,6 +21,9 @@ package org.apache.druid.indexing.overlord.sampler;
import org.apache.druid.java.util.common.StringUtils;
+/**
+ * This exception will be mapped to a JSON object that will be returned to the client by {@link SamplerExceptionMapper}
+ */
public class SamplerException extends RuntimeException
{
public SamplerException(Throwable cause, String formatText, Object... arguments)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
index d35e5ab..bade3e5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerExceptionMapper.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.sampler;
import com.google.common.collect.ImmutableMap;
+import org.apache.druid.java.util.common.logger.Logger;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
@@ -28,13 +29,20 @@ import javax.ws.rs.ext.Provider;
@Provider
public class SamplerExceptionMapper implements ExceptionMapper<SamplerException>
{
+ private static final Logger LOG = new Logger(SamplerExceptionMapper.class);
+
@Override
public Response toResponse(SamplerException exception)
{
+ String message = exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage();
+
+ // Logging the stack trace and returning the exception message in the response
+ LOG.error(exception, message);
+
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of(
"error",
- exception.getMessage() == null ? "The sampler encountered an issue" : exception.getMessage()
+ message
))
.build();
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 106098b..3be7a4f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.seekablestream;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerSpec;
import org.apache.druid.data.input.ByteBufferInputRowParser;
@@ -38,6 +39,7 @@ import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
@@ -92,9 +94,18 @@ public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetT
);
inputFormat = null;
} else {
+ RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType> recordSupplier;
+
+ try {
+ recordSupplier = createRecordSupplier();
+ }
+ catch (Exception e) {
+ throw new SamplerException(e, "Unable to create RecordSupplier: %s", Throwables.getRootCause(e).getMessage());
+ }
+
inputSource = new RecordSupplierInputSource<>(
ioConfig.getStream(),
- createRecordSupplier(),
+ recordSupplier,
ioConfig.isUseEarliestSequenceNumber()
);
inputFormat = Preconditions.checkNotNull(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org