You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2018/01/11 17:54:12 UTC
[kafka] branch trunk updated: KAFKA-6265: Add #queryableStoreName()
to GlobalKTable
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8aa176 KAFKA-6265: Add #queryableStoreName() to GlobalKTable
b8aa176 is described below
commit b8aa1761c39bd320950104fa2ddf4fbd7163fc85
Author: Richard Yu <yo...@gmail.com>
AuthorDate: Thu Jan 11 09:54:02 2018 -0800
KAFKA-6265: Add #queryableStoreName() to GlobalKTable
A spinoff of original pull request #4340 for resolving conflicts.
Author: RichardYuSTUG <yo...@gmail.com>
Reviewers: Matthias J. Sax <ma...@confluent.io>, Guozhang Wang <wa...@gmail.com>
Closes #4413 from ConcurrencyPractitioner/kafka-6265-2
---
docs/streams/upgrade-guide.html | 7 ++++++
.../apache/kafka/streams/kstream/GlobalKTable.java | 6 +++++
.../kstream/internals/GlobalKTableImpl.java | 16 +++++++++++++
.../kstream/internals/InternalStreamsBuilder.java | 2 +-
.../internals/InternalStreamsBuilderTest.java | 26 ++++++++++++++++++++++
5 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 4303811..c68b9bf 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -72,6 +72,13 @@
</p>
<p>
+ New method in <code>GlobalKTable</code>
+ </p>
+ <ul>
+ <li> A method has been provided such that it will return the store name associated with the <code>GlobalKTable</code> or <code>null</code> if the store name is non-queryable. </li>
+ </ul>
+
+ <p>
New methods in <code>KafkaStreams</code>:
</p>
<ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
index 72286c2..e58f67f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/GlobalKTable.java
@@ -67,4 +67,10 @@ import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
*/
@InterfaceStability.Evolving
public interface GlobalKTable<K, V> {
+ /**
+ * Get the name of the local state store that can be used to query this {@code GlobalKTable}.
+ *
+ * @return the underlying state store name, or {@code null} if this {@code GlobalKTable} cannot be queried.
+ */
+ String queryableStoreName();
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
index 34e2375..8fcdfed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GlobalKTableImpl.java
@@ -21,13 +21,29 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
public class GlobalKTableImpl<K, V> implements GlobalKTable<K, V> {
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
+ private final boolean queryable;
public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier) {
this.valueGetterSupplier = valueGetterSupplier;
+ this.queryable = true;
+ }
+
+ public GlobalKTableImpl(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
+ final boolean queryable) {
+ this.valueGetterSupplier = valueGetterSupplier;
+ this.queryable = queryable;
}
KTableValueGetterSupplier<K, V> valueGetterSupplier() {
return valueGetterSupplier;
}
+ @Override
+ public String queryableStoreName() {
+ if (!queryable) {
+ return null;
+ }
+ return valueGetterSupplier.storeNames()[0];
+ }
+
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 4308e5d..2a8a89e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -158,7 +158,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
topic,
processorName,
tableSource);
- return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()));
+ return new GlobalKTableImpl<>(new KTableSourceValueGetterSupplier<K, V>(storeBuilder.name()), materialized.isQueryable());
}
@Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 156acad..b9ba608 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -156,6 +156,32 @@ public class InternalStreamsBuilderTest {
assertEquals("topic2", topology.storeToChangelogTopic().get(storeName));
assertNull(table1.queryableStoreName());
}
+
+ @Test
+ public void shouldBuildGlobalTableWithNonQueryableStoreName() throws Exception {
+ final GlobalKTable<String, String> table1 = builder.globalTable(
+ "topic2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(null, null),
+ builder,
+ storePrefix));
+
+ assertNull(table1.queryableStoreName());
+ }
+
+ @Test
+ public void shouldBuildGlobalTableWithQueryaIbleStoreName() throws Exception {
+ final GlobalKTable<String, String> table1 = builder.globalTable(
+ "topic2",
+ consumed,
+ new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+ builder,
+ storePrefix));
+
+ assertEquals("globalTable", table1.queryableStoreName());
+ }
@Test
public void shouldBuildSimpleGlobalTableTopology() throws Exception {
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].