You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mj...@apache.org on 2021/04/09 18:50:41 UTC
[kafka] branch trunk updated: KAFKA-12449: Remove deprecated
WindowStore#put (#10293)
This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new db0323e KAFKA-12449: Remove deprecated WindowStore#put (#10293)
db0323e is described below
commit db0323e9ba3f767614415d833a2081a33825a284
Author: Jorge Esteban Quilcate Otoya <qu...@gmail.com>
AuthorDate: Fri Apr 9 19:49:37 2021 +0100
KAFKA-12449: Remove deprecated WindowStore#put (#10293)
Removes `WindowStore#put(K,V)` that was deprecated via KIP-474.
Reviewers: Matthias J. Sax <ma...@confluent.io>
---
docs/streams/upgrade-guide.html | 1 +
.../internals/AbstractReadOnlyDecorator.java | 7 --
.../internals/AbstractReadWriteDecorator.java | 7 --
.../apache/kafka/streams/state/WindowStore.java | 18 -----
.../state/internals/CachingWindowStore.java | 6 --
.../internals/ChangeLoggingWindowBytesStore.java | 10 ---
.../state/internals/InMemoryWindowStore.java | 6 --
.../state/internals/MeteredWindowStore.java | 7 --
.../state/internals/RocksDBWindowStore.java | 24 ------
.../internals/TimestampedWindowStoreBuilder.java | 7 --
.../WindowToTimestampedWindowByteStoreAdapter.java | 7 --
.../internals/ProcessorContextImplTest.java | 10 +--
.../internals/AbstractWindowBytesStoreTest.java | 93 +++++++---------------
.../CachingPersistentWindowStoreTest.java | 88 ++++++++++----------
...angeLoggingTimestampedWindowBytesStoreTest.java | 6 +-
.../ChangeLoggingWindowBytesStoreTest.java | 8 +-
.../state/internals/InMemoryWindowStoreTest.java | 19 ++---
.../MeteredTimestampedWindowStoreTest.java | 6 +-
.../state/internals/MeteredWindowStoreTest.java | 3 +-
.../state/internals/RocksDBWindowStoreTest.java | 84 +++++++------------
.../apache/kafka/streams/TopologyTestDriver.java | 7 --
.../kafka/streams/WindowStoreFacadeTest.java | 11 ---
22 files changed, 119 insertions(+), 316 deletions(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 073badd..b082035 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -111,6 +111,7 @@
<li> Overloaded <code>KStream#join, leftJoin, outerJoin</code> with <code>KStream</code> and <code>Joined</code> parameters: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-479%3A+Add+StreamJoined+config+object+to+Join">KIP-479</a>).</li>
<li> Overloaded <code>KafkaStreams#metadataForKey</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-535%3A+Allow+state+stores+to+serve+stale+reads+during+rebalance">KIP-535</a>).</li>
<li> Overloaded <code>KafkaStreams#store</code>: deprecated in Kafka 2.5.0 (<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-562%3A+Allow+fetching+a+key+from+a+single+partition+rather+than+iterating+over+all+the+stores+on+an+instance">KIP-562</a>).</li>
+ <li> <code>WindowStore#put(K key, V value)</code>: deprecated in Kafka 2.4.0 (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545">KIP-474</a>).</li>
</ul>
<p>
The following dependencies were removed from Kafka Streams:
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
index 4424cdb..6c0c975 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadOnlyDecorator.java
@@ -160,13 +160,6 @@ abstract class AbstractReadOnlyDecorator<T extends StateStore, K, V> extends Wra
super(inner);
}
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- throw new UnsupportedOperationException(ERROR_MESSAGE);
- }
-
@Override
public void put(final K key,
final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
index d077089..86dd33f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractReadWriteDecorator.java
@@ -154,13 +154,6 @@ abstract class AbstractReadWriteDecorator<T extends StateStore, K, V> extends Wr
super(inner);
}
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- wrapped().put(key, value);
- }
-
@Override
public void put(final K key,
final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
index ef1f799..3ac11b8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/WindowStore.java
@@ -36,24 +36,6 @@ import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFail
public interface WindowStore<K, V> extends StateStore, ReadOnlyWindowStore<K, V> {
/**
- * Use the current record timestamp as the {@code windowStartTimestamp} and
- * delegate to {@link WindowStore#put(Object, Object, long)}.
- * <p>
- * It's highly recommended to use {@link WindowStore#put(Object, Object, long)} instead, as the record timestamp
- * is unlikely to be the correct windowStartTimestamp in general.
- *
- * @param key The key to associate the value to
- * @param value The value to update, it can be null;
- * if the serialized bytes are also null it is interpreted as delete
- * @throws NullPointerException if the given key is {@code null}
- * @deprecated as timestamp is not provided for the key-value pair, this causes inconsistency
- * to identify the window frame to which the key belongs.
- * Use {@link #put(Object, Object, long)} instead.
- */
- @Deprecated
- void put(K key, V value);
-
- /**
* Put a key-value pair into the window with given window start timestamp
* <p>
* If serialized value bytes are null it is interpreted as delete. Note that deletes will be
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index c750f3a..cc7e5c6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -142,12 +142,6 @@ class CachingWindowStore
return true;
}
- @Deprecated
- @Override
- public synchronized void put(final Bytes key,
- final byte[] value) {
- put(key, value, context.timestamp());
- }
@Override
public synchronized void put(final Bytes key,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 6a367b1..70c8949 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -131,16 +131,6 @@ class ChangeLoggingWindowBytesStore
return wrapped().backwardFetchAll(timeFrom, timeTo);
}
- @Deprecated
- @Override
- public void put(final Bytes key, final byte[] value) {
- // Note: It's incorrect to bypass the wrapped store here by delegating to another method,
- // but we have no alternative. We must send a timestamped key to the changelog, which means
- // we need to know what timestamp gets used for the record. Hopefully, we can deprecate this
- // method in the future to resolve the situation.
- put(key, value, context.timestamp());
- }
-
@Override
public void put(final Bytes key,
final byte[] value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
index 0a1dc86..5dbfc0d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java
@@ -108,12 +108,6 @@ public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {
open = true;
}
- @Deprecated
- @Override
- public void put(final Bytes key, final byte[] value) {
- put(key, value, context.timestamp());
- }
-
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
removeExpiredSegments();
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 1b419ec..8c77cf2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -157,13 +157,6 @@ public class MeteredWindowStore<K, V>
return false;
}
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- put(key, value, context != null ? context.timestamp() : 0L);
- }
-
@Override
public void put(final K key,
final V value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
index b3ba652..3949505 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBWindowStore.java
@@ -18,10 +18,6 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.StateStoreContext;
-import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
@@ -33,7 +29,6 @@ public class RocksDBWindowStore
private final boolean retainDuplicates;
private final long windowSize;
- private InternalProcessorContext context;
private int seqnum = 0;
RocksDBWindowStore(final SegmentedBytesStore bytesStore,
@@ -44,25 +39,6 @@ public class RocksDBWindowStore
this.windowSize = windowSize;
}
- @Deprecated
- @Override
- public void init(final ProcessorContext context, final StateStore root) {
- this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
- super.init(context, root);
- }
-
- @Override
- public void init(final StateStoreContext context, final StateStore root) {
- this.context = context instanceof InternalProcessorContext ? (InternalProcessorContext) context : null;
- super.init(context, root);
- }
-
- @Deprecated
- @Override
- public void put(final Bytes key, final byte[] value) {
- put(key, value, context != null ? context.timestamp() : 0L);
- }
-
@Override
public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
// Skip if value is null and duplicates are allowed since this delete is a no-op
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
index 075545f..417b45b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreBuilder.java
@@ -122,13 +122,6 @@ public class TimestampedWindowStoreBuilder<K, V>
wrapped.init(context, root);
}
- @Deprecated
- @Override
- public void put(final Bytes key,
- final byte[] value) {
- wrapped.put(key, value);
- }
-
@Override
public void put(final Bytes key,
final byte[] value,
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
index 586e87d..8d895fc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowToTimestampedWindowByteStoreAdapter.java
@@ -40,13 +40,6 @@ class WindowToTimestampedWindowByteStoreAdapter implements WindowStore<Bytes, by
this.store = store;
}
- @Deprecated
- @Override
- public void put(final Bytes key,
- final byte[] valueWithTimestamp) {
- store.put(key, valueWithTimestamp == null ? null : rawValue(valueWithTimestamp));
- }
-
@Override
public void put(final Bytes key,
final byte[] valueWithTimestamp,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 6ab26e1..5694c52 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -227,7 +227,6 @@ public class ProcessorContextImplTest {
checkThrowsUnsupportedOperation(store::flush, "flush()");
checkThrowsUnsupportedOperation(() -> store.put("1", 1L, 1L), "put()");
- checkThrowsUnsupportedOperation(() -> store.put("1", 1L), "put()");
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -246,7 +245,6 @@ public class ProcessorContextImplTest {
checkThrowsUnsupportedOperation(store::flush, "flush()");
checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L), 1L), "put() [with timestamp]");
- checkThrowsUnsupportedOperation(() -> store.put("1", ValueAndTimestamp.make(1L, 1L)), "put() [no timestamp]");
assertEquals(timestampedIters.get(0), store.fetchAll(0L, 0L));
assertEquals(windowStoreIter, store.fetch(KEY, 0L, 1L));
@@ -335,7 +333,7 @@ public class ProcessorContextImplTest {
store.flush();
assertTrue(flushExecuted);
- store.put("1", 1L);
+ store.put("1", 1L, 1L);
assertTrue(putExecuted);
assertEquals(iters.get(0), store.fetchAll(0L, 0L));
@@ -355,7 +353,7 @@ public class ProcessorContextImplTest {
store.flush();
assertTrue(flushExecuted);
- store.put("1", ValueAndTimestamp.make(1L, 1L));
+ store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
assertTrue(putExecuted);
store.put("1", ValueAndTimestamp.make(1L, 1L), 1L);
@@ -639,7 +637,7 @@ public class ProcessorContextImplTest {
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE);
expect(windowStore.all()).andReturn(iters.get(2));
- windowStore.put(anyString(), anyLong());
+ windowStore.put(anyString(), anyLong(), anyLong());
expectLastCall().andAnswer(() -> {
putExecuted = true;
return null;
@@ -662,7 +660,7 @@ public class ProcessorContextImplTest {
expect(windowStore.fetch(anyString(), anyLong())).andReturn(VALUE_AND_TIMESTAMP);
expect(windowStore.all()).andReturn(timestampedIters.get(2));
- windowStore.put(anyString(), anyObject(ValueAndTimestamp.class));
+ windowStore.put(anyString(), anyObject(ValueAndTimestamp.class), anyLong());
expectLastCall().andAnswer(() -> {
putExecuted = true;
return null;
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
index aa4a412..8d73a2d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
@@ -782,24 +782,22 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testPutSameKeyTimestamp() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
final long startTime = SEGMENT_INTERVAL - 4L;
- setCurrentTime(startTime);
- windowStore.put(0, "zero");
+ windowStore.put(0, "zero", startTime);
assertEquals(
new HashSet<>(Collections.singletonList("zero")),
valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
ofEpochMilli(startTime + WINDOW_SIZE))));
- windowStore.put(0, "zero");
- windowStore.put(0, "zero+");
- windowStore.put(0, "zero++");
+ windowStore.put(0, "zero", startTime);
+ windowStore.put(0, "zero+", startTime);
+ windowStore.put(0, "zero++", startTime);
assertEquals(
new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
@@ -847,7 +845,6 @@ public abstract class AbstractWindowBytesStoreTest {
@Test
public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
- setCurrentTime(0);
windowStore.put(1, "one", 1L);
windowStore.put(1, "two", 2L);
windowStore.put(1, "three", 3L);
@@ -905,18 +902,16 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testDeleteAndUpdate() {
final long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
- windowStore.put(1, "one v2");
+ windowStore.put(1, "one", currentTime);
+ windowStore.put(1, "one v2", currentTime);
WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, currentTime);
assertEquals(new KeyValue<>(currentTime, "one v2"), iterator.next());
- windowStore.put(1, null);
+ windowStore.put(1, null, currentTime);
iterator = windowStore.fetch(1, 0, currentTime);
assertFalse(iterator.hasNext());
}
@@ -927,9 +922,8 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldThrowNullPointerExceptionOnPutNullKey() {
- assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue"));
+ assertThrows(NullPointerException.class, () -> windowStore.put(null, "anyValue", 0L));
}
@Test
@@ -1120,11 +1114,9 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testWindowIteratorPeek() {
final long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
+ windowStore.put(1, "one", currentTime);
final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.fetchAll(0L, currentTime);
@@ -1151,25 +1143,20 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldNotThrowConcurrentModificationException() {
long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
+ windowStore.put(1, "one", currentTime);
currentTime += WINDOW_SIZE * 10;
- setCurrentTime(currentTime);
- windowStore.put(1, "two");
+ windowStore.put(1, "two", currentTime);
final KeyValueIterator<Windowed<Integer>, String> iterator = windowStore.all();
currentTime += WINDOW_SIZE * 10;
- setCurrentTime(currentTime);
- windowStore.put(1, "three");
+ windowStore.put(1, "three", currentTime);
currentTime += WINDOW_SIZE * 10;
- setCurrentTime(currentTime);
- windowStore.put(2, "four");
+ windowStore.put(2, "four", currentTime);
// Iterator should return all records in store and not throw exception b/c some were added after fetch
assertEquals(windowedPair(1, "one", 0), iterator.next());
@@ -1180,25 +1167,21 @@ public abstract class AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testFetchDuplicates() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
windowStore.init((StateStoreContext) context, windowStore);
long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
- windowStore.put(1, "one-2");
+ windowStore.put(1, "one", currentTime);
+ windowStore.put(1, "one-2", currentTime);
currentTime += WINDOW_SIZE * 10;
- setCurrentTime(currentTime);
- windowStore.put(1, "two");
- windowStore.put(1, "two-2");
+ windowStore.put(1, "two", currentTime);
+ windowStore.put(1, "two-2", currentTime);
currentTime += WINDOW_SIZE * 10;
- setCurrentTime(currentTime);
- windowStore.put(1, "three");
- windowStore.put(1, "three-2");
+ windowStore.put(1, "three", currentTime);
+ windowStore.put(1, "three-2", currentTime);
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0, WINDOW_SIZE * 10);
@@ -1210,38 +1193,26 @@ public abstract class AbstractWindowBytesStoreTest {
}
- @SuppressWarnings("deprecation")
private void putFirstBatch(final WindowStore<Integer, String> store,
@SuppressWarnings("SameParameterValue") final long startTime,
final InternalMockProcessorContext context) {
context.setRecordContext(createRecordContext(startTime));
- store.put(0, "zero");
- context.setRecordContext(createRecordContext(startTime + 1L));
- store.put(1, "one");
- context.setRecordContext(createRecordContext(startTime + 2L));
- store.put(2, "two");
- context.setRecordContext(createRecordContext(startTime + 4L));
- store.put(4, "four");
- context.setRecordContext(createRecordContext(startTime + 5L));
- store.put(5, "five");
+ store.put(0, "zero", startTime);
+ store.put(1, "one", startTime + 1L);
+ store.put(2, "two", startTime + 2L);
+ store.put(4, "four", startTime + 4L);
+ store.put(5, "five", startTime + 5L);
}
- @SuppressWarnings("deprecation")
private void putSecondBatch(final WindowStore<Integer, String> store,
@SuppressWarnings("SameParameterValue") final long startTime,
final InternalMockProcessorContext context) {
- context.setRecordContext(createRecordContext(startTime + 3L));
- store.put(2, "two+1");
- context.setRecordContext(createRecordContext(startTime + 4L));
- store.put(2, "two+2");
- context.setRecordContext(createRecordContext(startTime + 5L));
- store.put(2, "two+3");
- context.setRecordContext(createRecordContext(startTime + 6L));
- store.put(2, "two+4");
- context.setRecordContext(createRecordContext(startTime + 7L));
- store.put(2, "two+5");
- context.setRecordContext(createRecordContext(startTime + 8L));
- store.put(2, "two+6");
+ store.put(2, "two+1", startTime + 3L);
+ store.put(2, "two+2", startTime + 4L);
+ store.put(2, "two+3", startTime + 5L);
+ store.put(2, "two+4", startTime + 6L);
+ store.put(2, "two+5", startTime + 7L);
+ store.put(2, "two+6", startTime + 8L);
}
long extractStoreTimestamp(final byte[] binaryKey) {
@@ -1278,10 +1249,6 @@ public abstract class AbstractWindowBytesStoreTest {
return KeyValue.pair(new Windowed<>(key, WindowKeySchema.timeWindowForSize(timestamp, windowSize)), value);
}
- protected void setCurrentTime(final long currentTime) {
- context.setRecordContext(createRecordContext(currentTime));
- }
-
private ProcessorRecordContext createRecordContext(final long time) {
return new ProcessorRecordContext(time, 0, 0, "topic", null);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 66e49c8..735101f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -236,8 +236,8 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldPutFetchFromCache() {
- cachingStore.put(bytesKey("a"), bytesValue("a"));
- cachingStore.put(bytesKey("b"), bytesValue("b"));
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
assertThat(cachingStore.fetch(bytesKey("a"), 10), equalTo(bytesValue("a")));
assertThat(cachingStore.fetch(bytesKey("b"), 10), equalTo(bytesValue("b")));
@@ -275,8 +275,8 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldPutFetchRangeFromCache() {
- cachingStore.put(bytesKey("a"), bytesValue("a"));
- cachingStore.put(bytesKey("b"), bytesValue("b"));
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
cachingStore.fetch(bytesKey("a"), bytesKey("b"), ofEpochMilli(10), ofEpochMilli(10));
@@ -293,16 +293,15 @@ public class CachingPersistentWindowStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldGetAllFromCache() {
- cachingStore.put(bytesKey("a"), bytesValue("a"));
- cachingStore.put(bytesKey("b"), bytesValue("b"));
- cachingStore.put(bytesKey("c"), bytesValue("c"));
- cachingStore.put(bytesKey("d"), bytesValue("d"));
- cachingStore.put(bytesKey("e"), bytesValue("e"));
- cachingStore.put(bytesKey("f"), bytesValue("f"));
- cachingStore.put(bytesKey("g"), bytesValue("g"));
- cachingStore.put(bytesKey("h"), bytesValue("h"));
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.all();
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
@@ -318,14 +317,14 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldGetAllBackwardFromCache() {
- cachingStore.put(bytesKey("a"), bytesValue("a"));
- cachingStore.put(bytesKey("b"), bytesValue("b"));
- cachingStore.put(bytesKey("c"), bytesValue("c"));
- cachingStore.put(bytesKey("d"), bytesValue("d"));
- cachingStore.put(bytesKey("e"), bytesValue("e"));
- cachingStore.put(bytesKey("f"), bytesValue("f"));
- cachingStore.put(bytesKey("g"), bytesValue("g"));
- cachingStore.put(bytesKey("h"), bytesValue("h"));
+ cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator = cachingStore.backwardAll();
final String[] array = {"h", "g", "f", "e", "d", "c", "b", "a"};
@@ -343,8 +342,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldFetchAllWithinTimestampRange() {
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
for (int i = 0; i < array.length; i++) {
- context.setTime(i);
- cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+ cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@@ -386,8 +384,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldFetchAllBackwardWithinTimestampRange() {
final String[] array = {"a", "b", "c", "d", "e", "f", "g", "h"};
for (int i = 0; i < array.length; i++) {
- context.setTime(i);
- cachingStore.put(bytesKey(array[i]), bytesValue(array[i]));
+ cachingStore.put(bytesKey(array[i]), bytesValue(array[i]), i);
}
final KeyValueIterator<Windowed<Bytes>, byte[]> iterator =
@@ -444,7 +441,7 @@ public class CachingPersistentWindowStoreTest {
public void shouldForwardDirtyItemsWhenFlushCalled() {
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
- cachingStore.put(bytesKey("1"), bytesValue("a"));
+ cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("a", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
@@ -462,24 +459,24 @@ public class CachingPersistentWindowStoreTest {
cachingStore.setFlushListener(cacheListener, true);
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
- cachingStore.put(bytesKey("1"), bytesValue("a"));
- cachingStore.put(bytesKey("1"), bytesValue("b"));
+ cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
- cachingStore.put(bytesKey("1"), bytesValue("c"));
+ cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
assertEquals("b", cacheListener.forwarded.get(windowedKey).oldValue);
- cachingStore.put(bytesKey("1"), null);
+ cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey).newValue);
assertEquals("c", cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
- cachingStore.put(bytesKey("1"), bytesValue("a"));
- cachingStore.put(bytesKey("1"), bytesValue("b"));
- cachingStore.put(bytesKey("1"), null);
+ cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey));
cacheListener.forwarded.clear();
@@ -490,23 +487,23 @@ public class CachingPersistentWindowStoreTest {
public void shouldForwardOldValuesWhenDisabled() {
final Windowed<String> windowedKey =
new Windowed<>("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE));
- cachingStore.put(bytesKey("1"), bytesValue("a"));
- cachingStore.put(bytesKey("1"), bytesValue("b"));
+ cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("b", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
- cachingStore.put(bytesKey("1"), bytesValue("c"));
+ cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
cachingStore.flush();
assertEquals("c", cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
- cachingStore.put(bytesKey("1"), null);
+ cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey).newValue);
assertNull(cacheListener.forwarded.get(windowedKey).oldValue);
cacheListener.forwarded.clear();
- cachingStore.put(bytesKey("1"), bytesValue("a"));
- cachingStore.put(bytesKey("1"), bytesValue("b"));
- cachingStore.put(bytesKey("1"), null);
+ cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
+ cachingStore.put(bytesKey("1"), null, DEFAULT_TIMESTAMP);
cachingStore.flush();
assertNull(cacheListener.forwarded.get(windowedKey));
cacheListener.forwarded.clear();
@@ -619,7 +616,7 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldClearNamespaceCacheOnClose() {
- cachingStore.put(bytesKey("a"), bytesValue("a"));
+ cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
assertEquals(1, cache.size());
cachingStore.close();
assertEquals(0, cache.size());
@@ -641,7 +638,7 @@ public class CachingPersistentWindowStoreTest {
@SuppressWarnings("deprecation")
public void shouldThrowIfTryingToWriteToClosedCachingStore() {
cachingStore.close();
- assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a")));
+ assertThrows(InvalidStateStoreException.class, () -> cachingStore.put(bytesKey("a"), bytesValue("a"), 0L));
}
@Test
@@ -789,13 +786,13 @@ public class CachingPersistentWindowStoreTest {
@Test
@SuppressWarnings("deprecation")
public void shouldThrowNullPointerExceptionOnPutNullKey() {
- assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue")));
+ assertThrows(NullPointerException.class, () -> cachingStore.put(null, bytesValue("anyValue"), 0L));
}
@Test
@SuppressWarnings("deprecation")
public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
- cachingStore.put(bytesKey("a"), null);
+ cachingStore.put(bytesKey("a"), null, 0L);
}
@Test
@@ -919,13 +916,12 @@ public class CachingPersistentWindowStoreTest {
bytesValue(value));
}
- @SuppressWarnings("deprecation")
private int addItemsToCache() {
int cachedSize = 0;
int i = 0;
while (cachedSize < MAX_CACHE_SIZE_BYTES) {
final String kv = String.valueOf(i++);
- cachingStore.put(bytesKey(kv), bytesValue(kv));
+ cachingStore.put(bytesKey(kv), bytesValue(kv), DEFAULT_TIMESTAMP);
cachedSize += memoryCacheEntrySize(kv.getBytes(), kv.getBytes(), TOPIC) +
8 + // timestamp
4; // sequenceNumber
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 6608739..50c18fe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -99,7 +99,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
context.logChange(store.name(), key, value, 42);
EasyMock.replay(context);
- store.put(bytesKey, valueAndTimestamp);
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
EasyMock.verify(inner, context);
}
@@ -146,8 +146,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
EasyMock.replay(context);
- store.put(bytesKey, valueAndTimestamp);
- store.put(bytesKey, valueAndTimestamp);
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
EasyMock.verify(inner, context);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index b19b99a..36e3297 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -84,7 +84,6 @@ public class ChangeLoggingWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldLogPuts() {
inner.put(bytesKey, value, 0);
EasyMock.expectLastCall();
@@ -98,7 +97,7 @@ public class ChangeLoggingWindowBytesStoreTest {
context.logChange(store.name(), key, value, 0L);
EasyMock.replay(context);
- store.put(bytesKey, value);
+ store.put(bytesKey, value, context.timestamp());
EasyMock.verify(inner, context);
}
@@ -152,7 +151,6 @@ public class ChangeLoggingWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldRetainDuplicatesWhenSet() {
store = new ChangeLoggingWindowBytesStore(inner, true, WindowKeySchema::toStoreKeyBinary);
@@ -171,8 +169,8 @@ public class ChangeLoggingWindowBytesStoreTest {
EasyMock.replay(context);
- store.put(bytesKey, value);
- store.put(bytesKey, value);
+ store.put(bytesKey, value, context.timestamp());
+ store.put(bytesKey, value, context.timestamp());
EasyMock.verify(inner, context);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index e5f2142..2ef9bad 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -126,37 +126,30 @@ public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testExpiration() {
long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
+ windowStore.put(1, "one", currentTime);
currentTime += RETENTION_PERIOD / 4;
- setCurrentTime(currentTime);
- windowStore.put(1, "two");
+ windowStore.put(1, "two", currentTime);
currentTime += RETENTION_PERIOD / 4;
- setCurrentTime(currentTime);
- windowStore.put(1, "three");
+ windowStore.put(1, "three", currentTime);
currentTime += RETENTION_PERIOD / 4;
- setCurrentTime(currentTime);
- windowStore.put(1, "four");
+ windowStore.put(1, "four", currentTime);
// increase current time to the full RETENTION_PERIOD to expire first record
currentTime = currentTime + RETENTION_PERIOD / 4;
- setCurrentTime(currentTime);
- windowStore.put(1, "five");
+ windowStore.put(1, "five", currentTime);
KeyValueIterator<Windowed<Integer>, String> iterator = windowStore
.fetchAll(0L, currentTime);
// effect of this put (expires next oldest record, adds new one) should not be reflected in the already fetched results
currentTime = currentTime + RETENTION_PERIOD / 4;
- setCurrentTime(currentTime);
- windowStore.put(1, "six");
+ windowStore.put(1, "six", currentTime);
// should only have middle 4 values, as (only) the first record was expired at the time of the fetch
// and the last was inserted after the fetch
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
index 595cb01..c05c1ba 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreTest.java
@@ -202,7 +202,6 @@ public class MeteredTimestampedWindowStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
EasyMock.replay(innerStoreMock);
@@ -217,7 +216,7 @@ public class MeteredTimestampedWindowStoreTest {
store.init((StateStoreContext) context, innerStoreMock);
try {
- store.put("key", ValueAndTimestamp.make(42L, 60000));
+ store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
} catch (final StreamsException exception) {
if (exception.getCause() instanceof ClassCastException) {
fail("Serdes are not correctly set from processor context.");
@@ -227,7 +226,6 @@ public class MeteredTimestampedWindowStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
EasyMock.expect(innerStoreMock.name()).andStubReturn("mocked-store");
EasyMock.replay(innerStoreMock);
@@ -242,7 +240,7 @@ public class MeteredTimestampedWindowStoreTest {
store.init((StateStoreContext) context, innerStoreMock);
try {
- store.put("key", ValueAndTimestamp.make(42L, 60000));
+ store.put("key", ValueAndTimestamp.make(42L, 60000), 60000L);
} catch (final StreamsException exception) {
if (exception.getCause() instanceof ClassCastException) {
fail("Serdes are not correctly set from constructor parameters.");
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index a74aaea..cb3a268 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -290,7 +290,6 @@ public class MeteredWindowStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldRecordPutLatency() {
final byte[] bytes = "a".getBytes();
innerStoreMock.put(eq(Bytes.wrap(bytes)), anyObject(), eq(context.timestamp()));
@@ -298,7 +297,7 @@ public class MeteredWindowStoreTest {
replay(innerStoreMock);
store.init((StateStoreContext) context, store);
- store.put("a", "a");
+ store.put("a", "a", context.timestamp());
final Map<MetricName, ? extends Metric> metrics = context.metrics().metrics();
if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
assertEquals(1.0, getMetricByNameFilterByTags(
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
index 9e9b0b4..ce85893 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java
@@ -81,26 +81,21 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void shouldOnlyIterateOpenSegments() {
long currentTime = 0;
- setCurrentTime(currentTime);
- windowStore.put(1, "one");
+ windowStore.put(1, "one", currentTime);
currentTime = currentTime + SEGMENT_INTERVAL;
- setCurrentTime(currentTime);
- windowStore.put(1, "two");
+ windowStore.put(1, "two", currentTime);
currentTime = currentTime + SEGMENT_INTERVAL;
- setCurrentTime(currentTime);
- windowStore.put(1, "three");
+ windowStore.put(1, "three", currentTime);
final WindowStoreIterator<String> iterator = windowStore.fetch(1, 0L, currentTime);
// roll to the next segment that will close the first
currentTime = currentTime + SEGMENT_INTERVAL;
- setCurrentTime(currentTime);
- windowStore.put(1, "four");
+ windowStore.put(1, "four", currentTime);
// should only have 2 values as the first segment is no longer open
assertEquals(new KeyValue<>(SEGMENT_INTERVAL, "two"), iterator.next());
@@ -109,22 +104,18 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testRolling() {
// to validate segments
final long startTime = SEGMENT_INTERVAL * 2;
final long increment = SEGMENT_INTERVAL / 2;
- setCurrentTime(startTime);
- windowStore.put(0, "zero");
+ windowStore.put(0, "zero", startTime);
assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
- setCurrentTime(startTime + increment);
- windowStore.put(1, "one");
+ windowStore.put(1, "one", startTime + increment);
assertEquals(Utils.mkSet(segments.segmentName(2)), segmentDirs(baseDir));
- setCurrentTime(startTime + increment * 2);
- windowStore.put(2, "two");
+ windowStore.put(2, "two", startTime + increment * 2);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@@ -133,8 +124,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(startTime + increment * 4);
- windowStore.put(4, "four");
+ windowStore.put(4, "four", startTime + increment * 4);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@@ -144,8 +134,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(startTime + increment * 5);
- windowStore.put(5, "five");
+ windowStore.put(5, "five", startTime + increment * 5);
assertEquals(
Utils.mkSet(
segments.segmentName(2),
@@ -192,8 +181,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 5 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 5 + WINDOW_SIZE))));
- setCurrentTime(startTime + increment * 6);
- windowStore.put(6, "six");
+ windowStore.put(6, "six", startTime + increment * 6);
assertEquals(
Utils.mkSet(
segments.segmentName(3),
@@ -246,8 +234,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 6 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 6 + WINDOW_SIZE))));
- setCurrentTime(startTime + increment * 7);
- windowStore.put(7, "seven");
+ windowStore.put(7, "seven", startTime + increment * 7);
assertEquals(
Utils.mkSet(
segments.segmentName(3),
@@ -306,8 +293,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
ofEpochMilli(startTime + increment * 7 - WINDOW_SIZE),
ofEpochMilli(startTime + increment * 7 + WINDOW_SIZE))));
- setCurrentTime(startTime + increment * 8);
- windowStore.put(8, "eight");
+ windowStore.put(8, "eight", startTime + increment * 8);
assertEquals(
Utils.mkSet(
segments.segmentName(4),
@@ -385,7 +371,6 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testSegmentMaintenance() {
windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(),
@@ -393,23 +378,20 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
windowStore.init((StateStoreContext) context, windowStore);
context.setTime(0L);
- setCurrentTime(0);
- windowStore.put(0, "v");
+ windowStore.put(0, "v", 0);
assertEquals(
Utils.mkSet(segments.segmentName(0L)),
segmentDirs(baseDir)
);
- setCurrentTime(SEGMENT_INTERVAL - 1);
- windowStore.put(0, "v");
- windowStore.put(0, "v");
+ windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
+ windowStore.put(0, "v", SEGMENT_INTERVAL - 1);
assertEquals(
Utils.mkSet(segments.segmentName(0L)),
segmentDirs(baseDir)
);
- setCurrentTime(SEGMENT_INTERVAL);
- windowStore.put(0, "v");
+ windowStore.put(0, "v", SEGMENT_INTERVAL);
assertEquals(
Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
segmentDirs(baseDir)
@@ -431,8 +413,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(SEGMENT_INTERVAL * 3);
- windowStore.put(0, "v");
+ windowStore.put(0, "v", SEGMENT_INTERVAL * 3);
iter = windowStore.fetch(0, ofEpochMilli(0L), ofEpochMilli(SEGMENT_INTERVAL * 4));
fetchedCount = 0;
@@ -447,8 +428,7 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
segmentDirs(baseDir)
);
- setCurrentTime(SEGMENT_INTERVAL * 5);
- windowStore.put(0, "v");
+ windowStore.put(0, "v", SEGMENT_INTERVAL * 5);
iter = windowStore.fetch(0, ofEpochMilli(SEGMENT_INTERVAL * 4), ofEpochMilli(SEGMENT_INTERVAL * 10));
fetchedCount = 0;
@@ -512,29 +492,19 @@ public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
}
@Test
- @SuppressWarnings("deprecation")
public void testRestore() throws Exception {
final long startTime = SEGMENT_INTERVAL * 2;
final long increment = SEGMENT_INTERVAL / 2;
- setCurrentTime(startTime);
- windowStore.put(0, "zero");
- setCurrentTime(startTime + increment);
- windowStore.put(1, "one");
- setCurrentTime(startTime + increment * 2);
- windowStore.put(2, "two");
- setCurrentTime(startTime + increment * 3);
- windowStore.put(3, "three");
- setCurrentTime(startTime + increment * 4);
- windowStore.put(4, "four");
- setCurrentTime(startTime + increment * 5);
- windowStore.put(5, "five");
- setCurrentTime(startTime + increment * 6);
- windowStore.put(6, "six");
- setCurrentTime(startTime + increment * 7);
- windowStore.put(7, "seven");
- setCurrentTime(startTime + increment * 8);
- windowStore.put(8, "eight");
+ windowStore.put(0, "zero", startTime);
+ windowStore.put(1, "one", startTime + increment);
+ windowStore.put(2, "two", startTime + increment * 2);
+ windowStore.put(3, "three", startTime + increment * 3);
+ windowStore.put(4, "four", startTime + increment * 4);
+ windowStore.put(5, "five", startTime + increment * 5);
+ windowStore.put(6, "six", startTime + increment * 6);
+ windowStore.put(7, "seven", startTime + increment * 7);
+ windowStore.put(8, "eight", startTime + increment * 8);
windowStore.flush();
windowStore.close();
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 659fb47..f657209 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -1360,13 +1360,6 @@ public class TopologyTestDriver implements Closeable {
inner.init(context, root);
}
- @Deprecated
- @Override
- public void put(final K key,
- final V value) {
- inner.put(key, ValueAndTimestamp.make(value, ConsumerRecord.NO_TIMESTAMP));
- }
-
@Override
public void put(final K key,
final V value,
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
index d184f98..434782c 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/WindowStoreFacadeTest.java
@@ -71,17 +71,6 @@ public class WindowStoreFacadeTest {
}
@Test
- @SuppressWarnings("deprecation")
- public void shouldPutWithUnknownTimestamp() {
- mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP));
- expectLastCall();
- replay(mockedWindowTimestampStore);
-
- windowStoreFacade.put("key", "value");
- verify(mockedWindowTimestampStore);
- }
-
- @Test
public void shouldPutWindowStartTimestampWithUnknownTimestamp() {
mockedWindowTimestampStore.put("key", ValueAndTimestamp.make("value", ConsumerRecord.NO_TIMESTAMP), 21L);
expectLastCall();