You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/07 23:31:25 UTC

kafka git commit: KAFKA-4851: only search available segments during Segments.segments(from, to)

Repository: kafka
Updated Branches:
  refs/heads/trunk 81f9e1376 -> 146edd530


KAFKA-4851: only search available segments during Segments.segments(from, to)

Restrict the segment lookup in `Segments#segments(..)` to only the segments that are currently available, rather than searching the hashmap for many segments that don't exist.

Author: Damian Guy <da...@gmail.com>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #2645 from dguy/session-windows-testing


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/146edd53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/146edd53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/146edd53

Branch: refs/heads/trunk
Commit: 146edd530a6dfa534fd8cb924fd2336e27872c2a
Parents: 81f9e13
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 7 15:31:22 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 7 15:31:22 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/state/internals/Segments.java | 22 +++++++++++++-------
 1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/146edd53/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 41cef54..5dedb40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -41,8 +41,8 @@ class Segments {
     private final int numSegments;
     private final long segmentInterval;
     private final SimpleDateFormat formatter;
-
-    private long currentSegmentId = -1L;
+    private long minSegmentId = Long.MAX_VALUE;
+    private long maxSegmentId = -1L;
 
     Segments(final String name, final long retentionPeriod, final int numSegments) {
         this.name = name;
@@ -66,7 +66,7 @@ class Segments {
     }
 
     Segment getOrCreateSegment(final long segmentId, final ProcessorContext context) {
-        if (segmentId > currentSegmentId || segmentId > currentSegmentId - numSegments) {
+        if (segmentId > maxSegmentId - numSegments) {
             final long key = segmentId % numSegments;
             final Segment segment = segments.get(key);
             if (!isSegment(segment, segmentId)) {
@@ -76,7 +76,10 @@ class Segments {
                 Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
                 newSegment.openDB(context);
                 segments.put(key, newSegment);
-                currentSegmentId = segmentId > currentSegmentId ? segmentId : currentSegmentId;
+                maxSegmentId = segmentId > maxSegmentId ? segmentId : maxSegmentId;
+                if (minSegmentId == Long.MAX_VALUE) {
+                    minSegmentId = maxSegmentId;
+                }
             }
             return segments.get(key);
         } else {
@@ -113,8 +116,8 @@ class Segments {
     }
 
     List<Segment> segments(final long timeFrom, final long timeTo) {
-        final long segFrom = segmentId(Math.max(0L, timeFrom));
-        final long segTo = segmentId(Math.min(currentSegmentId * segmentInterval, Math.max(0, timeTo)));
+        final long segFrom = Math.max(minSegmentId, segmentId(Math.max(0L, timeFrom)));
+        final long segTo = Math.min(maxSegmentId, segmentId(Math.min(maxSegmentId * segmentInterval, Math.max(0, timeTo))));
 
         final List<Segment> segments = new ArrayList<>();
         for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
@@ -155,9 +158,9 @@ class Segments {
     }
 
     private void cleanup(final long segmentId) {
-        final long oldestSegmentId = currentSegmentId < segmentId
+        final long oldestSegmentId = maxSegmentId < segmentId
                 ? segmentId - numSegments
-                : currentSegmentId - numSegments;
+                : maxSegmentId - numSegments;
 
         for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
             final Segment segment = segmentEntry.getValue();
@@ -167,6 +170,9 @@ class Segments {
                 segment.destroy();
             }
         }
+        if (oldestSegmentId > minSegmentId) {
+            minSegmentId = oldestSegmentId + 1;
+        }
     }
 
     private long segmentIdFromSegmentName(String segmentName) {