You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/10/04 19:42:09 UTC

kafka git commit: MINOR: JavaDoc improvements for new state store API

Repository: kafka
Updated Branches:
  refs/heads/trunk 93b71e7de -> 06738b41b


MINOR: JavaDoc improvements for new state store API

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Michael G. Noll <mi...@confluent.io>, Bill Bejeck <bi...@confluent.io>, Guozhang Wang <wa...@gmail.com>

Closes #4006 from mjsax/minor-javadoc-improvments-for-stores


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/06738b41
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/06738b41
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/06738b41

Branch: refs/heads/trunk
Commit: 06738b41bc2447211d901d09aade7aadd0df6254
Parents: 93b71e7
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Oct 4 12:42:07 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Oct 4 12:42:07 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/Consumed.java | 27 +++++++++++++---
 .../java/org/apache/kafka/streams/Topology.java |  4 ++-
 .../kafka/streams/kstream/Materialized.java     | 20 ++++++++++--
 .../org/apache/kafka/streams/state/Stores.java  | 34 ++++++++++++++++++++
 4 files changed, 77 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/06738b41/streams/src/main/java/org/apache/kafka/streams/Consumed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
index 4a936e1..78d6b24 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Consumed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
@@ -24,10 +24,30 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 
 /**
  * The {@code Consumed} class is used to define the optional parameters when using {@link StreamsBuilder} to
- * build instancs of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
+ * build instances of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
+ * <p>
+ * For example, you can read a topic as {@link KStream} with a custom timestamp extractor and specify the corresponding
+ * key and value serdes like:
+ * <pre>{@code
+ * StreamsBuilder builder = new StreamsBuilder();
+ * KStream<String, Long> stream = builder.stream(
+ *   "topicName",
+ *   Consumed.with(Serdes.String(), Serdes.Long())
+ *           .withTimestampExtractor(new LogAndSkipOnInvalidTimestamp()));
+ * }</pre>
+ * Similarly, you can read a topic as {@link KTable} with a custom {@code auto.offset.reset} configuration and force a
+ * state store {@link org.apache.kafka.streams.kstream.Materialized materialization} to access the content via
+ * interactive queries:
+ * <pre>{@code
+ * StreamsBuilder builder = new StreamsBuilder();
+ * KTable<Integer, Integer> table = builder.table(
+ *   "topicName",
+ *   Consumed.with(AutoOffsetReset.LATEST),
+ *   Materialized.as("queryable-store-name"));
+ * }</pre>
  *
- * @param <K>
- * @param <V>
+ * @param <K> type of record key
+ * @param <V> type of record value
  */
 public class Consumed<K, V> {
 
@@ -40,7 +60,6 @@ public class Consumed<K, V> {
                      final Serde<V> valueSerde,
                      final TimestampExtractor timestampExtractor,
                      final Topology.AutoOffsetReset resetPolicy) {
-
         this.keySerde = keySerde;
         this.valueSerde = valueSerde;
         this.timestampExtractor = timestampExtractor;

http://git-wip-us.apache.org/repos/asf/kafka/blob/06738b41/streams/src/main/java/org/apache/kafka/streams/Topology.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 85d769f..1409b97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -54,7 +54,9 @@ public class Topology {
     final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder();
 
     /**
-     * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable}
+     * Sets the {@code auto.offset.reset} configuration when
+     * {@link #addSource(AutoOffsetReset, String, String...) adding a source processor} or when creating {@link KStream}
+     * or {@link KTable} via {@link StreamsBuilder}.
      */
     public enum AutoOffsetReset {
         EARLIEST, LATEST

http://git-wip-us.apache.org/repos/asf/kafka/blob/06738b41/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
index 1f142c2..dd7165c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Materialized.java
@@ -34,9 +34,23 @@ import java.util.Objects;
 
 /**
  * Used to describe how a {@link StateStore} should be materialized.
- * You can either provide a custom {@link StateStore} backend
- * through one of the provided methods accepting a supplier or use the default RocksDB backends
- * by providing just a store name.
+ * You can either provide a custom {@link StateStore} backend through one of the provided methods accepting a supplier
+ * or use the default RocksDB backends by providing just a store name.
+ * <p>
+ * For example, you can read a topic as {@link KTable} and force a state store materialization to access the content
+ * via Interactive Queries API:
+ * <pre>{@code
+ * StreamsBuilder builder = new StreamsBuilder();
+ * KTable<Integer, Integer> table = builder.table(
+ *   "topicName",
+ *   Materialized.as("queryable-store-name"));
+ * }</pre>
+ *
+ * @param <K> type of record key
+ * @param <V> type of record value
+ * @param <S> type of state store (note: state stores always have key/value types {@code <Bytes,byte[]>}
+ *
+ * @see org.apache.kafka.streams.state.Stores
  */
 public class Materialized<K, V, S extends StateStore> {
     protected StoreSupplier<S> storeSupplier;

http://git-wip-us.apache.org/repos/asf/kafka/blob/06738b41/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 05ebd33..0173c1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -44,6 +44,40 @@ import java.util.Map;
 
 /**
  * Factory for creating state stores in Kafka Streams.
+ * <p>
+ * When using the high-level DSL, i.e., {@link org.apache.kafka.streams.StreamsBuilder StreamsBuilder}, users create
+ * {@link StoreSupplier}s that can be further customized via
+ * {@link org.apache.kafka.streams.kstream.Materialized Materialized}.
+ * For example, a topic read as {@link org.apache.kafka.streams.kstream.KTable KTable} can be materialized into an
+ * in-memory store with custom key/value serdes and caching disabled:
+ * <pre>{@code
+ * StreamsBuilder builder = new StreamsBuilder();
+ * KeyValueBytesStoreSupplier storeSupplier = Stores.inMemoryKeyValueStore("queryable-store-name");
+ * KTable<Long,String> table = builder.table(
+ *   "topicName",
+ *   Materialized.as(storeSupplier)
+ *               .withKeySerde(Serdes.Long())
+ *               .withValueSerde(Serdes.String())
+ *               .withCachingDisabled());
+ * }</pre>
+ * When using the Processor API, i.e., {@link org.apache.kafka.streams.Topology Topology}, users create
+ * {@link StoreBuilder}s that can be attached to {@link org.apache.kafka.streams.processor.Processor Processor}s.
+ * For example, you can create a {@link org.apache.kafka.streams.kstream.Windowed windowed} RocksDB store with custom
+ * changelog topic configuration like:
+ * <pre>{@code
+ * Topology topology = new Topology();
+ * topology.addProcessor("processorName", ...);
+ *
+ * Map<String,String> topicConfig = new HashMap<>();
+ * StoreBuilder<WindowStore<Integer, Long>> storeBuilder = Stores
+ *   .windowStoreBuilder(
+ *     Stores.persistentWindowStore("queryable-store-name", ...),
+ *     Serdes.Integer(),
+ *     Serdes.Long())
+ *   .withLoggingEnabled(topicConfig);
+ *
+ * topology.addStateStore(storeBuilder, "processorName");
+ * }</pre>
  */
 @InterfaceStability.Evolving
 public class Stores {