Posted to commits@druid.apache.org by gi...@apache.org on 2023/03/09 00:23:31 UTC
[druid] branch master updated: fix KafkaInputFormat when used with Sampler API (#13900)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c7f4bb5056 fix KafkaInputFormat when used with Sampler API (#13900)
c7f4bb5056 is described below
commit c7f4bb50563ec2ca64154c26bf3e6c427a2e1b91
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Mar 8 16:23:24 2023 -0800
fix KafkaInputFormat when used with Sampler API (#13900)
* fix KafkaInputFormat when used with Sampler API
* handle key format sampling the same as value format sampling
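For readers skimming the diff: streaming ingestion hands createReader a SettableByteEntity<KafkaRecordEntity>, while the Sampler API passes the bare KafkaRecordEntity, so the old unconditional cast threw a ClassCastException before any rows were sampled. A minimal sketch of the two call paths (format, schema, and tmpDir are hypothetical stand-ins, not names from this patch):

    // Streaming ingestion path: the task reuses one settable entity across records.
    SettableByteEntity<KafkaRecordEntity> settable = new SettableByteEntity<>();
    settable.setEntity(record); // record is a KafkaRecordEntity
    InputEntityReader streaming = format.createReader(schema, settable, tmpDir); // worked before this fix

    // Sampler path: the record entity arrives unwrapped.
    InputEntityReader sampling = format.createReader(schema, record, tmpDir);
    // Before this commit the cast to SettableByteEntity failed here; the first
    // hunk below makes createReader wrap the bare entity itself.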
---
.../data/input/kafkainput/KafkaInputFormat.java | 8 +-
.../data/input/kafkainput/KafkaInputReader.java | 161 ++++++++++++++++-----
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 111 +++++++++++++-
3 files changed, 243 insertions(+), 37 deletions(-)
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index aad06bac7b..8f86ff07de 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -81,7 +81,13 @@ public class KafkaInputFormat implements InputFormat
@Override
public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
{
- SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+ final SettableByteEntity<KafkaRecordEntity> settableByteEntitySource;
+ if (source instanceof SettableByteEntity) {
+ settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+ } else {
+ settableByteEntitySource = new SettableByteEntity<>();
+ settableByteEntitySource.setEntity((KafkaRecordEntity) source);
+ }
InputRowSchema newInputRowSchema = new InputRowSchema(
dummyTimestampSpec,
inputRowSchema.getDimensionsSpec(),
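The trailing context above also shows why the inner value reader is built with dummyTimestampSpec: the configured timestamp column (kafka.timestamp in the tests below) may come from record metadata rather than the value payload, so the real spec can only run after blending. That happens in KafkaInputReader, as in this step repeated from a later hunk:

    // The real timestampSpec is applied to the blended event, not the raw value:
    new MapBasedInputRow(
        inputRowSchema.getTimestampSpec().extractTimestamp(event),
        getFinalDimensionList(newDimensions),
        event
    );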
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 56e9421594..6e6ac8fa10 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -89,6 +89,43 @@ public class KafkaInputReader implements InputEntityReader
public CloseableIterator<InputRow> read() throws IOException
{
final KafkaRecordEntity record = source.getEntity();
+ final Map<String, Object> mergedHeaderMap = extractHeaderAndKeys(record);
+
+ // Ignore tombstone records that have null values.
+ if (record.getRecord().value() != null) {
+ return buildBlendedRows(valueParser, mergedHeaderMap);
+ } else {
+ return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+ }
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+ {
+ final KafkaRecordEntity record = source.getEntity();
+ InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record);
+ if (record.getRecord().value() != null) {
+ return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+ } else {
+ final List<InputRowListPlusRawValues> rows = Collections.singletonList(keysAndHeader);
+ return CloseableIterators.withEmptyBaggage(rows.iterator());
+ }
+ }
+
+ private List<String> getFinalDimensionList(Set<String> newDimensions)
+ {
+ final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
+ if (!schemaDimensions.isEmpty()) {
+ return schemaDimensions;
+ } else {
+ return Lists.newArrayList(
+ Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
+ );
+ }
+ }
+
+ private Map<String, Object> extractHeader(KafkaRecordEntity record)
+ {
final Map<String, Object> mergedHeaderMap = new HashMap<>();
if (headerParserSupplier != null) {
KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -102,7 +139,13 @@ public class KafkaInputReader implements InputEntityReader
// the header list
mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
- InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+ return mergedHeaderMap;
+ }
+
+ private Map<String, Object> extractHeaderAndKeys(KafkaRecordEntity record) throws IOException
+ {
+ final Map<String, Object> mergedHeaderMap = extractHeader(record);
+ final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
if (keyParser != null) {
try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
// Key currently only takes the first row and ignores the rest.
@@ -123,31 +166,7 @@ public class KafkaInputReader implements InputEntityReader
);
}
}
-
- // Ignore tombstone records that have null values.
- if (record.getRecord().value() != null) {
- return buildBlendedRows(valueParser, mergedHeaderMap);
- } else {
- return buildRowsWithoutValuePayload(mergedHeaderMap);
- }
- }
-
- @Override
- public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
- {
- return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
- }
-
- private List<String> getFinalDimensionList(Set<String> newDimensions)
- {
- final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
- if (!schemaDimensions.isEmpty()) {
- return schemaDimensions;
- } else {
- return Lists.newArrayList(
- Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
- );
- }
+ return mergedHeaderMap;
}
private CloseableIterator<InputRow> buildBlendedRows(
@@ -185,15 +204,91 @@ public class KafkaInputReader implements InputEntityReader
);
}
- private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
+ private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException
+ {
+ Map<String, Object> mergedHeaderMap = extractHeader(record);
+ InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+ if (keyParser != null) {
+ try (CloseableIterator<InputRowListPlusRawValues> keyIterator = keyParser.sample()) {
+ // Key currently only takes the first row and ignores the rest.
+ if (keyIterator.hasNext()) {
+ // Return type for the key parser should be of type MapBasedInputRow
+ // Parsers returning other types are not compatible currently.
+ InputRowListPlusRawValues keyRow = keyIterator.next();
+ // Add the key to the mergeList only if the key string is not already present
+ mergedHeaderMap.putIfAbsent(
+ keyColumnName,
+ keyRow.getRawValues().entrySet().stream().findFirst().get().getValue()
+ );
+ return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+ }
+ }
+ catch (ClassCastException e) {
+ throw new IOException(
+ "Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
+ );
+ }
+ }
+ return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+ }
+
+ private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
+ InputEntityReader valueParser,
+ Map<String, Object> headerKeyList
+ ) throws IOException
+ {
+ return valueParser.sample().map(
+ rowAndValues -> {
+ if (rowAndValues.getParseException() != null) {
+ return rowAndValues;
+ }
+ List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
+ List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
+ ParseException parseException = null;
+
+ for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
+ newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
+ }
+ for (InputRow r : rowAndValues.getInputRows()) {
+ MapBasedInputRow valueRow = null;
+ try {
+ valueRow = (MapBasedInputRow) r;
+ }
+ catch (ClassCastException e) {
+ parseException = new ParseException(
+ null,
+ "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
+ );
+ }
+ if (valueRow != null) {
+ final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
+ final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+ newDimensions.addAll(headerKeyList.keySet());
+ // Remove the dummy timestamp added in KafkaInputFormat
+ newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+ newInputRows.add(
+ new MapBasedInputRow(
+ inputRowSchema.getTimestampSpec().extractTimestamp(event),
+ getFinalDimensionList(newDimensions),
+ event
+ )
+ );
+ }
+ }
+ return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
+ }
+ );
+ }
+
+ private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
{
- final InputRow row = new MapBasedInputRow(
- inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
- getFinalDimensionList(headerKeyList.keySet()),
- headerKeyList
+ return Collections.singletonList(
+ new MapBasedInputRow(
+ inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
+ getFinalDimensionList(headerKeyList.keySet()),
+ headerKeyList
+ )
);
- final List<InputRow> rows = Collections.singletonList(row);
- return CloseableIterators.withEmptyBaggage(rows.iterator());
}
/**
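buildBlendedEventMap's body sits outside these hunks; judging from its call sites above (parsed value map first, header/key map second) and the putIfAbsent semantics used elsewhere in the reader, a functionally equivalent sketch, assuming the value payload wins on key collisions, is:

    import java.util.HashMap;
    import java.util.Map;

    final class BlendSketch
    {
      // Hedged stand-in for KafkaInputReader#buildBlendedEventMap: overlay the
      // parsed value payload (primary) on the header/key/timestamp map (fallback),
      // keeping the primary value on collisions. The committed implementation may
      // return a lazy map view rather than a copy.
      static Map<String, Object> buildBlendedEventMap(Map<String, Object> primary, Map<String, Object> fallback)
      {
        final Map<String, Object> merged = new HashMap<>(fallback);
        merged.putAll(primary);
        return merged;
      }
    }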
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 47327dbeee..242e1fa4a1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null
);
+ private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
+ "test_ds",
+ new TimestampSpec("kafka.timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ )
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+ null
+ );
+
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
@@ -126,7 +147,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
zkServer.stop();
}
- @Test(timeout = 30_000L)
+ @Test
public void testSample()
{
insertData(generateRecords(TOPIC));
@@ -169,7 +190,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
- new SamplerConfig(5, null, null, null),
+ new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(OBJECT_MAPPER),
OBJECT_MAPPER
);
@@ -177,6 +198,90 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
runSamplerAndCompareResponse(samplerSpec, true);
}
+ @Test
+ public void testSampleKafkaInputFormat()
+ {
+ insertData(generateRecords(TOPIC));
+
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+ null,
+ DATA_SCHEMA_KAFKA_TIMESTAMP,
+ null,
+ new KafkaSupervisorIOConfig(
+ TOPIC,
+ new KafkaInputFormat(
+ null,
+ null,
+ new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+ null,
+ null,
+ null
+ ),
+
+ null,
+ null,
+ null,
+ kafkaServer.consumerProperties(),
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+ supervisorSpec,
+ new SamplerConfig(5, 5_000, null, null),
+ new InputSourceSampler(OBJECT_MAPPER),
+ OBJECT_MAPPER
+ );
+
+ SamplerResponse response = samplerSpec.sample();
+
+ Assert.assertEquals(5, response.getNumRowsRead());
+ // we can parse an extra row compared to other generated data samples because we are using kafka timestamp
+ // for timestamp
+ Assert.assertEquals(4, response.getNumRowsIndexed());
+ Assert.assertEquals(5, response.getData().size());
+
+ Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();
+
+ SamplerResponse.SamplerResponseRow nextRow;
+ Map<String, Object> rawInput;
+ Map<String, Object> parsedInput;
+
+ for (int i = 0; i < 4; i++) {
+ nextRow = it.next();
+ Assert.assertNull(nextRow.isUnparseable());
+ rawInput = nextRow.getInput();
+ parsedInput = nextRow.getParsed();
+ Assert.assertTrue(rawInput.containsKey("kafka.timestamp"));
+ Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));
+ }
+ nextRow = it.next();
+ Assert.assertTrue(nextRow.isUnparseable());
+
+ Assert.assertFalse(it.hasNext());
+ }
+
@Test
public void testWithInputRowParser() throws IOException
{
@@ -245,7 +350,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
- new SamplerConfig(5, null, null, null),
+ new SamplerConfig(5, 5_000, null, null),
new InputSourceSampler(new DefaultObjectMapper()),
OBJECT_MAPPER
);
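To make the new test's assertions concrete: each sampled row's raw input is the blended event, so the kafka.timestamp field (plus kafka.key and kafka.header.* columns when key and header formats are configured; those names are the documented defaults, not something this patch changes) sits alongside the value fields, and the parsed __time can be compared against it directly. Illustrative shape, with made-up values:

    // rawInput    = { "kafka.timestamp": 1677282000000L, "dim1": "a", "dimLong": 10L, ... }
    // parsedInput = { "__time": 1677282000000L, "dim1": "a", "dimLong": 10L, ... }
    Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));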