Posted to commits@samza.apache.org by bh...@apache.org on 2019/11/08 22:38:21 UTC

[samza] branch master updated: SAMZA-1949: Add java docs and configuration documentation for side inputs (#1064)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bd9e9d8  SAMZA-1949: Add java docs and configuration documentation for side inputs (#1064)
bd9e9d8 is described below

commit bd9e9d8ca1d64f9545e92bf2b40e998df0fd5751
Author: mynameborat <bh...@apache.org>
AuthorDate: Fri Nov 8 14:38:11 2019 -0800

    SAMZA-1949: Add java docs and configuration documentation for side inputs (#1064)
    
    Side inputs feature and configuration documentation
---
 .../documentation/versioned/api/low-level-api.md   | 25 ++++++++
 .../versioned/api/programming-model.md             |  3 +-
 .../learn/documentation/versioned/api/table-api.md | 66 +++++++++++++++++++++-
 .../versioned/jobs/samza-configurations.md         |  2 +
 .../table/descriptors/LocalTableDescriptor.java    | 17 ++++++
 5 files changed, 110 insertions(+), 3 deletions(-)

diff --git a/docs/learn/documentation/versioned/api/low-level-api.md b/docs/learn/documentation/versioned/api/low-level-api.md
index e3466af..6d345c1 100644
--- a/docs/learn/documentation/versioned/api/low-level-api.md
+++ b/docs/learn/documentation/versioned/api/low-level-api.md
@@ -274,6 +274,24 @@ For example:
 
 {% endhighlight %}
 
+### Side Inputs for Local Tables
+
+To populate a local [Table](javadocs/org/apache/samza/table/Table.html) from a secondary data source, use side inputs to specify the source streams. The table descriptor also takes
+a `SideInputsProcessor`, which is applied to each side input message before the resulting entries are written to the table. The `TableDescriptor` registered with the `TaskApplicationDescriptor` is where you specify these side input properties.
+
+The following code snippet shows a sample `TableDescriptor` for a local table that is backed by side inputs.
+
+{% highlight java %}
+
+    RocksDbTableDescriptor<String, Profile> tableDesc =
+      new RocksDbTableDescriptor<>("viewCounts", KVSerde.of(new StringSerde(), new JsonSerdeV2<>(Profile.class)))
+        .withSideInputs(ImmutableList.of("profile"))
+        .withSideInputsProcessor((message, store) -> {
+          ...
+        });
+
+{% endhighlight %} 
+
 ### Legacy Applications
 
 For legacy Low Level API applications, you can continue specifying your system, stream and store properties along with your task.class in configuration. An incomplete example of configuration for legacy task application looks like this (see the [configuration](../jobs/configuration.html) documentation for more detail):
@@ -296,4 +314,11 @@ For legacy Low Level API applications, you can continue specifying your system,
     # Use the "json" serializer for messages in the "PageViewEvent" topic
     systems.kafka.streams.PageViewEvent.samza.msg.serde=json
     
+    # Use "ProfileEvent" from the "kafka" system for side inputs for "profile-store"
+    stores.profile-store.side.inputs=kafka.ProfileEvent
+    
+    # Use "MySideInputsProcessorFactory" to instantiate the "SideInputsProcessor" 
+    # that will be applied to each "ProfileEvent" message before writing to "profile-store"
+    stores.profile-store.side.inputs.processor.factory=org.apache.samza.MySideInputsProcessorFactory
+    
 {% endhighlight %}
\ No newline at end of file
diff --git a/docs/learn/documentation/versioned/api/programming-model.md b/docs/learn/documentation/versioned/api/programming-model.md
index 943c573..8c93011 100644
--- a/docs/learn/documentation/versioned/api/programming-model.md
+++ b/docs/learn/documentation/versioned/api/programming-model.md
@@ -71,8 +71,7 @@ Descriptors let you specify the properties of various aspects of your applicatio
 [InputDescriptor](javadocs/org/apache/samza/system/descriptors/InputDescriptor.html)s and [OutputDescriptor](javadocs/org/apache/samza/system/descriptors/OutputDescriptor.html)s can be used for specifying Samza and implementation-specific properties of the streaming inputs and outputs for your application. You can obtain InputDescriptors and OutputDescriptors using a [SystemDescriptor](javadocs/org/apache/samza/system/descriptors/SystemDescriptor.html) for your system. This SystemDescrip [...]
 can use the [GenericSystemDescriptor](javadocs/org/apache/samza/system/descriptors/GenericSystemDescriptor.html).
 
-A [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html) can be used for specifying Samza and implementation-specific properties of a [Table](javadocs/org/apache/samza/table/Table.html). You can use a Local TableDescriptor (e.g. [RocksDbTableDescriptor](javadocs/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.html) or a [RemoteTableDescriptor](javadocs/org/apache/samza/table/descriptors/RemoteTableDescriptor).
-
+A [TableDescriptor](javadocs/org/apache/samza/table/descriptors/TableDescriptor.html) can be used for specifying Samza and implementation-specific properties of a [Table](javadocs/org/apache/samza/table/Table.html). You can use a Local TableDescriptor (e.g. [RocksDbTableDescriptor](javadocs/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.html)) or a [RemoteTableDescriptor](javadocs/org/apache/samza/table/descriptors/RemoteTableDescriptor.html).
 
 The following example illustrates how you can use input and output descriptors for a Kafka system, and a table descriptor for a local RocksDB table within your application:
 
diff --git a/docs/learn/documentation/versioned/api/table-api.md b/docs/learn/documentation/versioned/api/table-api.md
index 106d27a..3511233 100644
--- a/docs/learn/documentation/versioned/api/table-api.md
+++ b/docs/learn/documentation/versioned/api/table-api.md
@@ -180,6 +180,66 @@ join with a table and finally write the output to another table.
    function defined in lines 30-39.
 6. Line 12: writes the join result stream to another table
 
+# Using Table with Samza High Level API using Side Inputs
+
+The code snippet below illustrates the usage of table in Samza high level API using side inputs.
+
+{% highlight java %}
+
+ 1  class SamzaStreamApplication implements StreamApplication {
+ 2    @Override
+ 3    public void describe(StreamApplicationDescriptor appDesc) {
+ 4      TableDescriptor<Integer, Profile> desc = new InMemoryTableDescriptor(
+ 5          "t1", KVSerde.of(new IntegerSerde(), new ProfileJsonSerde()))
+ 6          .withSideInputs(ImmutableList.of(PROFILE_STREAM))
+ 7          .withSideInputsProcessor((msg, store) -> {
+ 8              Profile profile = (Profile) msg.getMessage();
+ 9              int key = profile.getMemberId();
+10              return ImmutableList.of(new Entry<>(key, profile));
+11            });
+12 
+13      Table<KV<Integer, Profile>> table = appDesc.getTable(desc);
+14 
+15      appDesc.getInputStream("PageView", new NoOpSerde<PageView>())
+16          .map(new MyMapFunc())
+17          .join(table, new MyJoinFunc())
+18          .sendTo(anotherTable);
+19    }
+20  }
+21
+22  static class MyMapFunc implements MapFunction<PageView, KV<Integer, PageView>> {
+23    private ReadableTable<Integer, Profile> profileTable;
+24
+25    @Override
+26    public void init(Config config, TaskContext context) {
+27      profileTable = (ReadableTable<Integer, Profile>) context.getTable("t1");
+28    }
+29 
+30    @Override
+31    public KV<Integer, PageView> apply(PageView message) {
+32      return KV.of(message.getId(), message);
+33    }
+34  }
+35
+36  static class MyJoinFunc implements StreamTableJoinFunction
+37      <Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {
+38
+39    @Override
+40    public EnrichedPageView apply(KV<Integer, PageView> m, KV<Integer, Profile> r) {
+41      // r is null when no profile matches the key
+42      return r == null ? null : new EnrichedPageView(
+43          m.getValue().getPageKey(), m.getKey(), r.getValue().getCompany());
+44    }
+45  }
+
+{% endhighlight %}
+
+The code above uses side inputs to populate the profile table:
+1. Line 6: Specifies the source stream for the profile table.
+2. Lines 7-11: Provides an implementation of `SideInputsProcessor` that reads from the profile stream
+     and populates the table.
+3. Line 17: Joins incoming page views against the profile table.
+
 # Using Table with Samza Low Level API
 
 The code snippet below illustrates the usage of table in Samza Low Level Task API.
@@ -443,7 +503,11 @@ on the current implementation of in-memory and RocksDB stores. Both tables provi
 feature parity to existing in-memory and RocksDB-based stores. For more detailed 
 information please refer to 
 [`RocksDbTableDescriptor`] (https://github.com/apache/samza/blob/master/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbTableDescriptor.java) and 
-[`InMemoryTableDescriptor`] (https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java). 
+[`InMemoryTableDescriptor`] (https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java).
+
+For local tables whose data comes from secondary data sources, side inputs can be used to populate the table.
+After a failure, the table is bootstrapped from the source streams instead of a changelog. Side inputs and
+the processor implementation can be provided as properties to the `TableDescriptor`.
 
 ## Hybrid Table
 
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 8637a4a..1a2e04b 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -280,6 +280,8 @@ These properties define Samza's storage mechanism for efficient [stateful stream
 |stores.**_store-name_**.<br>rocksdb.metrics.list|(none)|A list of [RocksDB properties](https://github.com/facebook/rocksdb/blob/master/include/rocksdb/db.h#L409) to expose as metrics (gauges).|
 |stores.**_store-name_**.<br>rocksdb.delete.obsolete.files.period.micros|21600000000|This property specifies the period in microseconds to delete obsolete files regardless of files removed during compaction. Allowed range is up to 9223372036854775807.|
 |stores.**_store-name_**.<br>rocksdb.max.manifest.file.size|18446744073709551615|This property specifies the maximum size of the MANIFEST data file, after which it is rotated. Default value is also the maximum, making it practically unlimited: only one manifest file is used.|
+|stores.**_store-name_**.<br>side.inputs|(none)|Samza applications with stores that are populated by secondary data sources such as HDFS, but are otherwise read-only, can leverage side inputs. Stores configured with side inputs use the source streams to bootstrap data in the absence of a local copy, thereby avoiding an additional copy of the data in a changelog. It is also recommended to enable the host affinity feature when turning on side inputs to prevent bootstrapping of the data during cont [...]
+|stores.**_store-name_**.<br>side.inputs.processor.factory|(none)|The value is the fully-qualified name of a Java class that implements <a href="../api/javadocs/org/apache/samza/storage/SideInputsProcessorFactory.html">SideInputsProcessorFactory</a>. It is required for stores with side inputs (`stores.store-name.side.inputs`).|
 
 ### <a name="deployment"></a>[5. Deployment](#deployment)
 Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deployment models](../deployment/deployment-model.html). Below are the configurations options for both models.
diff --git a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
index 1623710..8fff2b5 100644
--- a/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
+++ b/samza-api/src/main/java/org/apache/samza/table/descriptors/LocalTableDescriptor.java
@@ -67,6 +67,16 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     this.serde = serde;
   }
 
+  /**
+   * Add side inputs to the table. Each stream is of the format
+   * <i>system-name</i>.<i>stream-name</i>. The streams are marked as bootstrap streams and once the table is bootstrapped, it is
+   * updated in the background in change capture mode.
+   * Applications should specify the transformation logic using {@link #withSideInputsProcessor(SideInputsProcessor)}, which
+   * is applied to the incoming messages; the results are written to the table.
+   *
+   * @param sideInputs list of side input streams
+   * @return this table descriptor instance
+   */
   public D withSideInputs(List<String> sideInputs) {
     this.sideInputs = sideInputs;
     // Disable changelog
@@ -76,6 +86,13 @@ abstract public class LocalTableDescriptor<K, V, D extends LocalTableDescriptor<
     return (D) this;
   }
 
+  /**
+   * Provide the {@link SideInputsProcessor} for this table. It is applied on the side inputs and the results are
+   * written to the table.
+   *
+   * @param sideInputsProcessor a side input processor
+   * @return this table descriptor instance
+   */
   public D withSideInputsProcessor(SideInputsProcessor sideInputsProcessor) {
     this.sideInputsProcessor = sideInputsProcessor;
     return (D) this;
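
The Javadoc added above describes the side input flow: each message from a side input stream is passed to the processor, and the entries it returns are written to the table. The following is a minimal, stand-alone sketch of that flow and is not Samza code. `Entry`, `Processor`, `Profile`, `bootstrap`, and `buildProfileTable` are all hypothetical stand-ins that only mirror the shape of `SideInputsProcessor` as described in this commit, and a plain `Map` takes the place of the local store.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-alone model of side input processing; every type here is a hypothetical stand-in. */
public class SideInputSketch {

  /** Hypothetical key/value pair returned by the processor. */
  static final class Entry<K, V> {
    final K key;
    final V value;

    Entry(K key, V value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Hypothetical processor shape: one message in, zero or more entries to write out. */
  interface Processor<M, K, V> {
    Collection<Entry<K, V>> process(M message, Map<K, V> store);
  }

  /** Hypothetical message type read from the side input stream. */
  static final class Profile {
    final int memberId;
    final String company;

    Profile(int memberId, String company) {
      this.memberId = memberId;
      this.company = company;
    }
  }

  /** Applies the processor to each side input message and writes the results to the store. */
  static <M, K, V> void bootstrap(List<M> sideInput, Processor<M, K, V> processor, Map<K, V> store) {
    for (M message : sideInput) {
      for (Entry<K, V> entry : processor.process(message, store)) {
        store.put(entry.key, entry.value);
      }
    }
  }

  /** Builds a table keyed by member ID from an in-memory list standing in for the stream. */
  static Map<Integer, Profile> buildProfileTable(List<Profile> profiles) {
    Map<Integer, Profile> store = new HashMap<>();
    Processor<Profile, Integer, Profile> byMemberId =
        (profile, ignored) -> Collections.singletonList(new Entry<>(profile.memberId, profile));
    bootstrap(profiles, byMemberId, store);
    return store;
  }

  public static void main(String[] args) {
    List<Profile> stream = new ArrayList<>();
    stream.add(new Profile(1, "Acme"));
    stream.add(new Profile(2, "Initech"));
    stream.add(new Profile(1, "Globex")); // a later message for member 1 replaces the earlier entry
    Map<Integer, Profile> table = buildProfileTable(stream);
    // prints "Globex Initech 2"
    System.out.println(table.get(1).company + " " + table.get(2).company + " " + table.size());
  }
}
```

Because the table contents are derived entirely from the source stream, replaying the stream rebuilds the same table, which is consistent with the documentation's statement that side inputs replace the changelog for recovery.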