You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/07 19:22:06 UTC
[kafka] branch trunk updated: MINOR: remove unncessary helper method (#13209)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 463bb00b11c MINOR: remove unncessary helper method (#13209)
463bb00b11c is described below
commit 463bb00b11c7137c35ef88bdda8c1cbe6a325273
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Feb 7 11:21:58 2023 -0800
MINOR: remove unncessary helper method (#13209)
Reviewers: Christo Lolov (@clolov), Lucas Brutschy <lb...@confluent.io>, Ismael Juma <is...@confluent.io>
---
.../kafka/streams/processor/internals/ProcessorContextUtils.java | 8 --------
.../internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java | 2 +-
.../state/internals/AbstractRocksDBSegmentedBytesStore.java | 2 +-
.../apache/kafka/streams/state/internals/InMemoryWindowStore.java | 2 +-
4 files changed, 3 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index c23c0fc9e49..c93297c97cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -34,14 +34,6 @@ public final class ProcessorContextUtils {
private ProcessorContextUtils() {}
- /**
- * Note that KIP-622 would move currentSystemTimeMs to ProcessorContext,
- * removing the need for this method.
- */
- public static long currentSystemTime(final ProcessorContext context) {
- return context.currentSystemTimeMs();
- }
-
/**
* Should be removed as part of KAFKA-10217
*/
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index b446a52eb5f..441c17201b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -190,7 +190,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment == null) {
- expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+ expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
LOG.warn("Skipping record for expired segment.");
} else {
StoreQueryUtils.updatePosition(position, stateStoreContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index bcfe30b30e9..1cb0e107a31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -262,7 +262,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
final long segmentId = segments.segmentId(timestamp);
final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
if (segment == null) {
- expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+ expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
LOG.warn("Skipping record for expired segment.");
} else {
StoreQueryUtils.updatePosition(position, stateStoreContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 5122789e31a..2ddeadc3585 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -159,7 +159,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
- expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+ expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
LOG.warn("Skipping record for expired segment.");
} else {
if (value != null) {