Posted to commits@samza.apache.org by pm...@apache.org on 2019/05/28 19:41:56 UTC

[samza] branch master updated: SAMZA-2222: Documentation for async API

This is an automated email from the ASF dual-hosted git repository.

pmaheshwari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 22f16da  SAMZA-2222: Documentation for async API
22f16da is described below

commit 22f16da04b082a05273a354c59a9364632d74826
Author: mynameborat <bh...@gmail.com>
AuthorDate: Tue May 28 12:41:51 2019 -0700

    SAMZA-2222: Documentation for async API
    
    ![image](https://user-images.githubusercontent.com/46942335/58350496-e8b5f880-7e1a-11e9-8736-16f968146109.png)
    
    Author: mynameborat <bh...@gmail.com>
    
    Reviewers: Prateek Maheshwari <pm...@apache.org>
    
    Closes #1052 from mynameborat/async-documentation
---
 .../documentation/versioned/api/high-level-api.md  | 21 ++++++
 .../versioned/jobs/samza-configurations.md         |  1 -
 .../tutorials/versioned/samza-async-user-guide.md  | 84 +++++++++++++++++++++-
 3 files changed, 103 insertions(+), 3 deletions(-)
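
The AsyncFlatMap documentation in this patch describes a function that takes one message and returns a future of zero or more messages. The async user guide also shows how to adapt a callback-based IO library to such a future. Below is a minimal plain-JDK sketch of both ideas. It assumes nothing from Samza: `AsyncFlatMap`, `DictionaryClient` and `meaningsOf` are hypothetical stand-ins. The real interface is `org.apache.samza.operators.functions.AsyncFlatMapFunction`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

public class AsyncFlatMapSketch {

  // Hypothetical stand-in with the same shape as AsyncFlatMapFunction#apply:
  // one input message in, a future of zero or more output messages out.
  @FunctionalInterface
  public interface AsyncFlatMap<M, OM> {
    CompletionStage<Collection<OM>> apply(M message);
  }

  // Hypothetical callback-style client, standing in for an IO library that
  // reports results through success/failure callbacks instead of a future.
  public interface DictionaryClient {
    void lookup(String word, Consumer<String> onSuccess, Consumer<Throwable> onFailure);
  }

  // Adapts the callback API to a future holding a one-element collection.
  public static AsyncFlatMap<String, String> meaningsOf(DictionaryClient client) {
    return word -> {
      CompletableFuture<Collection<String>> result = new CompletableFuture<>();
      client.lookup(
          word,
          meaning -> result.complete(Collections.singletonList(meaning)),
          result::completeExceptionally); // propagate failures to the caller
      return result; // the future must be returned, or the outcome is lost
    };
  }

  public static void main(String[] args) {
    // A fake client that answers synchronously and fails on unknown words.
    DictionaryClient fake = (word, onSuccess, onFailure) -> {
      if (word.equals("samza")) {
        onSuccess.accept("a stream processing framework");
      } else {
        onFailure.accept(new IllegalArgumentException("unknown word: " + word));
      }
    };
    AsyncFlatMap<String, String> fn = meaningsOf(fake);
    System.out.println(fn.apply("samza").toCompletableFuture().join()); // [a stream processing framework]
    CompletableFuture<Collection<String>> failed = fn.apply("unknown").toCompletableFuture();
    System.out.println(failed.isCompletedExceptionally()); // true
  }
}
```

Wrapping the single result in a collection is what lets the same function type express filtering (an empty collection) as well as fan-out (several messages).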

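The out-of-order note at the end of this patch can be illustrated without Samza. The following is a hypothetical simulation, not Samza's run loop. Several messages are in flight at once, similar in spirit to `task.max.concurrency` greater than 1. They are dispatched in arrival order, but each result is emitted as soon as its own future completes. `OutOfOrderSketch` and `emittedOrder` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class OutOfOrderSketch {

  // Dispatches every message in arrival order, then completes the pending
  // futures in the given order. Returns the order in which results are emitted.
  public static List<String> emittedOrder(List<String> arrivals, int[] completionOrder) {
    List<CompletableFuture<String>> inFlight = new ArrayList<>();
    List<String> emitted = new ArrayList<>();
    for (int i = 0; i < arrivals.size(); i++) {
      CompletableFuture<String> pending = new CompletableFuture<>();
      // "Downstream" receives the result as soon as this particular future completes.
      pending.thenAccept(emitted::add);
      inFlight.add(pending);
    }
    // Simulate remote calls returning in an arbitrary order.
    for (int index : completionOrder) {
      inFlight.get(index).complete(arrivals.get(index));
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<String> arrivals = List.of("m0", "m1", "m2");
    // The call for m2 returns first and the call for m0 returns last.
    System.out.println(emittedOrder(arrivals, new int[] {2, 1, 0})); // [m2, m1, m0]
    System.out.println(emittedOrder(arrivals, new int[] {0, 1, 2})); // [m0, m1, m2]
  }
}
```

With a single message in flight, completion order is forced to equal arrival order. This is why strict output ordering requires leaving `task.max.concurrency` at 1.
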
diff --git a/docs/learn/documentation/versioned/api/high-level-api.md b/docs/learn/documentation/versioned/api/high-level-api.md
index 43fd727..53cac8c 100644
--- a/docs/learn/documentation/versioned/api/high-level-api.md
+++ b/docs/learn/documentation/versioned/api/high-level-api.md
@@ -29,6 +29,7 @@ title: High Level Streams API
 - [Operators](#operators)
   - [Map](#map)
   - [FlatMap](#flatmap)
+  - [AsyncFlatMap](#asyncflatmap)
   - [Filter](#filter)
   - [PartitionBy](#partitionby)
   - [Merge](#merge)
@@ -142,6 +143,26 @@ Applies the provided 1:n [FlatMapFunction](javadocs/org/apache/samza/operators/f
 
 {% endhighlight %}
 
+#### AsyncFlatMap
+Applies the provided 1:n [AsyncFlatMapFunction](javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction) to each element in the MessageStream and returns the transformed MessageStream. The AsyncFlatMapFunction takes in a single message and returns a future of zero or more messages.
+
+{% highlight java %}
+
+    RestClient restClient = ...
+    MessageStream<String> words = ...
+    // Transform each incoming word into its meaning using a dictionary lookup service
+    MessageStream<String> meanings = words.flatMapAsync(word -> {
+       // Build a lookup request to the dictionary service
+       Request<String> dictionaryRequest = buildDictionaryRequest(word);
+       CompletableFuture<DictionaryResponse> dictionaryResponseFuture = restClient.sendRequest(dictionaryRequest);
+       // The function returns a future of a collection, so wrap the single meaning in a list
+       return dictionaryResponseFuture
+            .thenApply(response -> Collections.singletonList(response.getMeaning()));
+    });
+
+{% endhighlight %}
+
+For more details on asynchronous processing, see the [Samza Async API and Multithreading User Guide](../../../tutorials/{{site.version}}/samza-async-user-guide).
+
 #### Filter
 Applies the provided [FilterFunction](javadocs/org/apache/samza/operators/functions/FilterFunction) to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. Messages for which the FilterFunction returns false are filtered out.
 
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 4fcc87b..1e702b0 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -95,7 +95,6 @@ These are the basic properties for setting up a Samza application.
 |task.opts| |Any JVM options to include in the command line when executing Samza containers. For example, this can be used to set the JVM heap size, to tune the garbage collector, or to enable remote debugging. This cannot be used when running with ThreadJobFactory. Anything you put in task.opts gets forwarded directly to the commandline as part of the JVM invocation.<br>Example: `task.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC`|
 |task.poll.interval.ms|50|Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining buffered messages to process for any input SystemStreamPartition. The second condition arises when some input SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which mean [...]
 |task.shutdown.ms|30000|This property controls how long the Samza container will wait for an orderly shutdown of task instances.|
-|job.container.single.<br>thread.mode|false|_(Deprecated)_ If set to true, samza will fallback to legacy single-threaded event loop. Default is false, which enables the [multithreading execution](../container/event-loop.html).|
 
 ### <a name="checkpointing"></a> [2. Checkpointing](#checkpointing)
 [Checkpointing](../container/checkpointing.html) is not required, but recommended for most jobs. If you don't configure checkpointing, and a job or container restarts, it does not remember which messages it has already processed. Without checkpointing, consumer behavior on startup is determined by the ...samza.offset.default setting. Checkpointing allows a job to start up where it previously left off.
diff --git a/docs/learn/tutorials/versioned/samza-async-user-guide.md b/docs/learn/tutorials/versioned/samza-async-user-guide.md
index 3e3314c..70c9204 100644
--- a/docs/learn/tutorials/versioned/samza-async-user-guide.md
+++ b/docs/learn/tutorials/versioned/samza-async-user-guide.md
@@ -107,16 +107,96 @@ task.callback.timeout.ms=5000
 
 **NOTE:** Samza also guarantees the in-order process of the messages within an AsyncStreamTask by default, meaning the next processAsync() of a task won't be called until the previous processAsync() callback has been triggered.
 
+### Asynchronous Process in High Level API
+
+If your processing logic is asynchronous (for example, it makes non-blocking remote calls), you can implement it using the [AsyncFlatMapFunction](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/operators/functions/AsyncFlatMapFunction.html). The following example shows an application that consumes Wikipedia feed updates, invokes a remote service to standardize each update, and sends the standardized events back to Wikipedia.
+
+{% highlight java %}
+
+public class WikipediaAsyncStandardizer implements StreamApplication {
+
+  @Override
+  public void describe(StreamApplicationDescriptor appDescriptor) {
+    // Define a SystemDescriptor for Wikipedia data
+    WikipediaSystemDescriptor wikipediaSystemDescriptor = new WikipediaSystemDescriptor("irc.wikimedia.org", 6667);
+    // Define InputDescriptors for consuming wikipedia data
+    WikipediaInputDescriptor wikipediaInputDescriptor = wikipediaSystemDescriptor
+        .getInputDescriptor("en-wikipedia")
+        .withChannel("#en.wikipedia");
+    // Define OutputDescriptor for producing wikipedia data
+    WikipediaOutputDescriptor wikipediaOutputDescriptor = wikipediaSystemDescriptor
+        .getOutputDescriptor("en-wikipedia-standardized")
+        .withChannel("#en.wikipedia.standardized");
+
+    appDescriptor.getInputStream(wikipediaInputDescriptor)
+        .filter(WikipediaFeedEvent::isUpdate)
+        .flatMapAsync(new AsyncStandardizerFunction())
+        .sendTo(wikipediaOutputDescriptor);
+  }
+
+  static class AsyncStandardizerFunction implements AsyncFlatMapFunction<WikipediaFeedEvent, StandardizedWikipediaFeedEvent> {
+    private transient Client client;
+
+    @Override
+    public void init(Context context) {
+      client = ClientBuilder.newClient(context.getJobContext().getConfig().get("standardizer.uri"));
+    }
+
+    @Override
+    public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
+      Request<StandardizerRequest> standardizerRequest = buildStandardizedRequest(wikipediaFeedEvent);
+      CompletableFuture<StandardizerResponse> standardizerResponse = client.sendRequest(standardizerRequest);
+
+      return standardizerResponse
+          .thenApply(response -> extractStandardizedWikipediaFeedEvent(response));
+    }
+
+    @Override
+    public void close() {
+      client.close();
+    }
+  }
+}
+{% endhighlight %}
+
+In the above example, the results from the `AsyncStandardizerFunction` are propagated to the downstream operator once the future completes. Each message has an overall processing timeout, which you can tune using:
+
+{% highlight jproperties %}
+# Timeout for a message to be processed. When the timeout elapses, the container shuts down.
+task.callback.timeout.ms=5000
+{% endhighlight %}
+
+If the IO library accepts callbacks instead of returning a future, the callback can be adapted to a future as follows:
+
+{% highlight java %}
+
+  @Override
+  public CompletionStage<Collection<StandardizedWikipediaFeedEvent>> apply(WikipediaFeedEvent wikipediaFeedEvent) {
+    Request<StandardizerRequest> standardizationRequest = buildStandardizedRequest(wikipediaFeedEvent);
+    // Completed from the callback below, once the remote call succeeds or fails
+    CompletableFuture<Collection<StandardizedWikipediaFeedEvent>> standardizedFuture = new CompletableFuture<>();
+    client.async().get(standardizationRequest, new InvocationCallback<StandardizerResponse>() {
+          @Override
+          public void completed(StandardizerResponse response) {
+            standardizedFuture.complete(extractStandardizedWikipediaFeedEvent(response));
+          }
+
+          @Override
+          public void failed(Throwable throwable) {
+            standardizedFuture.completeExceptionally(throwable);
+          }
+        });
+    return standardizedFuture;
+  }
+{% endhighlight %}
+
 ### Out-of-order Process
 
-In both cases above, Samza supports in-order process by default. Further parallelism is also supported by allowing a task to process multiple outstanding messages in parallel. The following config allows one task to process at most 4 outstanding messages in parallel at a time: 
+In all cases above, Samza processes messages in order by default. Further parallelism is supported by allowing a task to process multiple outstanding messages concurrently. The following config allows one task to process at most 4 outstanding messages at a time:
 
 {% highlight jproperties %}
 # Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
 task.max.concurrency=4
 {% endhighlight %}
 
-**NOTE:** In case of AsyncStreamTask, processAsync() is still invoked in the order of the message arrivals, but the completion can go out of order. In case of StreamTask with multithreading, process() can run out-of-order since they are dispatched to a thread pool. This option should **NOT** be used when strict ordering of the output is required.
+**NOTE:** In the case of AsyncStreamTask, processAsync() is still invoked in the order in which messages arrive, but completion can happen out of order. In the case of StreamTask and High Level API applications with task.max.concurrency &gt; 1, output can be delivered out of order. This option should **NOT** be used when strict ordering of the output is required.
 
 ### Guaranteed Semantics