You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/13 14:20:53 UTC

[incubator-druid] branch master updated: Fix log level and throw NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)

This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8732329  Fix log level and throw NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)
8732329 is described below

commit 873232954fdba3490bd8c723a6bc157d0e256c1c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Mar 13 07:20:43 2019 -0700

    Fix log level and throw NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)
---
 .../SeekableStreamIndexTaskRunner.java             | 35 ++++++++++++----------
 1 file changed, 19 insertions(+), 16 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 7eee9dc..367b201 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1887,22 +1887,25 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
   {
     // Check only for the first record among the record batch.
     if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
-      final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId());
-      if (currOffset != null) {
-        final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
-            record.getSequenceNumber()
-        );
-        final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
-            currOffset
+      final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+          currOffsets.get(record.getPartitionId()),
+          "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
+          record.getSequenceNumber(),
+          record.getPartitionId()
+      );
+      final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
+          record.getSequenceNumber()
+      );
+      final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
+          currOffset
+      );
+      if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
+        throw new ISE(
+            "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
+            record.getSequenceNumber(),
+            currOffset,
+            record.getPartitionId()
         );
-        if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
-          throw new ISE(
-              "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]",
-              record.getSequenceNumber(),
-              currOffset,
-              record.getPartitionId()
-          );
-        }
       }
 
       // Remove the mark to notify that this partition has been read.
@@ -1910,7 +1913,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
 
       // check exclusive starting sequence
       if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
-        log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
+        log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId());
 
         return false;
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org