You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2023/03/09 00:23:31 UTC

[druid] branch master updated: fix KafkaInputFormat when used with Sampler API (#13900)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c7f4bb5056 fix KafkaInputFormat when used with Sampler API (#13900)
c7f4bb5056 is described below

commit c7f4bb50563ec2ca64154c26bf3e6c427a2e1b91
Author: Clint Wylie <cw...@apache.org>
AuthorDate: Wed Mar 8 16:23:24 2023 -0800

    fix KafkaInputFormat when used with Sampler API (#13900)
    
    * fix KafkaInputFormat when used with Sampler API
    
    * handle key format sampling the same as value format sampling
---
 .../data/input/kafkainput/KafkaInputFormat.java    |   8 +-
 .../data/input/kafkainput/KafkaInputReader.java    | 161 ++++++++++++++++-----
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java | 111 +++++++++++++-
 3 files changed, 243 insertions(+), 37 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
index aad06bac7b..8f86ff07de 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java
@@ -81,7 +81,13 @@ public class KafkaInputFormat implements InputFormat
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    SettableByteEntity<KafkaRecordEntity> settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+    final SettableByteEntity<KafkaRecordEntity> settableByteEntitySource;
+    if (source instanceof SettableByteEntity) {
+      settableByteEntitySource = (SettableByteEntity<KafkaRecordEntity>) source;
+    } else {
+      settableByteEntitySource = new SettableByteEntity<>();
+      settableByteEntitySource.setEntity((KafkaRecordEntity) source);
+    }
     InputRowSchema newInputRowSchema = new InputRowSchema(
         dummyTimestampSpec,
         inputRowSchema.getDimensionsSpec(),
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
index 56e9421594..6e6ac8fa10 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java
@@ -89,6 +89,43 @@ public class KafkaInputReader implements InputEntityReader
   public CloseableIterator<InputRow> read() throws IOException
   {
     final KafkaRecordEntity record = source.getEntity();
+    final Map<String, Object> mergedHeaderMap = extractHeaderAndKeys(record);
+
+    // Ignore tombstone records that have null values.
+    if (record.getRecord().value() != null) {
+      return buildBlendedRows(valueParser, mergedHeaderMap);
+    } else {
+      return CloseableIterators.withEmptyBaggage(buildInputRowsForMap(mergedHeaderMap).iterator());
+    }
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
+  {
+    final KafkaRecordEntity record = source.getEntity();
+    InputRowListPlusRawValues keysAndHeader = extractHeaderAndKeysSample(record);
+    if (record.getRecord().value() != null) {
+      return buildBlendedRowsSample(valueParser, keysAndHeader.getRawValues());
+    } else {
+      final List<InputRowListPlusRawValues> rows = Collections.singletonList(keysAndHeader);
+      return CloseableIterators.withEmptyBaggage(rows.iterator());
+    }
+  }
+
+  private List<String> getFinalDimensionList(Set<String> newDimensions)
+  {
+    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
+    if (!schemaDimensions.isEmpty()) {
+      return schemaDimensions;
+    } else {
+      return Lists.newArrayList(
+          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
+      );
+    }
+  }
+
+  private Map<String, Object> extractHeader(KafkaRecordEntity record)
+  {
     final Map<String, Object> mergedHeaderMap = new HashMap<>();
     if (headerParserSupplier != null) {
       KafkaHeaderReader headerParser = headerParserSupplier.apply(record);
@@ -102,7 +139,13 @@ public class KafkaInputReader implements InputEntityReader
     // the header list
     mergedHeaderMap.putIfAbsent(timestampColumnName, record.getRecord().timestamp());
 
-    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+    return mergedHeaderMap;
+  }
+
+  private Map<String, Object> extractHeaderAndKeys(KafkaRecordEntity record) throws IOException
+  {
+    final Map<String, Object> mergedHeaderMap = extractHeader(record);
+    final InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
     if (keyParser != null) {
       try (CloseableIterator<InputRow> keyIterator = keyParser.read()) {
         // Key currently only takes the first row and ignores the rest.
@@ -123,31 +166,7 @@ public class KafkaInputReader implements InputEntityReader
         );
       }
     }
-
-    // Ignore tombstone records that have null values.
-    if (record.getRecord().value() != null) {
-      return buildBlendedRows(valueParser, mergedHeaderMap);
-    } else {
-      return buildRowsWithoutValuePayload(mergedHeaderMap);
-    }
-  }
-
-  @Override
-  public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
-  {
-    return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
-  }
-
-  private List<String> getFinalDimensionList(Set<String> newDimensions)
-  {
-    final List<String> schemaDimensions = inputRowSchema.getDimensionsSpec().getDimensionNames();
-    if (!schemaDimensions.isEmpty()) {
-      return schemaDimensions;
-    } else {
-      return Lists.newArrayList(
-          Sets.difference(newDimensions, inputRowSchema.getDimensionsSpec().getDimensionExclusions())
-      );
-    }
+    return mergedHeaderMap;
   }
 
   private CloseableIterator<InputRow> buildBlendedRows(
@@ -185,15 +204,91 @@ public class KafkaInputReader implements InputEntityReader
     );
   }
 
-  private CloseableIterator<InputRow> buildRowsWithoutValuePayload(Map<String, Object> headerKeyList)
+  private InputRowListPlusRawValues extractHeaderAndKeysSample(KafkaRecordEntity record) throws IOException
+  {
+    Map<String, Object> mergedHeaderMap = extractHeader(record);
+    InputEntityReader keyParser = (keyParserSupplier == null) ? null : keyParserSupplier.apply(record);
+    if (keyParser != null) {
+      try (CloseableIterator<InputRowListPlusRawValues> keyIterator = keyParser.sample()) {
+        // Key currently only takes the first row and ignores the rest.
+        if (keyIterator.hasNext()) {
+          // Return type for the key parser should be of type MapBasedInputRow
+          // Parsers returning other types are not compatible currently.
+          InputRowListPlusRawValues keyRow = keyIterator.next();
+          // Add the key to the mergeList only if the key string is not already present
+          mergedHeaderMap.putIfAbsent(
+              keyColumnName,
+              keyRow.getRawValues().entrySet().stream().findFirst().get().getValue()
+          );
+          return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+        }
+      }
+      catch (ClassCastException e) {
+        throw new IOException(
+            "Unsupported keyFormat. KafkaInputformat only supports input format that return MapBasedInputRow rows"
+        );
+      }
+    }
+    return InputRowListPlusRawValues.of(buildInputRowsForMap(mergedHeaderMap), mergedHeaderMap);
+  }
+
+  private CloseableIterator<InputRowListPlusRawValues> buildBlendedRowsSample(
+      InputEntityReader valueParser,
+      Map<String, Object> headerKeyList
+  ) throws IOException
+  {
+    return valueParser.sample().map(
+        rowAndValues -> {
+          if (rowAndValues.getParseException() != null) {
+            return rowAndValues;
+          }
+          List<InputRow> newInputRows = Lists.newArrayListWithCapacity(rowAndValues.getInputRows().size());
+          List<Map<String, Object>> newRawRows = Lists.newArrayListWithCapacity(rowAndValues.getRawValues().size());
+          ParseException parseException = null;
+
+          for (Map<String, Object> raw : rowAndValues.getRawValuesList()) {
+            newRawRows.add(buildBlendedEventMap(raw, headerKeyList));
+          }
+          for (InputRow r : rowAndValues.getInputRows()) {
+            MapBasedInputRow valueRow = null;
+            try {
+              valueRow = (MapBasedInputRow) r;
+            }
+            catch (ClassCastException e) {
+              parseException = new ParseException(
+                  null,
+                  "Unsupported input format in valueFormat. KafkaInputFormat only supports input format that return MapBasedInputRow rows"
+              );
+            }
+            if (valueRow != null) {
+              final Map<String, Object> event = buildBlendedEventMap(valueRow.getEvent(), headerKeyList);
+              final HashSet<String> newDimensions = new HashSet<>(valueRow.getDimensions());
+              newDimensions.addAll(headerKeyList.keySet());
+              // Remove the dummy timestamp added in KafkaInputFormat
+              newDimensions.remove(KafkaInputFormat.DEFAULT_AUTO_TIMESTAMP_STRING);
+              newInputRows.add(
+                  new MapBasedInputRow(
+                      inputRowSchema.getTimestampSpec().extractTimestamp(event),
+                      getFinalDimensionList(newDimensions),
+                      event
+                  )
+              );
+            }
+          }
+          return InputRowListPlusRawValues.ofList(newRawRows, newInputRows, parseException);
+        }
+    );
+  }
+  
+  private List<InputRow> buildInputRowsForMap(Map<String, Object> headerKeyList)
   {
-    final InputRow row = new MapBasedInputRow(
-        inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
-        getFinalDimensionList(headerKeyList.keySet()),
-        headerKeyList
+    return Collections.singletonList(
+        new MapBasedInputRow(
+            inputRowSchema.getTimestampSpec().extractTimestamp(headerKeyList),
+            getFinalDimensionList(headerKeyList.keySet()),
+            headerKeyList
+        )
     );
-    final List<InputRow> rows = Collections.singletonList(row);
-    return CloseableIterators.withEmptyBaggage(rows.iterator());
   }
 
   /**
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 47327dbeee..242e1fa4a1 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -34,6 +34,7 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
 import org.apache.druid.data.input.impl.StringInputRowParser;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
 import org.apache.druid.indexing.kafka.test.TestBroker;
@@ -94,6 +95,26 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
       null
   );
 
+  private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP = new DataSchema(
+      "test_ds",
+      new TimestampSpec("kafka.timestamp", "iso", null),
+      new DimensionsSpec(
+          Arrays.asList(
+              new StringDimensionSchema("dim1"),
+              new StringDimensionSchema("dim1t"),
+              new StringDimensionSchema("dim2"),
+              new LongDimensionSchema("dimLong"),
+              new FloatDimensionSchema("dimFloat")
+          )
+      ),
+      new AggregatorFactory[]{
+          new DoubleSumAggregatorFactory("met1sum", "met1"),
+          new CountAggregatorFactory("rows")
+      },
+      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+      null
+  );
+
   private static TestingCluster zkServer;
   private static TestBroker kafkaServer;
 
@@ -126,7 +147,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
     zkServer.stop();
   }
 
-  @Test(timeout = 30_000L)
+  @Test
   public void testSample()
   {
     insertData(generateRecords(TOPIC));
@@ -169,7 +190,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null, null, null),
+        new SamplerConfig(5, 5_000, null, null),
         new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
@@ -177,6 +198,90 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
     runSamplerAndCompareResponse(samplerSpec, true);
   }
 
+  @Test
+  public void testSampleKafkaInputFormat()
+  {
+    insertData(generateRecords(TOPIC));
+
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+        null,
+        DATA_SCHEMA_KAFKA_TIMESTAMP,
+        null,
+        new KafkaSupervisorIOConfig(
+            TOPIC,
+            new KafkaInputFormat(
+                null,
+                null,
+                new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
+                null,
+                null,
+                null
+            ),
+
+            null,
+            null,
+            null,
+            kafkaServer.consumerProperties(),
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, 5_000, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
+        OBJECT_MAPPER
+    );
+
+    SamplerResponse response = samplerSpec.sample();
+
+    Assert.assertEquals(5, response.getNumRowsRead());
+    // we can parse an extra row compared to other generated data samples because we are using kafka timestamp
+    // for timestamp
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(5, response.getData().size());
+
+    Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();
+
+    SamplerResponse.SamplerResponseRow nextRow;
+    Map<String, Object> rawInput;
+    Map<String, Object> parsedInput;
+
+    for (int i = 0; i < 4; i++) {
+      nextRow = it.next();
+      Assert.assertNull(nextRow.isUnparseable());
+      rawInput = nextRow.getInput();
+      parsedInput = nextRow.getParsed();
+      Assert.assertTrue(rawInput.containsKey("kafka.timestamp"));
+      Assert.assertEquals(rawInput.get("kafka.timestamp"), parsedInput.get("__time"));
+    }
+    nextRow = it.next();
+    Assert.assertTrue(nextRow.isUnparseable());
+
+    Assert.assertFalse(it.hasNext());
+  }
+
   @Test
   public void testWithInputRowParser() throws IOException
   {
@@ -245,7 +350,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null, null, null),
+        new SamplerConfig(5, 5_000, null, null),
         new InputSourceSampler(new DefaultObjectMapper()),
         OBJECT_MAPPER
     );


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org