Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/09/25 22:30:49 UTC

[GitHub] [pinot] mcvsubbu commented on a diff in pull request #9224: Extract record keys, headers and metadata from Stream sources

mcvsubbu commented on code in PR #9224:
URL: https://github.com/apache/pinot/pull/9224#discussion_r979468325


##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDataDecoderResult.java:
##########
@@ -16,27 +16,32 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.plugin.stream.kafka20;
+package org.apache.pinot.spi.stream;
 
-import java.nio.ByteBuffer;
-import org.apache.pinot.plugin.stream.kafka.MessageAndOffset;
-import org.apache.pinot.spi.stream.RowMetadata;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.data.readers.GenericRow;
 
 
-public class MessageAndOffsetAndMetadata extends MessageAndOffset {
-  private final RowMetadata _rowMetadata;
+/**
+ * A container class for holding the result of a decoder
+ * At any point in time, only one of Result or exception is set as null.
+ */
+public final class StreamDataDecoderResult {
+  private final GenericRow _result;
+  private final Exception _exception;

Review Comment:
   You have an exception declared here. I see that the decode() operation is guaranteed not to throw an exception. You may also want to add a comment on the decode() interface that it is NOT supposed to throw an exception (just to guard against someone changing it).
   
   Also, instead of a generic `Exception`, I suggest that we define some specific ones (or at least specific categories), for example retryable versus non-retryable. See PR #9051.
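
The two suggestions above (a decode() contract that never throws, and exceptions split into retryable and non-retryable categories) could look roughly like the following. This is only a minimal sketch: `DecodeErrorKind`, `CategorizedDecodeException`, `DecodeResultSketch`, `DecoderSketch` and `IntDecoderSketch` are hypothetical names made up for illustration, not types from this PR or from Pinot.

```java
/** Hypothetical failure categories; not part of the Pinot API. */
enum DecodeErrorKind { RETRYABLE, PERMANENT }

/** Hypothetical exception that carries its category instead of being a bare Exception. */
class CategorizedDecodeException extends Exception {
  private final DecodeErrorKind _kind;

  CategorizedDecodeException(DecodeErrorKind kind, String message, Throwable cause) {
    super(message, cause);
    _kind = kind;
  }

  boolean isRetryable() {
    return _kind == DecodeErrorKind.RETRYABLE;
  }
}

/** Holds either a decoded value or the failure; exactly one of the two is non-null. */
final class DecodeResultSketch<T> {
  private final T _value;
  private final CategorizedDecodeException _exception;

  private DecodeResultSketch(T value, CategorizedDecodeException exception) {
    _value = value;
    _exception = exception;
  }

  static <T> DecodeResultSketch<T> success(T value) {
    return new DecodeResultSketch<>(value, null);
  }

  static <T> DecodeResultSketch<T> failure(CategorizedDecodeException exception) {
    return new DecodeResultSketch<>(null, exception);
  }

  T getValue() {
    return _value;
  }

  CategorizedDecodeException getException() {
    return _exception;
  }
}

/** Hypothetical decoder contract. */
interface DecoderSketch<I, T> {
  /**
   * Implementations must NOT throw. Every failure is reported through
   * {@link DecodeResultSketch#getException()} so the caller can decide what to do.
   */
  DecodeResultSketch<T> decode(I input);
}

/** Toy implementation: parses an integer and converts any runtime failure into a result. */
final class IntDecoderSketch implements DecoderSketch<String, Integer> {
  @Override
  public DecodeResultSketch<Integer> decode(String input) {
    try {
      return DecodeResultSketch.success(Integer.parseInt(input.trim()));
    } catch (RuntimeException e) {
      // Covers NumberFormatException and a null input; malformed data will not improve on retry.
      return DecodeResultSketch.failure(
          new CategorizedDecodeException(DecodeErrorKind.PERMANENT, "cannot parse: " + input, e));
    }
  }
}
```

Catching inside the implementation keeps the "never throws" guarantee local to the decoder, while the category lets the caller choose between retrying and dropping without inspecting exception types.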
   



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -39,5 +44,27 @@ public interface RowMetadata {
    * @return timestamp (epoch in milliseconds) when the row was ingested upstream
    *         Long.MIN_VALUE if not available
    */
-  long getIngestionTimeMs();
+  long getRecordTimestampMs();

Review Comment:
   What is wrong with `getIngestionTimeMs()`? I think it conveys the semantics perfectly well, so I don't see the need to rename it to `getRecordTimestampMs()`.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -544,16 +547,16 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
 
       // Index each message
       reuse.clear();
-      // retrieve metadata from the message batch if available
-      // this can be overridden by the decoder if there is a better indicator in the message payload
-      RowMetadata msgMetadata = messagesAndOffsets.getMetadataAtIndex(index);
-
-      GenericRow decodedRow = _messageDecoder
-          .decode(messagesAndOffsets.getMessageAtIndex(index), messagesAndOffsets.getMessageOffsetAtIndex(index),
-              messagesAndOffsets.getMessageLengthAtIndex(index), reuse);
-      if (decodedRow != null) {
+      StreamDataDecoderResult decodedRow = _streamDataDecoder.decode(messagesAndOffsets.getStreamMessage(index));
+      RowMetadata msgMetadata = messagesAndOffsets.getStreamMessage(index).getMetadata();
+      if (decodedRow.getException() != null) {
+        // TODO: handle exception as we do today - do we silently drop the record or throw exception?

Review Comment:
   This area is a little wild. A couple of earlier PRs made it even messier (someone needed decoder exceptions to trigger a retry that fetches a new schema). It would be great if you could add Javadocs, as appropriate, to the decoder interfaces that clearly indicate how exceptions will be treated by this class.
   
   Thanks
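
To make the question in the TODO concrete, here is one possible treatment of decode failures on the consuming side, written as a minimal sketch. It is not what `LLRealtimeSegmentDataManager` does today; `Outcome` and `ConsumerSketch` are hypothetical names, and the policy (retry retryable failures up to a bound, drop everything else and count it) is an assumption chosen for illustration.

```java
import java.util.function.Function;

/** Hypothetical decode outcome: a value, or an error flagged as retryable or not. */
final class Outcome {
  final String value;       // non-null on success
  final Exception error;    // non-null on failure
  final boolean retryable;  // meaningful only when error is non-null

  private Outcome(String value, Exception error, boolean retryable) {
    this.value = value;
    this.error = error;
    this.retryable = retryable;
  }

  static Outcome ok(String value) {
    return new Outcome(value, null, false);
  }

  static Outcome fail(Exception error, boolean retryable) {
    return new Outcome(null, error, retryable);
  }
}

/** Hypothetical consumer showing an explicit, documented policy for decode failures. */
final class ConsumerSketch {
  int indexed;  // messages decoded successfully
  int dropped;  // messages given up on

  /**
   * Decodes one message. Retryable failures are retried up to maxAttempts times in total;
   * a non-retryable failure, or running out of attempts, drops the message and counts it.
   */
  void process(String message, Function<String, Outcome> decoder, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Outcome outcome = decoder.apply(message);
      if (outcome.error == null) {
        indexed++;
        return;
      }
      if (!outcome.retryable) {
        break;  // permanent failure: another attempt cannot help
      }
    }
    dropped++;
  }
}
```

Whatever policy is finally chosen, stating it in the Javadoc of the decoder interface (as requested above) would let decoder authors know which failures are retried and which cause the record to be skipped.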



##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/RowMetadata.java:
##########
@@ -31,6 +32,8 @@
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface RowMetadata {
+  GenericRow EMPTY_ROW = new GenericRow();

Review Comment:
   +1, we should find ways of removing `RowMetadata` now that a more generic class is being introduced. Can we mark it deprecated?
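
As a rough illustration of what marking an interface deprecated could look like while existing implementations keep working, here is a minimal sketch. `MessageInfoSketch` and `LegacyRowInfoSketch` are hypothetical stand-ins, not the actual `RowMetadata` or its replacement, and having the legacy type extend the new one is just one possible migration path.

```java
/** Hypothetical replacement type; the name and method are illustrative only. */
interface MessageInfoSketch {
  long getTimestampMs();
}

/**
 * Hypothetical legacy interface kept only for backward compatibility.
 *
 * @deprecated use {@link MessageInfoSketch} instead.
 */
@Deprecated
interface LegacyRowInfoSketch extends MessageInfoSketch {
  long getIngestionTimeMs();

  /** Bridges the new accessor to the old one so existing implementations need no changes. */
  @Override
  default long getTimestampMs() {
    return getIngestionTimeMs();
  }
}
```

With both the `@Deprecated` annotation and the `@deprecated` Javadoc tag in place, callers get a compiler warning and a pointer to the replacement.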



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
For additional commands, e-mail: commits-help@pinot.apache.org