You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/10 17:28:09 UTC

[GitHub] [kafka] jeqo opened a new pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

jeqo opened a new pull request #10293:
URL: https://github.com/apache/kafka/pull/10293


   Following changes from https://cwiki.apache.org/confluence/display/KAFKA/KIP-667%3A+Remove+deprecated+methods+from+ReadOnlyWindowStore for the upcoming v3.0, this PR proposes to remove WindowStore#put deprecated methods.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#discussion_r592620290



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -710,73 +710,9 @@ public void testPutAndFetchAfter() {
         assertNull(entriesByKey.get(6));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testPutSameKeyTimestamp() {

Review comment:
       My mistake, I thought it was repeated. Will add it back.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
jeqo commented on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-796958801


   Totally missed that this was proposed and accepted in KIP-474. Hope is not stepping into an existing implementation. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo edited a comment on pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
jeqo edited a comment on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-796958801


   ~~Totally missed that this was proposed and accepted in KIP-474. Hope is not stepping into an existing implementation.~~
   
   Just realized the KIP is just about deprecating this method, and this is just completing it with the removal 😅. Turned the sub-task into a task in JIRA and linked it to the right KIP.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on a change in pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#discussion_r591938958



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -710,73 +710,9 @@ public void testPutAndFetchAfter() {
         assertNull(entriesByKey.get(6));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testPutSameKeyTimestamp() {

Review comment:
       Why removing the whole test?

##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -710,73 +710,9 @@ public void testPutAndFetchAfter() {
         assertNull(entriesByKey.get(6));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testPutSameKeyTimestamp() {
-        windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init((StateStoreContext) context, windowStore);
-
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        setCurrentTime(startTime);
-        windowStore.put(0, "zero");
-
-        assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
-            valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
-
-        windowStore.put(0, "zero");
-        windowStore.put(0, "zero+");
-        windowStore.put(0, "zero++");
-
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 1L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 2L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 3L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 4L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
-
-        // Flush the store and verify all current entries were properly flushed ...
-        windowStore.flush();
-
-        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-        for (final ProducerRecord<Object, Object> record : recordCollector.collected()) {
-            changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
-        }
-
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-        assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
-    }
-
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
-        setCurrentTime(0);
+        context.setRecordContext(createRecordContext(0));

Review comment:
       Do we need to set the record context?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-816888464


   Thanks @jeqo! Merged to `trunk`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #10293: KAFKA-12449: Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #10293:
URL: https://github.com/apache/kafka/pull/10293


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#issuecomment-796276922


   Seem this PR is related to KIP-474 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=115526545.
   
   We should rather tag the KIPs that did the deprecation of methods (on KIP-667 that is unnecessary)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jeqo commented on a change in pull request #10293: [KAFKA-12449] Remove deprecated WindowStore#put

Posted by GitBox <gi...@apache.org>.
jeqo commented on a change in pull request #10293:
URL: https://github.com/apache/kafka/pull/10293#discussion_r592620749



##########
File path: streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractWindowBytesStoreTest.java
##########
@@ -710,73 +710,9 @@ public void testPutAndFetchAfter() {
         assertNull(entriesByKey.get(6));
     }
 
-    @Test
-    @SuppressWarnings("deprecation")
-    public void testPutSameKeyTimestamp() {
-        windowStore = buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
-        windowStore.init((StateStoreContext) context, windowStore);
-
-        final long startTime = SEGMENT_INTERVAL - 4L;
-
-        setCurrentTime(startTime);
-        windowStore.put(0, "zero");
-
-        assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
-            valuesToSet(windowStore.fetch(0, ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
-
-        windowStore.put(0, "zero");
-        windowStore.put(0, "zero+");
-        windowStore.put(0, "zero++");
-
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime - WINDOW_SIZE),
-                ofEpochMilli(startTime + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 1L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 1L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 2L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 2L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(asList("zero", "zero", "zero+", "zero++")),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 3L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 3L + WINDOW_SIZE))));
-        assertEquals(
-            new HashSet<>(Collections.emptyList()),
-            valuesToSet(windowStore.fetch(
-                0,
-                ofEpochMilli(startTime + 4L - WINDOW_SIZE),
-                ofEpochMilli(startTime + 4L + WINDOW_SIZE))));
-
-        // Flush the store and verify all current entries were properly flushed ...
-        windowStore.flush();
-
-        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
-        for (final ProducerRecord<Object, Object> record : recordCollector.collected()) {
-            changeLog.add(new KeyValue<>(((Bytes) record.key()).get(), (byte[]) record.value()));
-        }
-
-        final Map<Integer, Set<String>> entriesByKey = entriesByKey(changeLog, startTime);
-
-        assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
-    }
-
     @Test
     public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
-        setCurrentTime(0);
+        context.setRecordContext(createRecordContext(0));

Review comment:
       Seems that we don't :) will remove this line.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org