You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/03 05:01:00 UTC

[jira] [Commented] (KAFKA-7101) Session Window store should set topic policy `compact,cleanup`

    [ https://issues.apache.org/jira/browse/KAFKA-7101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530827#comment-16530827 ] 

ASF GitHub Bot commented on KAFKA-7101:
---------------------------------------

guozhangwang closed pull request #5298: KAFKA-7101: Consider session store for windowed store default configs
URL: https://github.com/apache/kafka/pull/5298
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index ed51754d978..c644f9bbf38 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -138,8 +139,10 @@ public StateStore build() {
         }
 
         long retentionPeriod() {
-            if (isWindowStore()) {
+            if (builder instanceof WindowStoreBuilder) {
                 return ((WindowStoreBuilder) builder).retentionPeriod();
+            } else if (builder instanceof SessionStoreBuilder) {
+                return ((SessionStoreBuilder) builder).retentionPeriod();
             } else {
                 throw new IllegalStateException("retentionPeriod is not supported when not a window store");
             }
@@ -159,7 +162,7 @@ private String name() {
         }
 
         private boolean isWindowStore() {
-            return builder instanceof WindowStoreBuilder;
+            return builder instanceof WindowStoreBuilder || builder instanceof SessionStoreBuilder;
         }
 
         // Apparently Java strips the generics from this method because we're using the raw type for builder,
@@ -226,7 +229,7 @@ private SourceNodeFactory(final String name,
                                   final Deserializer<?> keyDeserializer,
                                   final Deserializer<?> valDeserializer) {
             super(name, NO_PREDECESSORS);
-            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>();
+            this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<>();
             this.pattern = pattern;
             this.keyDeserializer = keyDeserializer;
             this.valDeserializer = valDeserializer;
@@ -316,7 +319,7 @@ public ProcessorNode build() {
                 final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName;
                 if (internalTopicNames.contains(topic)) {
                     // prefix the internal topic name with the application id
-                    return new SinkNode<>(name, new StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
+                    return new SinkNode<>(name, new StaticTopicNameExtractor<>(decorateTopic(topic)), keySerializer, valSerializer, partitioner);
                 } else {
                     return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner);
                 }
@@ -415,7 +418,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset,
             throw new TopologyException("Sink " + name + " must have at least one parent");
         }
 
-        addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
+        addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames);
         nodeToSinkTopic.put(name, topic);
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
index 04b0ceb1ecf..69540899ada 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/SessionBytesStoreSupplier.java
@@ -35,4 +35,11 @@
      * @return segmentInterval in milliseconds
      */
     long segmentIntervalMs();
+
+    /**
+     * The time period for which the {@link SessionStore} will retain historic data.
+     *
+     * @return retentionPeriod
+     */
+    long retentionPeriod();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 45df39c7ce2..5610fb2b619 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -56,4 +56,9 @@ public long segmentIntervalMs() {
         // Selected somewhat arbitrarily. Profiling may reveal a different value is preferable.
         return Math.max(retentionPeriod / 2, 60_000L);
     }
+
+    @Override
+    public long retentionPeriod() {
+        return retentionPeriod;
+    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
index 61919c39123..b4338952a66 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionStoreBuilder.java
@@ -60,4 +60,8 @@ public SessionStoreBuilder(final SessionBytesStoreSupplier storeSupplier,
         }
         return new ChangeLoggingSessionBytesStore(inner);
     }
+
+    public long retentionPeriod() {
+        return storeSupplier.retentionPeriod();
+    }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index fb641302560..78c217d236e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -532,24 +532,36 @@ public void shouldAddInternalTopicConfigForWindowStores() {
         builder.setApplicationId("appId");
         builder.addSource(null, "source", null, null, null, "topic");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source");
-
         builder.addStateStore(
             Stores.windowStoreBuilder(
-                Stores.persistentWindowStore("store", 30_000L, 10_000L, false),
+                Stores.persistentWindowStore("store1", 30_000L, 10_000L, false),
                 Serdes.String(),
                 Serdes.String()
             ),
             "processor"
         );
+        builder.addStateStore(
+                Stores.sessionStoreBuilder(
+                        Stores.persistentSessionStore("store2", 30000), Serdes.String(), Serdes.String()
+                ),
+                "processor"
+        );
         final Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups();
         final InternalTopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next();
-        final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog");
-        final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(2, properties.size());
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
-        assertEquals("40000", properties.get(TopicConfig.RETENTION_MS_CONFIG));
-        assertEquals("appId-store-changelog", topicConfig.name());
-        assertTrue(topicConfig instanceof WindowedChangelogTopicConfig);
+        final InternalTopicConfig topicConfig1 = topicsInfo.stateChangelogTopics.get("appId-store1-changelog");
+        final Map<String, String> properties1 = topicConfig1.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(2, properties1.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties1.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("40000", properties1.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store1-changelog", topicConfig1.name());
+        assertTrue(topicConfig1 instanceof WindowedChangelogTopicConfig);
+        final InternalTopicConfig topicConfig2 = topicsInfo.stateChangelogTopics.get("appId-store2-changelog");
+        final Map<String, String> properties2 = topicConfig2.getProperties(Collections.<String, String>emptyMap(), 10000);
+        assertEquals(2, properties2.size());
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT + "," + TopicConfig.CLEANUP_POLICY_DELETE, properties2.get(TopicConfig.CLEANUP_POLICY_CONFIG));
+        assertEquals("40000", properties2.get(TopicConfig.RETENTION_MS_CONFIG));
+        assertEquals("appId-store2-changelog", topicConfig2.name());
+        assertTrue(topicConfig2 instanceof WindowedChangelogTopicConfig);
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Session Window store should set topic policy `compact,cleanup`
> --------------------------------------------------------------
>
>                 Key: KAFKA-7101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7101
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.2, 2.0.0, 0.11.0.3, 1.0.2, 1.1.1
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Major
>             Fix For: 2.1.0
>
>
> With [KIP-71|https://cwiki.apache.org/confluence/display/KAFKA/KIP-71%3A+Enable+log+compaction+and+deletion+to+co-exist] (0.10.1.0), the topic config `compact,delete` was introduced to apply to windowed store changelog topics in Kafka Streams. Later (0.10.2.0), session windows were added in [KIP-94|https://cwiki.apache.org/confluence/display/KAFKA/KIP-94+Session+Windows]. However, session windows do not use `compact,delete` at the moment. This results in the same issue that window stores faced before KIP-71. Thus, we should enable `compact,delete` for session window changelog topics, too.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)