You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2023/02/07 19:22:06 UTC

[kafka] branch trunk updated: MINOR: remove unnecessary helper method (#13209)

This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 463bb00b11c MINOR: remove unnecessary helper method (#13209)
463bb00b11c is described below

commit 463bb00b11c7137c35ef88bdda8c1cbe6a325273
Author: Matthias J. Sax <ma...@confluent.io>
AuthorDate: Tue Feb 7 11:21:58 2023 -0800

    MINOR: remove unnecessary helper method (#13209)
    
    Reviewers: Christo Lolov (@clolov), Lucas Brutschy <lb...@confluent.io>, Ismael Juma <is...@confluent.io>
---
 .../kafka/streams/processor/internals/ProcessorContextUtils.java  | 8 --------
 .../internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java   | 2 +-
 .../state/internals/AbstractRocksDBSegmentedBytesStore.java       | 2 +-
 .../apache/kafka/streams/state/internals/InMemoryWindowStore.java | 2 +-
 4 files changed, 3 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
index c23c0fc9e49..c93297c97cc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextUtils.java
@@ -34,14 +34,6 @@ public final class ProcessorContextUtils {
 
     private ProcessorContextUtils() {}
 
-    /**
-     * Note that KIP-622 would move currentSystemTimeMs to ProcessorContext,
-     * removing the need for this method.
-     */
-    public static long currentSystemTime(final ProcessorContext context) {
-        return context.currentSystemTimeMs();
-    }
-
     /**
      * Should be removed as part of KAFKA-10217
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
index b446a52eb5f..441c17201b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
@@ -190,7 +190,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
         final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
 
         if (segment == null) {
-            expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
             LOG.warn("Skipping record for expired segment.");
         } else {
             StoreQueryUtils.updatePosition(position, stateStoreContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
index bcfe30b30e9..1cb0e107a31 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
@@ -262,7 +262,7 @@ public class AbstractRocksDBSegmentedBytesStore<S extends Segment> implements Se
         final long segmentId = segments.segmentId(timestamp);
         final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
         if (segment == null) {
-            expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
             LOG.warn("Skipping record for expired segment.");
         } else {
             StoreQueryUtils.updatePosition(position, stateStoreContext);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 5122789e31a..2ddeadc3585 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -159,7 +159,7 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
         observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);
 
         if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
-            expiredRecordSensor.record(1.0d, ProcessorContextUtils.currentSystemTime(context));
+            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
             LOG.warn("Skipping record for expired segment.");
         } else {
             if (value != null) {