You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/15 03:13:33 UTC

[GitHub] [pinot] npawar commented on a diff in pull request #9224: Extract record keys, headers and metadata from Stream sources

npawar commented on code in PR #9224:
URL: https://github.com/apache/pinot/pull/9224#discussion_r971435459


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageMetadata.java:
##########
@@ -18,26 +18,38 @@
  */
 package org.apache.pinot.spi.stream;
 
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
 /**
  * A class that provides metadata associated with the message of a stream, for e.g.,

Review Comment:
   This is a little confusing: the design doc calls out headers and metadata separately, but here we're creating a StreamMessageMetadata object that contains both the metadata and the headers. If we're going to keep these two together in this class, we should at least update the Javadoc to say so.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -31,6 +32,8 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
+  GenericRow EMPTY_ROW = new GenericRow();

Review Comment:
   Is the RowMetadata interface needed? There doesn't seem to be much value added by having both RowMetadata and StreamMessageMetadata, and having both is a bit confusing. RowMetadata is also in the stream package, so it can't really serve as the metadata for anything else.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StreamDataDecoderImpl implements StreamDataDecoder {
+  private static final String KEY = "__key";
+  private static final String HEADER_KEY_PREFIX = "header$";
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamDataDecoderImpl.class);
+
+  private final StreamMessageDecoder _valueDecoder;
+
+  // Maybe simplify by allowing only string keys ?
+  public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) {
+    _valueDecoder = valueDecoder;
+  }
+
+  @Override
+  public StreamDataDecoderResult decode(StreamMessage message) {
+    assert message.getValue() != null;
+
+    try {
+      GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, new GenericRow());

Review Comment:
   We used to use the variant that lets us pass in a reusable GenericRow (instead of allocating a new one per message); can we keep that?



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.java:
##########
@@ -43,30 +45,36 @@ public KafkaPartitionLevelConsumer(String clientId, StreamConfig streamConfig, i
   }
 
   @Override
-  public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
+  public MessageBatch<KafkaStreamMessage> fetchMessages(StreamPartitionMsgOffset startMsgOffset,
       StreamPartitionMsgOffset endMsgOffset, int timeoutMillis) {
     final long startOffset = ((LongMsgOffset) startMsgOffset).getOffset();
     final long endOffset = endMsgOffset == null ? Long.MAX_VALUE : ((LongMsgOffset) endMsgOffset).getOffset();
     return fetchMessages(startOffset, endOffset, timeoutMillis);
   }
 
-  public MessageBatch<byte[]> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
+  public MessageBatch<KafkaStreamMessage> fetchMessages(long startOffset, long endOffset, int timeoutMillis) {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", _topicPartition, startOffset,
           endOffset, timeoutMillis);
     }
     _consumer.seek(_topicPartition, startOffset);
     ConsumerRecords<String, Bytes> consumerRecords = _consumer.poll(Duration.ofMillis(timeoutMillis));
     List<ConsumerRecord<String, Bytes>> messageAndOffsets = consumerRecords.records(_topicPartition);
-    List<MessageAndOffsetAndMetadata> filtered = new ArrayList<>(messageAndOffsets.size());
+    List<KafkaStreamMessage> filtered = new ArrayList<>(messageAndOffsets.size());
     long lastOffset = startOffset;
     for (ConsumerRecord<String, Bytes> messageAndOffset : messageAndOffsets) {
+      String key = messageAndOffset.key();
+      byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8);
       Bytes message = messageAndOffset.value();
       long offset = messageAndOffset.offset();
       if (offset >= startOffset & (endOffset > offset | endOffset == -1)) {
         if (message != null) {
+          StreamMessageMetadata rowMetadata = null;
+          if (_config.isPopulateMetadata()) {
+            rowMetadata = (StreamMessageMetadata) _rowMetadataExtractor.extract(messageAndOffset);
+          }
           filtered.add(
-              new MessageAndOffsetAndMetadata(message.get(), offset, _rowMetadataExtractor.extract(messageAndOffset)));
+              new KafkaStreamMessage(keyBytes, message.get(), offset, rowMetadata));

Review Comment:
   Why do we treat the offset as a special case instead of handling it within the rowMetadataExtractor?



##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMessage.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kafka20;
+
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.stream.StreamMessage;
+import org.apache.pinot.spi.stream.StreamMessageMetadata;
+
+
+public class KafkaStreamMessage extends StreamMessage {
+  private static final GenericRow EMPTY_ROW_REUSE = new GenericRow();
+
+  // should distinguish stream-specific record metadata in the table??
+  private final long _offset;

Review Comment:
   Wouldn't this be part of the StreamMessageMetadata within the KafkaStreamMessage?



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java:
##########
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * A container class for holding the result of a decoder
+ * At any point in time, only one of Result or exception is set as null.
+ */
+public final class StreamDataDecoderResult {
+  private final GenericRow _result;
+  private final Exception _exception;
+
+  public StreamDataDecoderResult(GenericRow result, Exception exception) {
+    _result = result;
+    _exception = exception;
+  }
+
+  public GenericRow getResult() {

Review Comment:
   Mark both getters (the result and the exception) as @Nullable.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderImpl.java:
##########
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Map;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class StreamDataDecoderImpl implements StreamDataDecoder {
+  private static final String KEY = "__key";
+  private static final String HEADER_KEY_PREFIX = "header$";
+  private static final Logger LOGGER = LoggerFactory.getLogger(StreamDataDecoderImpl.class);
+
+  private final StreamMessageDecoder _valueDecoder;
+
+  // Maybe simplify by allowing only string keys ?
+  public StreamDataDecoderImpl(StreamMessageDecoder valueDecoder) {
+    _valueDecoder = valueDecoder;
+  }
+
+  @Override
+  public StreamDataDecoderResult decode(StreamMessage message) {
+    assert message.getValue() != null;
+
+    try {
+      GenericRow row = _valueDecoder.decode(message.getValue(), 0, message.getValue().length, new GenericRow());
+      if (row != null) {
+        if (message.getKey() != null) {
+          row.putValue(KEY, message.getKey());
+        }
+        RowMetadata metadata = message.getMetadata();

Review Comment:
   Wouldn't this be the right place to also extract and add the other metadata fields? It seems we only handle the key, value, and headers here.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java:
##########
@@ -76,6 +76,8 @@ public void setUp()
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
+
+    Thread.currentThread().join();

Review Comment:
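   Remove before merging? Calling join() on the current thread waits for that thread to finish, so it never returns and the test would hang.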



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org