You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by gi...@apache.org on 2019/03/13 14:20:53 UTC
[incubator-druid] branch master updated: Fix log level and throw
NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)
This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new 8732329 Fix log level and throw NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)
8732329 is described below
commit 873232954fdba3490bd8c723a6bc157d0e256c1c
Author: Jihoon Son <ji...@apache.org>
AuthorDate: Wed Mar 13 07:20:43 2019 -0700
Fix log level and throw NPE on null currOffset in SeekableStreamIndexTaskRunner (#7253)
---
.../SeekableStreamIndexTaskRunner.java | 35 ++++++++++++----------
1 file changed, 19 insertions(+), 16 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 7eee9dc..367b201 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -1887,22 +1887,25 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
{
// Check only for the first record among the record batch.
if (initialOffsetsSnapshot.contains(record.getPartitionId())) {
- final SequenceOffsetType currOffset = currOffsets.get(record.getPartitionId());
- if (currOffset != null) {
- final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
- record.getSequenceNumber()
- );
- final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
- currOffset
+ final SequenceOffsetType currOffset = Preconditions.checkNotNull(
+ currOffsets.get(record.getPartitionId()),
+ "Current offset is null for sequenceNumber[%s] and partitionId[%s]",
+ record.getSequenceNumber(),
+ record.getPartitionId()
+ );
+ final OrderedSequenceNumber<SequenceOffsetType> recordSequenceNumber = createSequenceNumber(
+ record.getSequenceNumber()
+ );
+ final OrderedSequenceNumber<SequenceOffsetType> currentSequenceNumber = createSequenceNumber(
+ currOffset
+ );
+ if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
+ throw new ISE(
+ "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition[%s]",
+ record.getSequenceNumber(),
+ currOffset,
+ record.getPartitionId()
);
- if (recordSequenceNumber.compareTo(currentSequenceNumber) < 0) {
- throw new ISE(
- "sequenceNumber of the start record[%s] is smaller than current sequenceNumber[%s] for partition [%s]",
- record.getSequenceNumber(),
- currOffset,
- record.getPartitionId()
- );
- }
}
// Remove the mark to notify that this partition has been read.
@@ -1910,7 +1913,7 @@ public abstract class SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
// check exclusive starting sequence
if (isStartingSequenceOffsetsExclusive() && exclusiveStartingPartitions.contains(record.getPartitionId())) {
- log.warn("Skipping starting sequenceNumber for partition [%s] marked exclusive", record.getPartitionId());
+ log.info("Skipping starting sequenceNumber for partition[%s] marked exclusive", record.getPartitionId());
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org