You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@drill.apache.org by GitBox <gi...@apache.org> on 2021/05/26 21:55:40 UTC

[GitHub] [drill] vvysotskyi opened a new pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

vvysotskyi opened a new pull request #2239:
URL: https://github.com/apache/drill/pull/2239


   # [DRILL-5940](https://issues.apache.org/jira/browse/DRILL-5940): Add support for Avro messages with schema registry for Kafka
   
   ## Description
   - Refactored `KafkaRecordReader` to use EVF.
   - Updated `JsonMessageReader` to use V2 JSON reader
   - Added `AvroMessageReader` for processing messages serialized as Avro records and added schema registry support.
   
   To be able to deserialize Kafka messages of Avro type, schema registry URL should be specified in the Kafka storage plugin `kafkaConsumerProps` (and other schema registry options if required):
   ``` 
   "schema.registry.url": "http://localhost:8081"
   ```
   After that, `store.kafka.record.reader` session option should be set to `org.apache.drill.exec.store.kafka.decoders.AvroMessageReader`.
   
   ## Documentation
   NA
   
   ## Testing
   Added UT that reads Avro messages.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2239:
URL: https://github.com/apache/drill/pull/2239#issuecomment-849782381


   @vvysotskyi  I approved this PR.  We should probably update the docs as well. @dzamo Can I tag you for that effort?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] luocooong commented on pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
luocooong commented on pull request #2239:
URL: https://github.com/apache/drill/pull/2239#issuecomment-850004683


   A user asked Support confluent questions yesterday, update the docs for better. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi merged pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
vvysotskyi merged pull request #2239:
URL: https://github.com/apache/drill/pull/2239


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] vvysotskyi commented on a change in pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
vvysotskyi commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640779940



##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
##########
@@ -47,6 +47,13 @@
    */
   public boolean skipMalformedRecords;
 
+  /**
+   * This property works only when {@link #skipMalformedRecords} enabled.
+   * If true, {@link TokenIterator.RecoverableJsonException} will be populated for the case of
+   * malformed empty document, so it will be possible to handle this exception by caller.
+   */
+  public boolean skipMalformedDocument;

Review comment:
       There is no option for this. It is for internal usage only.

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
##########
@@ -55,61 +47,78 @@
 public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
-  private JsonReader jsonReader;
-  private VectorContainerWriter writer;
-  private ObjectMapper objectMapper;
+
+  private final SingleElementIterator<InputStream> stream = new SingleElementIterator<>();
+
+  private KafkaJsonLoader kafkaJsonLoader;
+  private ResultSetLoader resultSetLoader;
+  private SchemaNegotiator negotiator;
+  private ReadOptions readOptions;
+  private Properties kafkaConsumerProps;
 
   @Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
-    // set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files.
-    this.jsonReader = new JsonReader.Builder(buf)
-      .schemaPathColumns(columns)
-      .allTextMode(readOptions.isAllTextMode())
-      .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
-      .enableNanInf(readOptions.isAllowNanInf())
-      .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
-      .build();
-    jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
-    this.writer = writer;
-    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
-      .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf())
-      .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
+    this.negotiator = negotiator;
+    this.resultSetLoader = negotiator.build();
+    this.readOptions = readOptions;
+    this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
   }
 
   @Override
-  public boolean readMessage(ConsumerRecord<?, ?> record) {
+  public void readMessage(ConsumerRecord<?, ?> record) {
     byte[] recordArray = (byte[]) record.value();
-    String data = new String(recordArray, Charsets.UTF_8);
     try {
-      JsonNode jsonNode = objectMapper.readTree(data);
-      if (jsonNode != null && jsonNode.isObject()) {
-        ObjectNode objectNode = (ObjectNode) jsonNode;
-        objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
-        objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
-        objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
-        objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
-      } else {
-        throw new IOException("Unsupported node type: " + (jsonNode == null ? "NO CONTENT" : jsonNode.getNodeType()));
+      parseAndWrite(record, recordArray);
+    } catch (TokenIterator.RecoverableJsonException e) {
+      if (!readOptions.isSkipInvalidRecords()) {
+        throw e;

Review comment:
       Thanks, done

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();

Review comment:
       Yes, it works slightly differently, `rowWriter.start()` is called in `MessageReader.readMessage()` method after all checks below for batch size are passed.

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
       }
+    }
+    return messageReader.endBatch();
+  }
 
-      if (currentMessageCount > 0) {
-        messageReader.ensureAtLeastOneField();
-      }
-      writer.setValueCount(currentMessageCount);
-      if (watch != null) {
-        logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
-      }
-      logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
-          currentOffset);
-      return currentMessageCount;
-    } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Record reader was at record: " + (currentMessageCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .addContext(e.getMessage())
-        .build(logger);
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+      return false;
     }
+    ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+    currentOffset = consumerRecord.offset();
+    messageReader.readMessage(consumerRecord);
+    return true;
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     logger.debug("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         currentOffset);
     logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
     plugin.registerToClose(msgItr);
-    messageReader.close();
+    try {
+      messageReader.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+    }
   }
 
   @Override
   public String toString() {
     return "KafkaRecordReader[readOptions=" + readOptions

Review comment:
       Thanks, used it here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
cgivre commented on pull request #2239:
URL: https://github.com/apache/drill/pull/2239#issuecomment-849219386


   @vvysotskyi  Thanks for this PR!   Does this PR address the single topic issue with the Kafka Plugin?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640787712



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();

Review comment:
       Got it.  Thanks!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [drill] cgivre commented on a change in pull request #2239: DRILL-5940: Add support for Avro messages with schema registry for Kafka

Posted by GitBox <gi...@apache.org>.
cgivre commented on a change in pull request #2239:
URL: https://github.com/apache/drill/pull/2239#discussion_r640210763



##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/decoders/JsonMessageReader.java
##########
@@ -55,61 +47,78 @@
 public class JsonMessageReader implements MessageReader {
 
   private static final Logger logger = LoggerFactory.getLogger(JsonMessageReader.class);
-  private JsonReader jsonReader;
-  private VectorContainerWriter writer;
-  private ObjectMapper objectMapper;
+
+  private final SingleElementIterator<InputStream> stream = new SingleElementIterator<>();
+
+  private KafkaJsonLoader kafkaJsonLoader;
+  private ResultSetLoader resultSetLoader;
+  private SchemaNegotiator negotiator;
+  private ReadOptions readOptions;
+  private Properties kafkaConsumerProps;
 
   @Override
-  public void init(DrillBuf buf, List<SchemaPath> columns, VectorContainerWriter writer, ReadOptions readOptions) {
-    // set skipOuterList to false as it doesn't applicable for JSON records and it's only applicable for JSON files.
-    this.jsonReader = new JsonReader.Builder(buf)
-      .schemaPathColumns(columns)
-      .allTextMode(readOptions.isAllTextMode())
-      .readNumbersAsDouble(readOptions.isReadNumbersAsDouble())
-      .enableNanInf(readOptions.isAllowNanInf())
-      .enableEscapeAnyChar(readOptions.isAllowEscapeAnyChar())
-      .build();
-    jsonReader.setIgnoreJSONParseErrors(readOptions.isSkipInvalidRecords());
-    this.writer = writer;
-    this.objectMapper = BaseJsonProcessor.getDefaultMapper()
-      .configure(JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS, readOptions.isAllowNanInf())
-      .configure(JsonParser.Feature.ALLOW_BACKSLASH_ESCAPING_ANY_CHARACTER, readOptions.isAllowEscapeAnyChar());
+  public void init(SchemaNegotiator negotiator, ReadOptions readOptions, KafkaStoragePlugin plugin) {
+    this.negotiator = negotiator;
+    this.resultSetLoader = negotiator.build();
+    this.readOptions = readOptions;
+    this.kafkaConsumerProps = plugin.getConfig().getKafkaConsumerProps();
   }
 
   @Override
-  public boolean readMessage(ConsumerRecord<?, ?> record) {
+  public void readMessage(ConsumerRecord<?, ?> record) {
     byte[] recordArray = (byte[]) record.value();
-    String data = new String(recordArray, Charsets.UTF_8);
     try {
-      JsonNode jsonNode = objectMapper.readTree(data);
-      if (jsonNode != null && jsonNode.isObject()) {
-        ObjectNode objectNode = (ObjectNode) jsonNode;
-        objectNode.put(KAFKA_TOPIC.getFieldName(), record.topic());
-        objectNode.put(KAFKA_PARTITION_ID.getFieldName(), record.partition());
-        objectNode.put(KAFKA_OFFSET.getFieldName(), record.offset());
-        objectNode.put(KAFKA_TIMESTAMP.getFieldName(), record.timestamp());
-        objectNode.put(KAFKA_MSG_KEY.getFieldName(), record.key() != null ? record.key().toString() : null);
-      } else {
-        throw new IOException("Unsupported node type: " + (jsonNode == null ? "NO CONTENT" : jsonNode.getNodeType()));
+      parseAndWrite(record, recordArray);
+    } catch (TokenIterator.RecoverableJsonException e) {
+      if (!readOptions.isSkipInvalidRecords()) {
+        throw e;

Review comment:
       Do you think we should throw a `UserException` here with an explanation and `errorContext`?

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();

Review comment:
       @vvysotskyi 
   Does this line belong in the `setup()` method?  In the Splunk plugin for example:
   
   https://github.com/apache/drill/blob/bc53c8e6c4a24b7cbce2112690a7d3e77e55fe41/contrib/storage-splunk/src/main/java/org/apache/drill/exec/store/splunk/SplunkBatchReader.java#L122-L129
   
   Then in each iteration of the `next()` method it calls `rowWriter.start()`.  Or does this work a little differently?
   

##########
File path: exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/parser/JsonStructureOptions.java
##########
@@ -47,6 +47,13 @@
    */
   public boolean skipMalformedRecords;
 
+  /**
+   * This property works only when {@link #skipMalformedRecords} enabled.
+   * If true, {@link TokenIterator.RecoverableJsonException} will be populated for the case of
+   * malformed empty document, so it will be possible to handle this exception by caller.
+   */
+  public boolean skipMalformedDocument;

Review comment:
       Is there a system option for this?  Also is this documented anywhere?

##########
File path: contrib/storage-kafka/src/main/java/org/apache/drill/exec/store/kafka/KafkaRecordReader.java
##########
@@ -17,131 +17,105 @@
  */
 package org.apache.drill.exec.store.kafka;
 
-import java.io.IOException;
-import java.util.Collection;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.drill.common.exceptions.ChildErrorContext;
+import org.apache.drill.common.exceptions.CustomErrorContext;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.kafka.decoders.MessageReader;
 import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactory;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.io.IOException;
 
-public class KafkaRecordReader extends AbstractRecordReader {
+public class KafkaRecordReader implements ManagedReader<SchemaNegotiator> {
   private static final Logger logger = LoggerFactory.getLogger(KafkaRecordReader.class);
 
-  private static final long DEFAULT_MESSAGES_PER_BATCH = 4000;
-
   private final ReadOptions readOptions;
   private final KafkaStoragePlugin plugin;
   private final KafkaPartitionScanSpec subScanSpec;
+  private final int maxRecords;
 
-  private VectorContainerWriter writer;
   private MessageReader messageReader;
-
   private long currentOffset;
   private MessageIterator msgItr;
-  private int currentMessageCount;
 
-  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, List<SchemaPath> projectedColumns,
-      FragmentContext context, KafkaStoragePlugin plugin) {
-    setColumns(projectedColumns);
-    this.readOptions = new ReadOptions(context.getOptions());
+  public KafkaRecordReader(KafkaPartitionScanSpec subScanSpec, OptionManager options, KafkaStoragePlugin plugin, int maxRecords) {
+    this.readOptions = new ReadOptions(options);
     this.plugin = plugin;
     this.subScanSpec = subScanSpec;
+    this.maxRecords = maxRecords;
   }
 
   @Override
-  protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projectedColumns) {
-    Set<SchemaPath> transformed = new LinkedHashSet<>();
-    if (isStarQuery()) {
-      transformed.add(SchemaPath.STAR_COLUMN);
-    } else {
-      transformed.addAll(projectedColumns);
-    }
-    return transformed;
-  }
+  public boolean open(SchemaNegotiator negotiator) {
+    CustomErrorContext errorContext = new ChildErrorContext(negotiator.parentErrorContext()) {
+      @Override
+      public void addContext(UserException.Builder builder) {
+        super.addContext(builder);
+        builder.addContext("topic_name", subScanSpec.getTopicName());
+      }
+    };
+    negotiator.setErrorContext(errorContext);
 
-  @Override
-  public void setup(OperatorContext context, OutputMutator output) {
-    this.writer = new VectorContainerWriter(output, readOptions.isEnableUnionType());
     messageReader = MessageReaderFactory.getMessageReader(readOptions.getMessageReader());
-    messageReader.init(context.getManagedBuffer(), Lists.newArrayList(getColumns()), writer, readOptions);
+    messageReader.init(negotiator, readOptions, plugin);
     msgItr = new MessageIterator(messageReader.getConsumer(plugin), subScanSpec, readOptions.getPollTimeOut());
+
+    return true;
   }
 
   /**
    * KafkaConsumer.poll will fetch 500 messages per poll call. So hasNext will
    * take care of polling multiple times for this given batch next invocation
    */
   @Override
-  public int next() {
-    writer.allocate();
-    writer.reset();
-    Stopwatch watch = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    currentMessageCount = 0;
-
-    try {
-      while (currentOffset < subScanSpec.getEndOffset() && msgItr.hasNext()) {
-        ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
-        currentOffset = consumerRecord.offset();
-        writer.setPosition(currentMessageCount);
-        boolean status = messageReader.readMessage(consumerRecord);
-        // increment record count only if message was read successfully
-        if (status) {
-          if (++currentMessageCount >= DEFAULT_MESSAGES_PER_BATCH) {
-            break;
-          }
-        }
+  public boolean next() {
+    RowSetLoader rowWriter = messageReader.getResultSetLoader().writer();
+    while (!rowWriter.isFull()) {
+      if (!nextLine(rowWriter)) {
+        return false;
       }
+    }
+    return messageReader.endBatch();
+  }
 
-      if (currentMessageCount > 0) {
-        messageReader.ensureAtLeastOneField();
-      }
-      writer.setValueCount(currentMessageCount);
-      if (watch != null) {
-        logger.debug("Took {} ms to process {} records.", watch.elapsed(TimeUnit.MILLISECONDS), currentMessageCount);
-      }
-      logger.debug("Last offset consumed for {}:{} is {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
-          currentOffset);
-      return currentMessageCount;
-    } catch (Exception e) {
-      String msg = "Failure while reading messages from kafka. Record reader was at record: " + (currentMessageCount + 1);
-      throw UserException.dataReadError(e)
-        .message(msg)
-        .addContext(e.getMessage())
-        .build(logger);
+  private boolean nextLine(RowSetLoader rowWriter) {
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    if (currentOffset >= subScanSpec.getEndOffset() || !msgItr.hasNext()) {
+      return false;
     }
+    ConsumerRecord<byte[], byte[]> consumerRecord = msgItr.next();
+    currentOffset = consumerRecord.offset();
+    messageReader.readMessage(consumerRecord);
+    return true;
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     logger.debug("Last offset processed for {}:{} is - {}", subScanSpec.getTopicName(), subScanSpec.getPartitionId(),
         currentOffset);
     logger.debug("Total time to fetch messages from {}:{} is - {} milliseconds", subScanSpec.getTopicName(),
         subScanSpec.getPartitionId(), msgItr.getTotalFetchTime());
     plugin.registerToClose(msgItr);
-    messageReader.close();
+    try {
+      messageReader.close();
+    } catch (IOException e) {
+      logger.warn("Error closing Kafka message reader: {}", e.getMessage(), e);
+    }
   }
 
   @Override
   public String toString() {
     return "KafkaRecordReader[readOptions=" + readOptions

Review comment:
       Should we update this to use the `PlanStringBuilder`?  Here and elsewhere?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org