You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/11 17:54:12 UTC

[kafka] branch trunk updated: KAFKA-6265: Add #queryableStoreName() to GlobalKTable

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8aa176  KAFKA-6265: Add #queryableStoreName() to GlobalKTable
b8aa176 is described below

commit b8aa1761c39bd320950104fa2ddf4fbd7163fc85
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Thu Jan 11 09:54:02 2018 -0800

    KAFKA-6265: Add #queryableStoreName() to GlobalKTable
    
    A spinoff of original pull request #4340 for resolving conflicts.
    
    Author: RichardYuSTUG <yo...@gmail.com>
    
    Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
    
    Closes #4413 from ConcurrencyPractitioner/kafka-6265-2
---
 docs/streams/upgrade-guide.html                    |  7 ++++++
 .../apache/kafka/streams/kstream/GlobalKTable.java |  6 +++++
 .../kstream/internals/GlobalKTableImpl.java        | 16 +++++++++++++
 .../kstream/internals/InternalStreamsBuilder.java  |  2 +-
 .../internals/InternalStreamsBuilderTest.java      | 26 ++++++++++++++++++++++
 5 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 4303811..c68b9bf 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -72,6 +72,13 @@
     </p>
 
     <p>
+	New method in <code>GlobalKTable</code>
+    </p>
+    <ul>
+	<li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
+    </ul>
+
+    <p>
         New methods in <code>KafkaStreams</code>:
     </p>
     <ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c2..e58f67f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
  */
 @InterfaceStability.Evolving
 public interface GlobalKTable<K, V> {
+    /**
+     * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+     *
+     * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+     */
+    String queryableStoreName();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e2375..8fcdfed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
 
     private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+    private final boolean queryable;
 
     public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
         this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = true;
+    }
+    
+    public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier, 
+                            final boolean queryable) {
+        this.valueGetterSupplier = valueGetterSupplier;
+        this.queryable = queryable;
     }
 
     KTableValueGetterSupplier<K, V> valueGetterSupplier() {
         return valueGetterSupplier;
     }
 
+    @Override
+    public String queryableStoreName() {
+        if (!queryable) {
+            return null;
+        }
+        return valueGetterSupplier.storeNames()[0];
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d..2a8a89e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -158,7 +158,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                                                topic,
                                                processorName,
                                                tableSource);
-        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
+        return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
     }
 
     @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 156acad..b9ba608 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -156,6 +156,32 @@ public class InternalStreamsBuilderTest {
         assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
         assertNull(table1.queryableStoreName());
     }
+    
+    @Test
+    public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+        final GlobalKTable<String, String> table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+                builder,
+                storePrefix));
+
+        assertNull(table1.queryableStoreName());
+    }
+
+    @Test
+    public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+        final GlobalKTable<String, String> table1 = builder.globalTable(
+            "topic2",
+            consumed,
+            new MaterializedInternal<>(
+                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+                builder,
+                storePrefix));
+
+        assertEquals("globalTable", table1.queryableStoreName());
+    }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].