You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by vv...@apache.org on 2021/05/21 21:03:00 UTC

[kafka] 05/05: fix tests

This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch poc-478-ktable-1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b4f09abdec9ebc29ae028cce25b977ac0ed87061
Author: John Roesler <vv...@apache.org>
AuthorDate: Fri May 21 16:01:56 2021 -0500

    fix tests
---
 .../java/org/apache/kafka/streams/kstream/internals/KTableFilter.java   | 2 ++
 .../java/org/apache/kafka/streams/kstream/internals/KTableImpl.java     | 2 +-
 .../kafka/streams/kstream/internals/SessionCacheFlushListener.java      | 2 +-
 .../kafka/streams/kstream/internals/TimestampedTupleForwarder.java      | 2 +-
 .../kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java  | 1 -
 .../streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java    | 2 +-
 6 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index ecbd4703..3d97408 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -167,6 +167,8 @@ class KTableFilter<KIn, VIn> implements KTableNewProcessorSupplier<KIn, VIn, KIn
 
         @Override
         public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
+            // This is the old processor context for compatibility with the other KTable processors.
+            // Once we migrte them all, we can swap this out.
             parentGetter.init(context);
         }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 01cd194..faee7af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -832,7 +832,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable<
             return new KTableSourceValueGetterSupplier<>(source.queryableName());
         } else if (processorSupplier instanceof KStreamAggProcessorSupplier) {
             return ((KStreamAggProcessorSupplier<?, K, S, V>) processorSupplier).view();
-        } else if (processorSupplier instanceof KTableNewProcessorSupplier){
+        } else if (processorSupplier instanceof KTableNewProcessorSupplier) {
             return ((KTableNewProcessorSupplier<?, ?, K, V>) processorSupplier).view();
         } else {
             return ((KTableProcessorSupplier<K, S, V>) processorSupplier).view();
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
index 2792dd9..daa7c64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
@@ -55,7 +55,7 @@ class SessionCacheFlushListener<KOut, VOut> implements CacheFlushListener<Window
         @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();
         context.setCurrentNode(myNode);
         try {
-            context.forward(record);
+            context.forward(record.withTimestamp(record.key().window().end()));
         } finally {
             context.setCurrentNode(prev);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
index e5733eb..6411b35 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarder.java
@@ -58,7 +58,7 @@ class TimestampedTupleForwarder<K, V> {
 
     public void maybeForward(final Record<K, Change<V>> record) {
         if (!cachingEnabled) {
-            if(sendOldValues) {
+            if (sendOldValues) {
                 context.forward(record);
             } else {
                 context.forward(record.withValue(new Change<>(record.value().newValue, null)));
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
index 8b9dccb..89b732e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/TimestampedTupleForwarderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
index 9e2532d..5f524fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
@@ -46,7 +46,7 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 
-@SuppressWarnings("ALL")
+@SuppressWarnings("rawtypes")
 public class ChangeLoggingKeyValueBytesStoreTest {
 
     private final MockRecordCollector collector = new MockRecordCollector();