You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/03/07 23:31:25 UTC
kafka git commit: KAFKA-4851: only search available segments during
Segments.segments(from, to)
Repository: kafka
Updated Branches:
refs/heads/trunk 81f9e1376 -> 146edd530
KAFKA-4851: only search available segments during Segments.segments(from, to)
restrict the locating of segments in `Segments#segments(..)` to only the segments that are currently available, i.e., rather than searching the hashmap for many segments that don't exist.
Author: Damian Guy <da...@gmail.com>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #2645 from dguy/session-windows-testing
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/146edd53
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/146edd53
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/146edd53
Branch: refs/heads/trunk
Commit: 146edd530a6dfa534fd8cb924fd2336e27872c2a
Parents: 81f9e13
Author: Damian Guy <da...@gmail.com>
Authored: Tue Mar 7 15:31:22 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Mar 7 15:31:22 2017 -0800
----------------------------------------------------------------------
.../kafka/streams/state/internals/Segments.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/146edd53/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
index 41cef54..5dedb40 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java
@@ -41,8 +41,8 @@ class Segments {
private final int numSegments;
private final long segmentInterval;
private final SimpleDateFormat formatter;
-
- private long currentSegmentId = -1L;
+ private long minSegmentId = Long.MAX_VALUE;
+ private long maxSegmentId = -1L;
Segments(final String name, final long retentionPeriod, final int numSegments) {
this.name = name;
@@ -66,7 +66,7 @@ class Segments {
}
Segment getOrCreateSegment(final long segmentId, final ProcessorContext context) {
- if (segmentId > currentSegmentId || segmentId > currentSegmentId - numSegments) {
+ if (segmentId > maxSegmentId - numSegments) {
final long key = segmentId % numSegments;
final Segment segment = segments.get(key);
if (!isSegment(segment, segmentId)) {
@@ -76,7 +76,10 @@ class Segments {
Segment newSegment = new Segment(segmentName(segmentId), name, segmentId);
newSegment.openDB(context);
segments.put(key, newSegment);
- currentSegmentId = segmentId > currentSegmentId ? segmentId : currentSegmentId;
+ maxSegmentId = segmentId > maxSegmentId ? segmentId : maxSegmentId;
+ if (minSegmentId == Long.MAX_VALUE) {
+ minSegmentId = maxSegmentId;
+ }
}
return segments.get(key);
} else {
@@ -113,8 +116,8 @@ class Segments {
}
List<Segment> segments(final long timeFrom, final long timeTo) {
- final long segFrom = segmentId(Math.max(0L, timeFrom));
- final long segTo = segmentId(Math.min(currentSegmentId * segmentInterval, Math.max(0, timeTo)));
+ final long segFrom = Math.max(minSegmentId, segmentId(Math.max(0L, timeFrom)));
+ final long segTo = Math.min(maxSegmentId, segmentId(Math.min(maxSegmentId * segmentInterval, Math.max(0, timeTo))));
final List<Segment> segments = new ArrayList<>();
for (long segmentId = segFrom; segmentId <= segTo; segmentId++) {
@@ -155,9 +158,9 @@ class Segments {
}
private void cleanup(final long segmentId) {
- final long oldestSegmentId = currentSegmentId < segmentId
+ final long oldestSegmentId = maxSegmentId < segmentId
? segmentId - numSegments
- : currentSegmentId - numSegments;
+ : maxSegmentId - numSegments;
for (Map.Entry<Long, Segment> segmentEntry : segments.entrySet()) {
final Segment segment = segmentEntry.getValue();
@@ -167,6 +170,9 @@ class Segments {
segment.destroy();
}
}
+ if (oldestSegmentId > minSegmentId) {
+ minSegmentId = oldestSegmentId + 1;
+ }
}
private long segmentIdFromSegmentName(String segmentName) {