You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lz...@apache.org on 2017/07/06 16:59:25 UTC

[1/2] beam git commit: [BEAM-2534] Handle offset gaps in Kafka messages.

Repository: beam
Updated Branches:
  refs/heads/master bf6dda320 -> 85a99e294


[BEAM-2534] Handle offset gaps in Kafka messages.

KafkaIO logged a warning whenever there was a gap in offsets between messages.
Kafka also supports 'KV'-store-style (log-compacted) topics, where some of the
messages are deleted, leading to gaps in offsets. This PR removes the warning and
accounts for offset gaps in the backlog estimate.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2259c309
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2259c309
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2259c309

Branch: refs/heads/master
Commit: 2259c309c5b81a5d1e32732dd35e1102766401fa
Parents: bf6dda3
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Jun 28 12:07:06 2017 -0700
Committer: JingsongLi <lz...@aliyun.com>
Committed: Fri Jul 7 00:44:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2259c309/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 702bdd3..e520367 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -904,6 +904,22 @@ public class KafkaIO {
       return name;
     }
 
+    // Running mean of the first 1000 updates; afterwards each new value is weighted 1/1000.
+    private static class MovingAvg {
+      private static final int MOVING_AVG_WINDOW = 1000;
+      private double avg = 0;
+      private long numUpdates = 0;
+
+      void update(double quantity) {
+        numUpdates++;
+        avg += (quantity - avg) / Math.min(MOVING_AVG_WINDOW, numUpdates);
+      }
+
+      double get() {
+        return avg;
+      }
+    }
+
     // maintains state of each assigned partition (buffered records, consumed offset, etc)
     private static class PartitionState {
       private final TopicPartition topicPartition;
@@ -911,9 +927,8 @@ public class KafkaIO {
       private long latestOffset;
       private Iterator<ConsumerRecord<byte[], byte[]>> recordIter = Collections.emptyIterator();
 
-      // simple moving average for size of each record in bytes
-      private double avgRecordSize = 0;
-      private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements
+      private MovingAvg avgRecordSize = new MovingAvg();
+      private MovingAvg avgOffsetGap = new MovingAvg(); // > 0 only when log compaction is enabled.
 
       PartitionState(TopicPartition partition, long nextOffset) {
         this.topicPartition = partition;
@@ -921,17 +936,13 @@ public class KafkaIO {
         this.latestOffset = UNINITIALIZED_OFFSET;
       }
 
-      // update consumedOffset and avgRecordSize
-      void recordConsumed(long offset, int size) {
+      // Update consumedOffset, avgRecordSize, and avgOffsetGap
+      void recordConsumed(long offset, int size, long offsetGap) {
         nextOffset = offset + 1;
 
-        // this is always updated from single thread. probably not worth making it an AtomicDouble
-        if (avgRecordSize <= 0) {
-          avgRecordSize = size;
-        } else {
-          // initially, first record heavily contributes to average.
-          avgRecordSize += ((size - avgRecordSize) / movingAvgWindow);
-        }
+        // This is always updated from single thread. Probably not worth making atomic.
+        avgRecordSize.update(size);
+        avgOffsetGap.update(offsetGap);
       }
 
       synchronized void setLatestOffset(long latestOffset) {
@@ -944,14 +955,15 @@ public class KafkaIO {
         if (backlogMessageCount == UnboundedReader.BACKLOG_UNKNOWN) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return (long) (backlogMessageCount * avgRecordSize);
+        return (long) (backlogMessageCount * avgRecordSize.get());
       }
 
       synchronized long backlogMessageCount() {
         if (latestOffset < 0 || nextOffset < 0) {
           return UnboundedReader.BACKLOG_UNKNOWN;
         }
-        return Math.max(0, (latestOffset - nextOffset));
+        double remaining = (latestOffset - nextOffset) / (1 + avgOffsetGap.get());
+        return Math.max(0, (long) Math.ceil(remaining));
       }
     }
 
@@ -1154,14 +1166,11 @@ public class KafkaIO {
             continue;
           }
 
-          // sanity check
-          if (offset != expected) {
-            LOG.warn("{}: gap in offsets for {} at {}. {} records missing.",
-                this, pState.topicPartition, expected, offset - expected);
-          }
+          long offsetGap = offset - expected; // could be > 0 when Kafka log compaction is enabled.
 
           if (curRecord == null) {
             LOG.info("{}: first record offset {}", name, offset);
+            offsetGap = 0;
           }
 
           curRecord = null; // user coders below might throw.
@@ -1182,7 +1191,7 @@ public class KafkaIO {
 
           int recordSize = (rawRecord.key() == null ? 0 : rawRecord.key().length)
               + (rawRecord.value() == null ? 0 : rawRecord.value().length);
-          pState.recordConsumed(offset, recordSize);
+          pState.recordConsumed(offset, recordSize, offsetGap);
           bytesRead.inc(recordSize);
           bytesReadBySplit.inc(recordSize);
           return true;


[2/2] beam git commit: This closes #3461

Posted by lz...@apache.org.
This closes #3461


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/85a99e29
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/85a99e29
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/85a99e29

Branch: refs/heads/master
Commit: 85a99e29448670ae6728a8ee2e4cd3ef95877c3e
Parents: bf6dda3 2259c30
Author: JingsongLi <lz...@aliyun.com>
Authored: Fri Jul 7 00:48:36 2017 +0800
Committer: JingsongLi <lz...@aliyun.com>
Committed: Fri Jul 7 00:48:36 2017 +0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------