Posted to commits@beam.apache.org by gi...@apache.org on 2018/10/16 15:59:10 UTC

[beam] branch asf-site updated: Publishing website 2018/10/16 15:59:06 at commit 3553620

This is an automated email from the ASF dual-hosted git repository.

git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 39088c8  Publishing website 2018/10/16 15:59:06 at commit 3553620
39088c8 is described below

commit 39088c8854fbfcc1c3bb16e87addf8f5aded3d28
Author: jenkins <bu...@apache.org>
AuthorDate: Tue Oct 16 15:59:07 2018 +0000

    Publishing website 2018/10/16 15:59:06 at commit 3553620
---
 .../documentation/sdks/java/euphoria/index.html    | 620 +++++++++++++++++++--
 1 file changed, 578 insertions(+), 42 deletions(-)

diff --git a/website/generated-content/documentation/sdks/java/euphoria/index.html b/website/generated-content/documentation/sdks/java/euphoria/index.html
index 493f44a..679536e6 100644
--- a/website/generated-content/documentation/sdks/java/euphoria/index.html
+++ b/website/generated-content/documentation/sdks/java/euphoria/index.html
@@ -224,8 +224,43 @@
 
 <ul class="nav">
   <li><a href="#what-is-euphoria">What is Euphoria</a></li>
-  <li><a href="#how-to-build">How to build</a></li>
-  <li><a href="#wordcount-example">WordCount example</a></li>
+  <li><a href="#wordcount-example">WordCount Example</a></li>
+  <li><a href="#euphoria-guide">Euphoria Guide</a>
+    <ul>
+      <li><a href="#datasets">Datasets</a></li>
+      <li><a href="#inputs-and-outputs">Inputs and Outputs</a></li>
+      <li><a href="#adding-operators">Adding Operators</a></li>
+      <li><a href="#coders-and-types">Coders and Types</a></li>
+      <li><a href="#metrics-and-accumulators">Metrics and Accumulators</a></li>
+      <li><a href="#windowing">Windowing</a></li>
+      <li><a href="#integration-of-euphoria-into-existing-pipelines">Integration of Euphoria into existing pipelines</a></li>
+    </ul>
+  </li>
+  <li><a href="#how-to-get-euphoria">How to get Euphoria</a></li>
+  <li><a href="#operator-reference">Operator Reference</a>
+    <ul>
+      <li><a href="#countbykey"><code class="highlighter-rouge">CountByKey</code></a></li>
+      <li><a href="#distinct"><code class="highlighter-rouge">Distinct</code></a></li>
+      <li><a href="#join"><code class="highlighter-rouge">Join</code></a></li>
+      <li><a href="#leftjoin"><code class="highlighter-rouge">LeftJoin</code></a></li>
+      <li><a href="#rightjoin"><code class="highlighter-rouge">RightJoin</code></a></li>
+      <li><a href="#fulljoin"><code class="highlighter-rouge">FullJoin</code></a></li>
+      <li><a href="#mapelements"><code class="highlighter-rouge">MapElements</code></a></li>
+      <li><a href="#flatmap"><code class="highlighter-rouge">FlatMap</code></a></li>
+      <li><a href="#filter"><code class="highlighter-rouge">Filter</code></a></li>
+      <li><a href="#reducebykey"><code class="highlighter-rouge">ReduceByKey</code></a></li>
+      <li><a href="#reducewindow"><code class="highlighter-rouge">ReduceWindow</code></a></li>
+      <li><a href="#sumbykey"><code class="highlighter-rouge">SumByKey</code></a></li>
+      <li><a href="#union"><code class="highlighter-rouge">Union</code></a></li>
+      <li><a href="#topperkey"><code class="highlighter-rouge">TopPerKey</code></a></li>
+      <li><a href="#assigneventtime"><code class="highlighter-rouge">AssignEventTime</code></a></li>
+    </ul>
+  </li>
+  <li><a href="#euphoria-to-beam-translation-advanced-user-section">Euphoria To Beam Translation (advanced user section)</a>
+    <ul>
+      <li><a href="#unsupported-features">Unsupported Features</a></li>
+    </ul>
+  </li>
 </ul>
 
 
@@ -246,73 +281,574 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <h1 id="euphoria-java-8-dsl">Euphoria Java 8 DSL</h1>
+<!--
+NOTE for future maintainer.
+There is [`DocumentationExamplesTest`](/documentation/sdks/javadoc/2.7.0/index.html?org/apache/beam/sdk/extensions/euphoria/core/docs/DocumentationExamplesTest.html) in `beam-sdks-java-extensions-euphoria-core` project where all code examples are validated. Do not change the code examples without reflecting it in the `DocumentationExamplesTest` and vice versa.
 
-<h2 id="what-is-euphoria">What is Euphoria</h2>
-
-<p>Easy to use Java 8 DSL for the Beam Java SDK. Provides a high-level abstraction of Beam transformations, which is both easy to read and write. Can be used as a complement to existing Beam pipelines (convertible back and forth).</p>
-
-<p>Integration of Euphoria API to Beam is in <strong>progress</strong> (<a href="https://issues.apache.org/jira/browse/BEAM-3900">BEAM-3900</a>).</p>
+The following operator is unsupported. Include it in the documentation once it is supported.
 
-<h2 id="how-to-build">How to build</h2>
+Lower level transformations (if possible user should prefer above transformations):
+### `ReduceStateByKey`: assigns each input item to a set of windows and turns the item into a key/value pair.
+For each of the assigned windows the extracted value is accumulated using a user provided `StateFactory` state
+ implementation under the extracted key. I.e. the value is accumulated into a state identified by
+ a key/window pair.
+-->
 
-<p>Euphoria is located in <code class="highlighter-rouge">dsl-euphoria</code> branch. To build <code class="highlighter-rouge">euphoria</code> subprojects use command:</p>
+<h2 id="what-is-euphoria">What is Euphoria</h2>
+<p>Euphoria is an easy-to-use Java 8 API built on top of Beam’s Java SDK. The API provides a <a href="#operator-reference">high-level abstraction</a> of data transformations, with a focus on Java 8 language features (e.g. lambdas and streams). It is fully interoperable with the existing Beam SDK and convertible back and forth. It allows fast prototyping through the use of (optional) <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>-based coders, lambdas, and high-level operators, and can be <a href= [...]
 
-<div class="highlighter-rouge"><pre class="highlight"><code>./gradlew :beam-sdks-java-extensions-euphoria-beam:build 
-</code></pre>
-</div>
+<p>The <a href="https://github.com/seznam/euphoria">Euphoria API</a> project was started in 2014, with the clear goal of providing the main building block for <a href="https://www.seznam.cz/">Seznam.cz’s</a> data infrastructure.
+In 2015, the <a href="http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf">DataFlow whitepaper</a> inspired the original authors to go one step further and provide a unified API for both stream and batch processing.
+The API was open-sourced in 2016 and is still in active development. As the Beam community’s goal was very similar, we decided to contribute
+the API as a high-level DSL over the Beam Java SDK and share our effort with the community.</p>
 
-<h2 id="wordcount-example">WordCount example</h2>
+<p>Euphoria DSL integration is still a work in progress and is tracked as part of <a href="https://issues.apache.org/jira/browse/BEAM-3900">BEAM-3900</a>.</p>
 
-<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Pipeline</span> <span class="n">pipeline</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span>
+<h2 id="wordcount-example">WordCount Example</h2>
+<p>Let’s start with a small example.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
+<span class="n">Pipeline</span> <span class="n">pipeline</span> <span class="o">=</span> <span class="n">Pipeline</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">options</span><span class="o">);</span>
 
-<span class="c1">// Transform to euphoria's flow.</span>
-<span class="n">BeamFlow</span> <span class="n">flow</span> <span class="o">=</span> <span class="n">BeamFlow</span><span class="o">.</span><span class="na">create</span><span class="o">(</span><span class="n">pipeline</span><span class="o">);</span>
+<span class="c1">// Use Kryo as coder fallback</span>
+<span class="n">KryoCoderProvider</span><span class="o">.</span><span class="na">of</span><span class="o">().</span><span class="na">registerTo</span><span class="o">(</span><span class="n">pipeline</span><span class="o">);</span>
 
 <span class="c1">// Source of data loaded from Beam IO.</span>
 <span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span>
-    <span class="n">pipeline</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Create</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">inputs</span><span class="o">)).</span><span class="na">setTypeDescriptor</span><span class="o">(</span><span class="n">TypeDescriptor</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">String</span><span class="o">. [...]
+    <span class="n">pipeline</span>
+        <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Create</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">textLineByLine</span><span class="o">))</span>
+        <span class="o">.</span><span class="na">setTypeDescriptor</span><span class="o">(</span><span class="n">TypeDescriptor</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+
 <span class="c1">// Transform PCollection to euphoria's Dataset.</span>
-<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">lines</span> <span class="o">=</span> <span class="n">flow</span><span class="o">.</span><span class="na">wrapped</span><span class="o">(</span><span class="n">input</span><span class="o">);</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">lines</span> <span class="o">=</span>  <span class="n">Dataset</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">);</span>
 
 <span class="c1">// FlatMap processes one input element at a time and allows user code to emit</span>
 <span class="c1">// zero, one, or more output elements. From input lines we will get data set of words.</span>
-<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">FlatMap</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"TOKENIZER"</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">lines</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">using</span><span class="o">((</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">context</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
-      <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">line</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">"\\s+"</span><span class="o">))</span> <span class="o">{</span>
-        <span class="n">context</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
-      <span class="o">}</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span>
+    <span class="n">FlatMap</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"TOKENIZER"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">lines</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+            <span class="o">(</span><span class="n">String</span> <span class="n">line</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">context</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+              <span class="k">for</span> <span class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">Splitter</span><span class="o">.</span><span class="na">onPattern</span><span class="o">(</span><span class="s">"\\s+"</span><span class="o">).</span><span class="na">split</span><span class="o">(</span><span class="n">line</span><span class="o">))</span> <span class="o">{</span>
+                <span class="n">context</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">word</span><span class="o">);</span>
+              <span class="o">}</span>
+            <span class="o">})</span>
+        <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+
+<span class="c1">// Now we can count input words - the operator ensures that all values for the same</span>
+<span class="c1">// key (word in this case) end up being processed together. Then it counts the number of appearances</span>
+<span class="c1">// of the same key in the 'words' dataset and emits it to the output.</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">counted</span> <span class="o">=</span>
+    <span class="n">CountByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"COUNT"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">words</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">w</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+
+<span class="c1">// Format output.</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span>
+    <span class="n">MapElements</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"FORMAT"</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">counted</span><span class="o">)</span>
+        <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="n">p</span> <span class="o">-&gt;</span> <span class="n">p</span><span class="o">.</span><span class="na">getKey</span><span class="o">()</span> <span class="o">+</span> <span class="s">": "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="na">getValue</span><span class="o">())</span>
+        <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+
+<span class="c1">// Transform Dataset back to PCollection. It can be done anytime.</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">outputCollection</span> <span class="o">=</span> <span class="n">output</span><span class="o">.</span><span class="na">getPCollection</span><span class="o">();</span>
+
+<span class="c1">// Now we can use Beam transformations again. In this case we save the words and their counts</span>
+<span class="c1">// into a text file.</span>
+<span class="n">outputCollection</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">write</span><span class="o">()</span>
+    <span class="o">.</span><span class="na">to</span><span class="o">(</span><span class="s">"counted_words"</span><span class="o">));</span>
+
+<span class="n">pipeline</span><span class="o">.</span><span class="na">run</span><span class="o">();</span>
+</code></pre>
+</div>
+
+<h2 id="euphoria-guide">Euphoria Guide</h2>
+
+<p>The Euphoria API is composed of a set of operators that allow you to construct a <code class="highlighter-rouge">Pipeline</code> according to your application’s needs.</p>
+
+<h3 id="datasets">Datasets</h3>
+<p>Euphoria uses the concept of ‘Datasets’ to describe the data passed between <code class="highlighter-rouge">Operators</code>. A Dataset is similar to Beam’s <code class="highlighter-rouge">PCollection</code>, and the two can be converted back and forth:</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">someCollection</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="c1">// PCollection -&gt; Dataset</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">dataset</span> <span class="o">=</span> <span class="n">Dataset</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">someCollection</span><span class="o">);</span>
+
+<span class="c1">//And now back: Dataset -&gt; PCollection</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">T</span><span class="o">&gt;</span> <span class="n">collection</span> <span class="o">=</span> <span class="n">dataset</span><span class="o">.</span><span class="na">getPCollection</span><span class="o">();</span>
+</code></pre>
+</div>
+
+<h3 id="inputs-and-outputs">Inputs and Outputs</h3>
+<p>Input data can be supplied through Beam’s IO into a <code class="highlighter-rouge">PCollection</code>, the same way as in Beam, and later wrapped into a <code class="highlighter-rouge">Dataset</code> by <code class="highlighter-rouge">Dataset.of(PCollection&lt;T&gt; pCollection)</code>.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span>
+  <span class="n">pipeline</span>
+    <span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">Create</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="s">"mouse"</span><span class="o">,</span> <span class="s">"rat"</span><span class="o">,</span> <span class="s">"elephant"</span><span class="o">,</span> <span class="s">"cat"</span><span class="o">,</span> <span class="s">"X"</span><span class="o">,</span> <span class="s">"duck"</span><span cla [...]
+    <span class="o">.</span><span class="na">setTypeDescriptor</span><span class="o">(</span><span class="n">TypeDescriptor</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
+
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dataset</span> <span class="o">=</span>  <span class="n">Dataset</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">);</span>
+</code></pre>
+</div>
+<p>Outputs can be treated the same way as inputs: the last <code class="highlighter-rouge">Dataset</code> is converted to a <code class="highlighter-rouge">PCollection</code> and written to an appropriate IO.</p>
+
+<h3 id="adding-operators">Adding Operators</h3>
+<p>The real power of the Euphoria API is in its <a href="#operator-reference">operators suite</a>. Once we get our hands on a <code class="highlighter-rouge">Dataset</code>, we can create and connect operators. Each operator consumes one or more inputs and produces one output
+<code class="highlighter-rouge">Dataset</code>. Let’s take a look at a simple <code class="highlighter-rouge">MapElements</code> example.</p>
+
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mappedElements</span> <span class="o">=</span>
+  <span class="n">MapElements</span>
+    <span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"Int2Str"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">valueOf</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+</code></pre>
+</div>
+<p>The operator consumes <code class="highlighter-rouge">input</code>, applies the given lambda expression (<code class="highlighter-rouge">String::valueOf</code>) to each element of <code class="highlighter-rouge">input</code>, and returns the mapped <code class="highlighter-rouge">Dataset</code>. The developer is guided through a series of steps when creating an operator, so declaring an operator is straightforward. To start building an operator, just write its name followed by ‘.’ (dot). Your IDE will g [...]
+
+<p>The first step in building any operator is to give it a name through the <code class="highlighter-rouge">named()</code> method. The name is propagated through the system and can later be used when debugging.</p>
+
+<h3 id="coders-and-types">Coders and Types</h3>
+<p>Beam’s Java SDK requires developers to supply a <code class="highlighter-rouge">Coder</code> for each custom element type so that elements can be materialized. Euphoria allows the use of <a href="https://github.com/EsotericSoftware/kryo">Kryo</a> for serialization. The <a href="https://github.com/EsotericSoftware/kryo">Kryo</a> extension is located in the <code class="highlighter-rouge">:beam-sdks-java-extensions-kryo</code> module.</p>
+
+<div class="language-groovy highlighter-rouge"><pre class="highlight"><code><span class="c1">//gradle</span>
+<span class="n">dependencies</span> <span class="o">{</span>
+    <span class="n">compile</span> <span class="s2">"org.apache.beam:beam-sdks-java-extensions-kryo:${beam.version}"</span>
+<span class="o">}</span>
+</code></pre>
+</div>
+<div class="language-xml highlighter-rouge"><pre class="highlight"><code><span class="c">&lt;!-- maven --&gt;</span>
+<span class="nt">&lt;dependency&gt;</span>
+  <span class="nt">&lt;groupId&gt;</span>org.apache.beam<span class="nt">&lt;/groupId&gt;</span>
+  <span class="nt">&lt;artifactId&gt;</span>beam-sdks-java-extensions-kryo<span class="nt">&lt;/artifactId&gt;</span>
+  <span class="nt">&lt;version&gt;</span>${beam.version}<span class="nt">&lt;/version&gt;</span>
+<span class="nt">&lt;/dependency&gt;</span>
+</code></pre>
+</div>
+
+<p>All you need to do is create a <code class="highlighter-rouge">KryoCoderProvider</code> and register it to your
+<code class="highlighter-rouge">Pipeline</code>. There are two ways of doing that.</p>
+
+<p>When prototyping, you may decide not to care much about coders; in that case, create a <code class="highlighter-rouge">KryoCoderProvider</code> without any class registrations to <a href="https://github.com/EsotericSoftware/kryo">Kryo</a>.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// Register a `KryoCoderProvider` that attempts to use `KryoCoder` for every non-primitive type</span>
+<span class="n">KryoCoderProvider</span><span class="o">.</span><span class="na">of</span><span class="o">().</span><span class="na">registerTo</span><span class="o">(</span><span class="n">pipeline</span><span class="o">);</span>
+</code></pre>
+</div>
+<p>Such a <code class="highlighter-rouge">KryoCoderProvider</code> will return a <code class="highlighter-rouge">KryoCoder</code> for every non-primitive element type. That of course degrades performance, since Kryo is not able to serialize instances of unknown types efficiently, but it speeds up pipeline development. This behavior is enabled by default and can be disabled when creating the <code class="highlighter-rouge">Pipeline</code> through <code class="highlighter-rouge">KryoOptions< [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">PipelineOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">PipelineOptionsFactory</span><span class="o">.</span><span class="na">create</span><span class="o">();</span>
+<span class="n">options</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="n">KryoOptions</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">setKryoRegistrationRequired</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
+</code></pre>
+</div>
+
+<p>The second, more performance-friendly way is to register all the types that Kryo will serialize. Sometimes it is also a good idea to register your own Kryo serializers. Euphoria allows you to do that by implementing your own <code class="highlighter-rouge">KryoRegistrar</code> and using it when creating the <code class="highlighter-rouge">KryoCoderProvider</code>.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//Do not allow `KryoCoderProvider` to return `KryoCoder` for unregistered types</span>
+<span class="n">options</span><span class="o">.</span><span class="na">as</span><span class="o">(</span><span class="n">KryoOptions</span><span class="o">.</span><span class="na">class</span><span class="o">).</span><span class="na">setKryoRegistrationRequired</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
+
+<span class="n">KryoCoderProvider</span><span class="o">.</span><span class="na">of</span><span class="o">(</span>
+        <span class="o">(</span><span class="n">kryo</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span> <span class="c1">// KryoRegistrar of your own</span>
+          <span class="n">kryo</span><span class="o">.</span><span class="na">register</span><span class="o">(</span><span class="n">KryoSerializedElementType</span><span class="o">.</span><span class="na">class</span><span class="o">);</span> <span class="c1">// others may follow</span>
+        <span class="o">})</span>
+    <span class="o">.</span><span class="na">registerTo</span><span class="o">(</span><span class="n">pipeline</span><span class="o">);</span>
+</code></pre>
+</div>
+<p>Beam resolves coders using the types of elements. Type information is not available at runtime when the element type is described by a lambda implementation, due to type erasure and the dynamic nature of lambda expressions. Therefore, there is an optional way of supplying a <code class="highlighter-rouge">TypeDescriptor</code> every time a new type is introduced during operator construction.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">input</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">MapElements</span>
+  <span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"Int2Str"</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">valueOf</span><span class="o">,</span> <span class="n">TypeDescriptors</span><span class="o">.</span><span class="na">strings</span><span class="o">())</span>
+  <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+</code></pre>
+</div>
+<p>Euphoria operators will use <code class="highlighter-rouge">TypeDescriptor&lt;Object&gt;</code> when <code class="highlighter-rouge">TypeDescriptors</code> are not supplied by the user. So <code class="highlighter-rouge">KryoCoderProvider</code> may return <code class="highlighter-rouge">KryoCoder&lt;Object&gt;</code> for every element with an unknown type, if allowed by <code class="highlighter-rouge">KryoOptions</code>. Supplying <code class="highlighter-rouge">TypeDescriptors</code> beco [...]
+
+<h3 id="metrics-and-accumulators">Metrics and Accumulators</h3>
+<p>Statistics about a job’s internals are very helpful during the development of distributed jobs. Euphoria calls them accumulators. They are accessible through the environment <code class="highlighter-rouge">Context</code>, which can be obtained from a <code class="highlighter-rouge">Collector</code> whenever you are working with one. A <code class="highlighter-rouge">Collector</code> is usually present when zero-to-many output elements are expected from an operator, for example in the case of <code class="highlighter-rouge">FlatMap</code>.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Pipeline</span> <span class="n">pipeline</span> <span class="o">=</span> <span class="o">...</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mapped</span> <span class="o">=</span>
+<span class="n">FlatMap</span>
+  <span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"FlatMap1"</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">dataset</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">String</span> <span class="n">value</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">context</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+      <span class="n">context</span><span class="o">.</span><span class="na">getCounter</span><span class="o">(</span><span class="s">"my-counter"</span><span class="o">).</span><span class="na">increment</span><span class="o">();</span>
+      <span class="n">context</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">value</span><span class="o">);</span>
     <span class="o">})</span>
+  <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+</code></pre>
+</div>
+<p><code class="highlighter-rouge">MapElements</code> also gives access to the <code class="highlighter-rouge">Context</code> when you supply an implementation of <code class="highlighter-rouge">UnaryFunctionEnv</code> (which adds a second, context argument) instead of <code class="highlighter-rouge">UnaryFunction</code>.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Pipeline</span> <span class="n">pipeline</span> <span class="o">=</span> <span class="o">...</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">dataset</span> <span class="o">=</span> <span class="o">...</span>
+
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">mapped</span> <span class="o">=</span>
+  <span class="n">MapElements</span>
+    <span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"MapThem"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">dataset</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">input</span><span class="o">,</span> <span class="n">context</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+        <span class="c1">// use simple counter</span>
+        <span class="n">context</span><span class="o">.</span><span class="na">getCounter</span><span class="o">(</span><span class="s">"my-counter"</span><span class="o">).</span><span class="na">increment</span><span class="o">();</span>
+
+        <span class="k">return</span> <span class="n">input</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">();</span>
+      <span class="o">})</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+</code></pre>
+</div>
+<p>Accumulators are translated into Beam Metrics in the background, so they can be viewed in the same way. The namespace of the translated metrics is set to the operator’s name.</p>
+
+<h3 id="windowing">Windowing</h3>
+<p>Euphoria follows the same <a href="/documentation/programming-guide/#windowing">windowing principles</a> as the Beam Java SDK. Every shuffle operator (an operator which needs to shuffle data over the network) allows you to set windowing. The same parameters as in Beam are required: <code class="highlighter-rouge">WindowFn</code>, <code class="highlighter-rouge">Trigger</code>, <code class="highlighter-rouge">WindowingStrategy</code> and others. Users are guided to either set all mandatory and severa [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countedElements</span> <span class="o">=</span>
+<span class="n">CountByKey</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="n">e</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">windowBy</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardSeconds</span><span class="o">(</span><span class="mi">1</span><span class="o">)))</span>
+    <span class="o">.</span><span class="na">triggeredBy</span><span class="o">(</span><span class="n">DefaultTrigger</span><span class="o">.</span><span class="na">of</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">discardingFiredPanes</span><span class="o">()</span>
+    <span class="o">.</span><span class="na">withAllowedLateness</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">standardSeconds</span><span class="o">(</span><span class="mi">5</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">withOnTimeBehavior</span><span class="o">(</span><span class="n">OnTimeBehavior</span><span class="o">.</span><span class="na">FIRE_IF_NON_EMPTY</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">withTimestampCombiner</span><span class="o">(</span><span class="n">TimestampCombiner</span><span class="o">.</span><span class="na">EARLIEST</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+</code></pre>
+</div>
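+<p>To make the effect of a one-second <code class="highlighter-rouge">FixedWindows</code> setting more concrete, the following is a minimal plain-Java sketch of how timestamps fall into fixed windows and are counted per window. It is not Euphoria or Beam API: the class <code class="highlighter-rouge">FixedWindowSketch</code> and its methods are hypothetical, and triggers, allowed lateness and timestamp combiners are deliberately ignored.</p>

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in Euphoria or Beam.
class FixedWindowSketch {

  // Start (inclusive) of the fixed window of the given size that contains the timestamp.
  static long windowStart(long timestampMillis, long windowSizeMillis) {
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // Counts how many timestamps fall into each window, keyed by window start.
  static Map<Long, Long> countPerWindow(long[] timestampsMillis, long windowSizeMillis) {
    Map<Long, Long> counts = new TreeMap<>();
    for (long ts : timestampsMillis) {
      counts.merge(windowStart(ts, windowSizeMillis), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    long[] timestamps = {0L, 250L, 999L, 1000L, 2500L};
    // One-second windows: [0, 1000), [1000, 2000), [2000, 3000)
    System.out.println(countPerWindow(timestamps, 1000L)); // prints {0=3, 1000=1, 2000=1}
  }
}
```

+<p>In a real pipeline the grouping is additionally done per key, and the runner decides when each window's result is emitted based on the configured trigger.</p>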
+
+<h3 id="integration-of-euphoria-into-existing-pipelines">Integration of Euphoria into existing pipelines</h3>
+<p><code class="highlighter-rouge">Euphoria</code> lets you define a composite <code class="highlighter-rouge">PTransform</code>, so Euphoria can be seamlessly integrated into existing Beam <code class="highlighter-rouge">Pipelines</code>. The user only needs to provide an implementation of a function which takes an input <code class="highlighter-rouge">Dataset</code> and outputs another <code class="highlighter-rouge">Dataset</code>. The input dataset is nothing more than a mirror of an input <co [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose inputs PCollection contains: [ "a", "b", "c", "A", "a", "C", "x"]</span>
+<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">lettersWithCounts</span> <span class="o">=</span>
+  <span class="n">inputs</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="s">"count-uppercase-letters-in-Euphoria"</span><span class="o">,</span>
+    <span class="n">Euphoria</span><span class="o">.</span><span class="na">of</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">input</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+        <span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">upperCase</span> <span class="o">=</span>
+          <span class="n">MapElements</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">using</span><span class="o">((</span><span class="n">UnaryFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;)</span> <span class="nl">String:</span><span class="o">:</span><span class="n">toUpperCase</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+
+        <span class="k">return</span> <span class="n">CountByKey</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">upperCase</span><span class="o">).</span><span class="na">keyBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="n">e</span><span class="o">).</span><span class="na">output</span><span class="o">();</span>
+    <span class="o">}));</span>
+<span class="c1">//now the 'lettersWithCounts' will contain [ KV("A", 3L), KV("B", 1L), KV("C", 2L), KV("X", 1L) ]</span>
+</code></pre>
+</div>
+
+<h2 id="how-to-get-euphoria">How to get Euphoria</h2>
+<p>Euphoria is located in the <code class="highlighter-rouge">dsl-euphoria</code> branch, in the <code class="highlighter-rouge">beam-sdks-java-extensions-euphoria</code> module of the Apache Beam project. To build the <code class="highlighter-rouge">euphoria</code> subproject, run:</p>
+<div class="highlighter-rouge"><pre class="highlight"><code>./gradlew beam-sdks-java-extensions-euphoria:build
+</code></pre>
+</div>
+
+<h2 id="operator-reference">Operator Reference</h2>
+<p>Operators are higher-level data transformations which allow you to build the business logic of your data processing job in a simple way. All the Euphoria operators are documented in this section, including examples. For the sake of simplicity, the examples do not apply <a href="#windowing">windowing</a>. Refer to the <a href="#windowing">windowing section</a> for more details.</p>
+
+<h3 id="countbykey"><code class="highlighter-rouge">CountByKey</code></h3>
+<p>Counts elements with the same key. The input dataset is mapped by the given key extractor (<code class="highlighter-rouge">UnaryFunction</code>) to keys, which are then counted. Output is emitted as <code class="highlighter-rouge">KV&lt;K, Long&gt;</code> (<code class="highlighter-rouge">K</code> is the key type), where each <code class="highlighter-rouge">KV</code> contains a key and the number of elements in the input dataset for that key.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose input: [1, 2, 4, 1, 1, 3]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span>
+  <span class="n">CountByKey</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="n">e</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// Output will contain:  [KV(1, 3), KV(2, 1), KV(3, 1), KV(4, 1)]</span>
+</code></pre>
+</div>
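+<p>The counting semantics described above can be sketched in plain Java, without any Euphoria classes. The class <code class="highlighter-rouge">CountByKeySketch</code> and its <code class="highlighter-rouge">countByKey</code> method are hypothetical names used only for illustration; a sorted map stands in for the unordered dataset of <code class="highlighter-rouge">KV</code> pairs.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical illustration only; this is not the Euphoria CountByKey operator.
class CountByKeySketch {

  // Extracts a key from every element and counts how many elements share each key.
  static <T, K extends Comparable<K>> Map<K, Long> countByKey(
      List<T> input, Function<T, K> keyExtractor) {
    Map<K, Long> counts = new TreeMap<>();
    for (T element : input) {
      counts.merge(keyExtractor.apply(element), 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 4, 1, 1, 3);
    // The identity key extractor mirrors keyBy(e -> e) from the example above.
    Map<Integer, Long> counts = countByKey(input, (Integer e) -> e);
    System.out.println(counts); // prints {1=3, 2=1, 3=1, 4=1}
  }
}
```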
+
+<h3 id="distinct"><code class="highlighter-rouge">Distinct</code></h3>
+<p>Outputs distinct elements (based on the equals method). It takes an optional <code class="highlighter-rouge">UnaryFunction</code> mapper parameter which maps elements to the output type.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose input: [1, 2, 3, 3, 2, 1]</span>
+<span class="n">Distinct</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"unique-integers-only"</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// Output will contain:  1, 2, 3</span>
+</code></pre>
+</div>
+<p><code class="highlighter-rouge">Distinct</code> with mapper.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose keyValueInput: [KV(1, 100L), KV(3, 100_000L), KV(42, 10L), KV(1, 0L), KV(3, 0L)]</span>
+<span class="n">Distinct</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"unique-keys-only"</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">keyValueInput</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">mapped</span><span class="o">(</span><span class="nl">KV:</span><span class="o">:</span><span class="n">getKey</span><span class="o">)</span>
+  <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// Output will contain:  1, 3, 42</span>
+</code></pre>
+</div>
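+<p>As a rough plain-Java analogy for <code class="highlighter-rouge">Distinct</code> with a mapper, the sketch below first maps every element and then keeps one copy of each mapped value. <code class="highlighter-rouge">DistinctSketch</code>, <code class="highlighter-rouge">distinct</code> and <code class="highlighter-rouge">kv</code> are hypothetical helpers, and <code class="highlighter-rouge">Map.Entry</code> stands in for Beam's <code class="highlighter-rouge">KV</code>.</p>

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration only; this is not the Euphoria Distinct operator.
class DistinctSketch {

  // Maps each element, then drops duplicates of the mapped values (using equals/hashCode).
  static <T, R> List<R> distinct(List<T> input, Function<T, R> mapper) {
    return input.stream().map(mapper).distinct().collect(Collectors.toList());
  }

  // Small stand-in for KV.of(key, value).
  static Map.Entry<Integer, Long> kv(int key, long value) {
    return new AbstractMap.SimpleEntry<>(key, value);
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, Long>> keyValueInput =
        Arrays.asList(kv(1, 100L), kv(3, 100_000L), kv(42, 10L), kv(1, 0L), kv(3, 0L));
    List<Integer> keys = distinct(keyValueInput, Map.Entry::getKey);
    System.out.println(keys); // prints [1, 3, 42]
  }
}
```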
+
+<h3 id="join"><code class="highlighter-rouge">Join</code></h3>
+<p>Represents an inner join of two (left and right) datasets on a given key, producing a new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as <code class="highlighter-rouge">LeftT</code> and <code class="highlighter-rouge">RightT</code>. The join itself is performed by a user-supplied <code class="highlighter-rouge">BinaryFunctor</code> which consumes elements from both datasets sharing the same key. And outputs  [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]</span>
+<span class="c1">// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">joined</span> <span class="o">=</span>
+  <span class="n">Join</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"join-length-to-words"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">by</span><span class="o">(</span><span class="n">le</span> <span class="o">-&gt;</span> <span class="n">le</span><span class="o">,</span> <span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// key extractors</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">((</span><span class="n">Integer</span> <span class="n">l</span><span class="o">,</span> <span class="n">String</span> <span class="n">r</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">c</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">c</span><span class="o">.</span><span cla [...]
     <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"), KV(4, "4+duck"),</span>
+<span class="c1">// KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X")]</span>
+</code></pre>
+</div>
 
-<span class="c1">// From each input element we extract a key (word) and value, which is the constant `1`.</span>
-<span class="c1">// Then, we reduce by the key - the operator ensures that all values for the same</span>
-<span class="c1">// key end up being processed together. It applies user defined function (summing word counts for each</span>
-<span class="c1">// unique word) and its emitted to output. </span>
-<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Pair</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">counted</span> <span class="o">=</span> <span class="n">ReduceByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"COUNT"</span><span class="o">)</span>
+<h3 id="leftjoin"><code class="highlighter-rouge">LeftJoin</code></h3>
+<p>Represents a left join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as <code class="highlighter-rouge">LeftT</code> and <code class="highlighter-rouge">RightT</code>. The join itself is performed by a user-supplied <code class="highlighter-rouge">BinaryFunctor</code> which consumes one element from each dataset, where right is present opt [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]</span>
+<span class="c1">// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+    <span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">joined</span> <span class="o">=</span>
+        <span class="n">LeftJoin</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"left-join-length-to-words"</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span>
+            <span class="o">.</span><span class="na">by</span><span class="o">(</span><span class="n">le</span> <span class="o">-&gt;</span> <span class="n">le</span><span class="o">,</span> <span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// key extractors</span>
+            <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+                <span class="o">(</span><span class="n">Integer</span> <span class="n">l</span><span class="o">,</span> <span class="n">Optional</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">r</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">c</span><span class="o">)</span> <span class="o">-&gt;</span>
+                    <span class="n">c</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">l</span> <span class="o">+</span> <span class="s">"+"</span> <span class="o">+</span> <span class="n">r</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">)))</span>
+            <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// joined will contain: [KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"),</span>
+<span class="c1">// KV(3, "3+rat"), KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"),</span>
+<span class="c1">// KV(3, "3+rat"), KV(1, "1+X")]</span>
+</code></pre>
+</div>
+<p>Euphoria supports a performance optimization called ‘BroadcastHashJoin’ for the <code class="highlighter-rouge">LeftJoin</code>. The user can indicate through the previous operator’s output hint <code class="highlighter-rouge">.output(SizeHint.FITS_IN_MEMORY)</code> that the output <code class="highlighter-rouge">Dataset</code> of that operator fits in the executors’ memory. When that <code class="highlighter-rouge">Dataset</code> is used as the right input, Euphoria will automatically translate <code cl [...]
+
+<h3 id="rightjoin"><code class="highlighter-rouge">RightJoin</code></h3>
+<p>Represents a right join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as <code class="highlighter-rouge">LeftT</code> and <code class="highlighter-rouge">RightT</code>. The join itself is performed by a user-supplied <code class="highlighter-rouge">BinaryFunctor</code> which consumes one element from each dataset, where left is present opt [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]</span>
+<span class="c1">// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">joined</span> <span class="o">=</span>
+  <span class="n">RightJoin</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"right-join-length-to-words"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">by</span><span class="o">(</span><span class="n">le</span> <span class="o">-&gt;</span> <span class="n">le</span><span class="o">,</span> <span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// key extractors</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">Optional</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">l</span><span class="o">,</span> <span class="n">String</span> <span class="n">r</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">c</span><span class="o">)</span> <span class="o">-&gt;</span>
+        <span class="n">c</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">l</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">)</span> <span class="o">+</span> <span class="s">"+"</span> <span class="o">+</span> <span class="n">r</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// joined will contain: [ KV(1, "1+X"), KV(3, "3+cat"), KV(3, "3+rat"),</span>
+<span class="c1">// KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),</span>
+<span class="c1">// KV(8, "null+elephant"), KV(5, "null+mouse")]</span>
+</code></pre>
+</div>
+<p>Euphoria supports a performance optimization called ‘Broadcast Hash Join’ for the <code class="highlighter-rouge">RightJoin</code>. The user can indicate through the previous operator’s output hint <code class="highlighter-rouge">.output(SizeHint.FITS_IN_MEMORY)</code> that the output <code class="highlighter-rouge">Dataset</code> of that operator fits in the executors’ memory. When that <code class="highlighter-rouge">Dataset</code> is used as the left input, Euphoria will automatically translate <code  [...]
+
+<h3 id="fulljoin"><code class="highlighter-rouge">FullJoin</code></h3>
+<p>Represents a full outer join of two (left and right) datasets on a given key, producing a single new dataset. The key is extracted from both datasets by separate extractors, so elements in left and right can have different types, denoted as <code class="highlighter-rouge">LeftT</code> and <code class="highlighter-rouge">RightT</code>. The join itself is performed by a user-supplied <code class="highlighter-rouge">BinaryFunctor</code> which consumes one element from each dataset, where both are prese [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose that left contains: [1, 2, 3, 0, 4, 3, 1]</span>
+<span class="c1">// suppose that right contains: ["mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">joined</span> <span class="o">=</span>
+  <span class="n">FullJoin</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"join-length-to-words"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">left</span><span class="o">,</span> <span class="n">right</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">by</span><span class="o">(</span><span class="n">le</span> <span class="o">-&gt;</span> <span class="n">le</span><span class="o">,</span> <span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// key extractors</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">Optional</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">l</span><span class="o">,</span> <span class="n">Optional</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">r</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">c [...]
+        <span class="n">c</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">l</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">)</span> <span class="o">+</span> <span class="s">"+"</span> <span class="o">+</span> <span class="n">r</span><span class="o">.</span><span class="na">orElse</span><span class="o">(</span><span class="kc">null</span><span class="o">)))</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// joined will contain: [ KV(1, "1+X"), KV(2, "2+null"), KV(3, "3+cat"), KV(3, "3+rat"),</span>
+<span class="c1">// KV(0, "0+null"), KV(4, "4+duck"), KV(3, "3+cat"), KV(3, "3+rat"), KV(1, "1+X"),</span>
+<span class="c1">// KV(8, "null+elephant"), KV(5, "null+mouse")]</span>
+</code></pre>
+</div>
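+<p>The difference between the join flavours is which unmatched elements survive. As a minimal plain-Java sketch (not Euphoria API; <code class="highlighter-rouge">FullJoinSketch</code> and its methods are hypothetical), the nested-loop full outer join below pairs integers with strings of that length and emits unmatched elements from either side with an empty <code class="highlighter-rouge">Optional</code> partner. Dropping the unmatched-right loop would give left-join behaviour, dropping the unmatched-left branch would give right-join behaviour, and dropping both would give the inner join.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration only; this is not the Euphoria FullJoin operator.
class FullJoinSketch {

  // Full outer join of integers to strings on "integer == string length".
  static List<String> fullJoin(List<Integer> left, List<String> right) {
    List<String> out = new ArrayList<>();
    boolean[] rightMatched = new boolean[right.size()];
    for (Integer l : left) {
      boolean leftMatched = false;
      for (int i = 0; i < right.size(); i++) {
        if (l.intValue() == right.get(i).length()) {
          out.add(format(Optional.of(l), Optional.of(right.get(i))));
          leftMatched = true;
          rightMatched[i] = true;
        }
      }
      if (!leftMatched) {
        // Left element without a partner: the right side is absent.
        out.add(format(Optional.of(l), Optional.empty()));
      }
    }
    for (int i = 0; i < right.size(); i++) {
      if (!rightMatched[i]) {
        // Right element without a partner: the left side is absent.
        out.add(format(Optional.empty(), Optional.of(right.get(i))));
      }
    }
    return out;
  }

  // Mirrors the joiner from the example above: absent sides are rendered as "null".
  static String format(Optional<Integer> l, Optional<String> r) {
    return l.orElse(null) + "+" + r.orElse(null);
  }

  public static void main(String[] args) {
    List<String> joined =
        fullJoin(
            Arrays.asList(1, 2, 3, 0, 4, 3, 1),
            Arrays.asList("mouse", "rat", "elephant", "cat", "X", "duck"));
    System.out.println(joined.size()); // prints 11
  }
}
```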
+
+<h3 id="mapelements"><code class="highlighter-rouge">MapElements</code></h3>
+<p>Transforms one input element of input type <code class="highlighter-rouge">InputT</code> to one output element of another (potentially the same) <code class="highlighter-rouge">OutputT</code> type. Transformation is done through user specified <code class="highlighter-rouge">UnaryFunction</code>.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose input contains: [ 0, 1, 2, 3, 4, 5]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">strings</span> <span class="o">=</span>
+  <span class="n">MapElements</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"int2str"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="n">i</span> <span class="o">-&gt;</span> <span class="s">"#"</span> <span class="o">+</span> <span class="n">i</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// strings will contain: [ "#0", "#1", "#2", "#3", "#4", "#5"]</span>
+</code></pre>
+</div>
+
+<h3 id="flatmap"><code class="highlighter-rouge">FlatMap</code></h3>
+<p>Transforms one input element of input type <code class="highlighter-rouge">InputT</code> to zero or more output elements of another (potentially the same) <code class="highlighter-rouge">OutputT</code> type. Transformation is done through user specified <code class="highlighter-rouge">UnaryFunctor</code>, where <code class="highlighter-rouge">Collector&lt;OutputT&gt;</code> is utilized to emit output elements. Notice similarity with <code class="highlighter-rouge">MapElements</code> w [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose words contain: ["Brown", "fox", ".", ""]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">letters</span> <span class="o">=</span>
+  <span class="n">FlatMap</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"str2char"</span><span class="o">)</span>
     <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">words</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="n">w</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">w</span> <span class="o">-&gt;</span> <span class="mi">1L</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">combineBy</span><span class="o">(</span><span class="n">Sums</span><span class="o">.</span><span class="na">ofLongs</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+        <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="n">s</span><span class="o">.</span><span class="na">length</span><span class="o">();</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
+          <span class="kt">char</span> <span class="n">c</span> <span class="o">=</span> <span class="n">s</span><span class="o">.</span><span class="na">charAt</span><span class="o">(</span><span class="n">i</span><span class="o">);</span>
+          <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">c</span><span class="o">));</span>
+        <span class="o">}</span>
+      <span class="o">})</span>
     <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// letters will contain: ["B", "r", "o", "w", "n",  "f", "o", "x", "."]</span>
+</code></pre>
+</div>
+<p><code class="highlighter-rouge">FlatMap</code> may be used to determine the time-stamp of elements. This is done by supplying an implementation of the <code class="highlighter-rouge">ExtractEventTime</code> time extractor when building it. There is also a specialized <code class="highlighter-rouge">AssignEventTime</code> operator for assigning time-stamps to elements. Consider using it; your code may be more readable.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose events contains instances of SomeEventObject, whose 'getEventTimeInMillis()' method returns the time-stamp</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">SomeEventObject</span><span class="o">&gt;</span> <span class="n">timeStampedEvents</span> <span class="o">=</span>
+  <span class="n">FlatMap</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"extract-event-time"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">events</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span> <span class="o">(</span><span class="n">SomeEventObject</span> <span class="n">e</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">SomeEventObject</span><span class="o">&gt;</span> <span class="n">c</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">c</span><span class="o">.</span><span class="na">collect</span><span class="o"> [...]
+    <span class="o">.</span><span class="na">eventTimeBy</span><span class="o">(</span><span class="nl">SomeEventObject:</span><span class="o">:</span><span class="n">getEventTimeInMillis</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">//Euphoria will now know event time for each event</span>
+</code></pre>
+</div>
 
-<span class="c1">// Format output.</span>
-<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span> <span class="n">MapElements</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"FORMAT"</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">counted</span><span class="o">)</span>
-    <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="n">p</span> <span class="o">-&gt;</span> <span class="n">p</span><span class="o">.</span><span class="na">getFirst</span><span class="o">()</span> <span class="o">+</span> <span class="s">": "</span> <span class="o">+</span> <span class="n">p</span><span class="o">.</span><span class="na">getSecond</span><span class="o">())</span>
+<h3 id="filter"><code class="highlighter-rouge">Filter</code></h3>
+<p><code class="highlighter-rouge">Filter</code> discards all elements that do not satisfy a given condition. The condition is supplied by the user as an implementation of <code class="highlighter-rouge">UnaryPredicate</code>. Input and output elements are of the same type.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose nums contains: [0,  1, 2, 3, 4, 5, 6, 7, 8, 9]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">divisibleBythree</span> <span class="o">=</span>
+  <span class="n">Filter</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"divisibleByThree"</span><span class="o">).</span><span class="na">of</span><span class="o">(</span><span class="n">nums</span><span class="o">).</span><span class="na">by</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="n">e</span> <span class="o">%</span> <span class="mi">3</span> <span class="o">==</span> <span clas [...]
+<span class="c1">//divisibleBythree will contain: [ 0, 3, 6, 9]</span>
+</code></pre>
+</div>
+
+<h3 id="reducebykey"><code class="highlighter-rouge">ReduceByKey</code></h3>
+<p>Performs aggregation of <code class="highlighter-rouge">InputT</code> type elements with the same key through user-supplied reduce function. Key is extracted from each element through <code class="highlighter-rouge">UnaryFunction</code> which takes input element and outputs its key of type <code class="highlighter-rouge">K</code>. Elements can optionally be mapped to value of type <code class="highlighter-rouge">V</code>, it happens before elements shuffle, so it can have positive per [...]
+
+<p>Finally, elements with the same key are aggregated by user-defined <code class="highlighter-rouge">ReduceFunctor</code>, <code class="highlighter-rouge">ReduceFunction</code> or <code class="highlighter-rouge">CombinableReduceFunction</code>. They differs in number of arguments they take and in way output is interpreted. <code class="highlighter-rouge">ReduceFunction</code> is basically a function which takes <code class="highlighter-rouge">Stream</code> of elements as input and outpu [...]
+
+<p>The following example shows basic usage of the <code class="highlighter-rouge">ReduceByKey</code> operator, including value extraction.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countOfAnimalNamesByLength</span> <span class="o">=</span>
+  <span class="n">ReduceByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"to-letters-counts"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">animals</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// length of animal name will be used as grouping key</span>
+    <span class="c1">// each animal name is counted once, so map each string to 1</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="mi">1</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">reduceBy</span><span class="o">(</span><span class="nl">Stream:</span><span class="o">:</span><span class="n">count</span><span class="o">)</span>
     <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]</span>
+</code></pre>
+</div>
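+<p>For intuition only, the grouping and counting performed above can be mimicked with plain Java streams. This sketch does not use Euphoria: <code class="highlighter-rouge">ReduceByKeySketch</code> and <code class="highlighter-rouge">countByLength</code> are hypothetical names introduced for illustration, and nothing here is distributed or windowed.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration (not Euphoria API) of "key by length, count per key".
final class ReduceByKeySketch {

  // Groups names by their length and counts how many names fall into each group.
  static Map<Integer, Long> countByLength(List<String> animals) {
    return animals.stream()
        .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
  }

  public static void main(String[] args) {
    List<String> animals = Arrays.asList("mouse", "rat", "elephant", "cat", "X", "duck");
    System.out.println(countByLength(animals)); // prints {1=1, 3=2, 4=1, 5=1, 8=1}
  }
}
```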
 
-<span class="c1">// Transform Dataset back to PCollection. It can be done in any step of this flow.</span>
-<span class="n">PCollection</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">outputCollection</span> <span class="o">=</span> <span class="n">flow</span><span class="o">.</span><span class="na">unwrapped</span><span class="o">(</span><span class="n">output</span><span class="o">);</span>
+<p>Now suppose that we want to track the internals of our <code class="highlighter-rouge">ReduceByKey</code> using a counter.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countOfAnimalNamesByLength</span> <span class="o">=</span>
+  <span class="n">ReduceByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"to-letters-counts"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">animals</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// length of animal name will be used as grouping key</span>
+    <span class="c1">// each animal name is counted once, so map each string to 1</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="mi">1</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">reduceBy</span><span class="o">(</span>
+      <span class="o">(</span><span class="n">Stream</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">s</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+        <span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">count</span><span class="o">());</span>
+        <span class="n">collector</span><span class="o">.</span><span class="na">asContext</span><span class="o">().</span><span class="na">getCounter</span><span class="o">(</span><span class="s">"num-of-keys"</span><span class="o">).</span><span class="na">increment</span><span class="o">();</span>
+      <span class="o">})</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]</span>
+</code></pre>
+</div>
 
-<span class="c1">// Now we can again use Beam transformation. In this case we save words and their count</span>
-<span class="c1">// into the text file.</span>
-<span class="n">outputCollection</span><span class="o">.</span><span class="na">apply</span><span class="o">(</span><span class="n">TextIO</span><span class="o">.</span><span class="na">write</span><span class="o">().</span><span class="na">to</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getOutput</span><span class="o">()));</span>
+<p>Here is the same example again, this time with an optimized combinable output.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countOfAnimalNamesByLength</span> <span class="o">=</span>
+  <span class="n">ReduceByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"to-letters-counts"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">animals</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// length of animal name will be used as grouping key</span>
+    <span class="c1">// each animal name is counted once, so map each string to 1</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="mi">1L</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">combineBy</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="n">s</span><span class="o">.</span><span class="na">mapToLong</span><span class="o">(</span><span class="n">l</span> <span class="o">-&gt;</span> <span class="n">l</span><span class="o">).</span><span class="na">sum</span><span class="o">())</span> <span class="c1">//Stream::count will not be enough</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]</span>
+</code></pre>
+</div>
+<p>Note that the provided <code class="highlighter-rouge">CombinableReduceFunction</code> has to be associative and commutative to be truly combinable. This allows it to compute partial results before the shuffle and then merge those partial results into one. That is why a simple <code class="highlighter-rouge">Stream::count</code> will not work in this example, unlike in the previous one.</p>
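+<p>To see why counting is not combinable while summing is, the following plain-Java sketch simulates a two-phase reduction: each partition is reduced first, and the partial results are then reduced again with the same function. It does not use Euphoria; <code class="highlighter-rouge">CombinableSketch</code>, <code class="highlighter-rouge">twoPhase</code> and the choice of two partitions are assumptions made purely for illustration.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java illustration (not Euphoria API) of combining partial results.
final class CombinableSketch {

  // Reduces every partition on its own, then reduces the partial results with the same function.
  static long twoPhase(List<List<Long>> partitions, Function<Stream<Long>, Long> reduce) {
    List<Long> partials =
        partitions.stream().map(p -> reduce.apply(p.stream())).collect(Collectors.toList());
    return reduce.apply(partials.stream());
  }

  public static void main(String[] args) {
    // Five values of 1L for a single key, split across two hypothetical partitions.
    List<List<Long>> partitions =
        Arrays.asList(Arrays.asList(1L, 1L, 1L), Arrays.asList(1L, 1L));

    // Summing is associative and commutative: partials 3 and 2 merge into 5.
    System.out.println(twoPhase(partitions, s -> s.mapToLong(l -> l).sum())); // prints 5

    // Counting is not: the second phase counts the two partial results instead of the elements.
    System.out.println(twoPhase(partitions, Stream::count)); // prints 2
  }
}
```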
+
+<p>Euphoria aims to make code easy to write and read. Therefore it already offers support for writing combinable reduce functions in the form of <code class="highlighter-rouge">Fold</code>, or folding function. It allows the user to supply only the reduction logic (<code class="highlighter-rouge">BinaryFunction</code>) and creates a <code class="highlighter-rouge">CombinableReduceFunction</code> out of it. The supplied <code class="highlighter-rouge">BinaryFunction</code> still has to be associative.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose animals contains : [ "mouse", "rat", "elephant", "cat", "X", "duck"]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">countOfAnimalNamesByLength</span> <span class="o">=</span>
+  <span class="n">ReduceByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"to-letters-counts"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">animals</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// length of animal name will be used as grouping key</span>
+    <span class="c1">// each animal name is counted once, so map each string to 1</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="mi">1L</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">combineBy</span><span class="o">(</span><span class="n">Fold</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">l1</span><span class="o">,</span> <span class="n">l2</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">l1</span> <span class="o">+</span> <span class="n">l2</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// countOfAnimalNamesByLength will contain [ KV.of(1, 1L), KV.of(3, 2L), KV.of(4, 1L), KV.of(5, 1L), KV.of(8, 1L) ]</span>
+</code></pre>
+</div>
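+<p>The idea of lifting a binary function into a reduce function over a stream can be sketched in plain Java as follows. This is not how Euphoria's <code class="highlighter-rouge">Fold</code> is implemented; <code class="highlighter-rouge">FoldSketch</code> and <code class="highlighter-rouge">foldOf</code> are hypothetical names, and the sketch assumes the stream is never empty.</p>

```java
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Stream;

// Plain-Java illustration (not Euphoria API) of building a stream reducer from a binary function.
final class FoldSketch {

  // Hypothetical helper: turns an associative binary function into a function reducing a whole Stream.
  static <T> Function<Stream<T>, T> foldOf(BinaryOperator<T> fn) {
    return stream ->
        stream.reduce(fn).orElseThrow(() -> new IllegalStateException("empty stream"));
  }

  public static void main(String[] args) {
    Function<Stream<Long>, Long> sum = foldOf((l1, l2) -> l1 + l2);

    // Because addition is associative, partial results can be folded again with the same function.
    long partialA = sum.apply(Stream.of(1L, 1L, 1L));
    long partialB = sum.apply(Stream.of(1L, 1L));
    System.out.println(sum.apply(Stream.of(partialA, partialB))); // prints 5
  }
}
```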
 
-<span class="n">pipeline</span><span class="o">.</span><span class="na">run</span><span class="o">();</span>
+<h3 id="reducewindow"><code class="highlighter-rouge">ReduceWindow</code></h3>
+<p>Reduces all elements in a <a href="#windowing">window</a>. The operator corresponds to <code class="highlighter-rouge">ReduceByKey</code> with the same key for all elements, so the actual key is defined only by window.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose input contains [ 1, 2, 3, 4, 5, 6, 7, 8 ]</span>
+<span class="c1">//let's assign a time-stamp to each input element</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">withEventTime</span> <span class="o">=</span> <span class="n">AssignEventTime</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">).</span><span class="na">using</span><span class="o">(</span><span class="n">i</span> <span class="o">-&gt;</span> <span class="mi">1000L</span> <span class=" [...]
+
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">output</span> <span class="o">=</span>
+  <span class="n">ReduceWindow</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">withEventTime</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">combineBy</span><span class="o">(</span><span class="n">Fold</span><span class="o">.</span><span class="na">of</span><span class="o">((</span><span class="n">i1</span><span class="o">,</span> <span class="n">i2</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">i1</span> <span class="o">+</span> <span class="n">i2</span><span class="o">))</span>
+    <span class="o">.</span><span class="na">windowBy</span><span class="o">(</span><span class="n">FixedWindows</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">Duration</span><span class="o">.</span><span class="na">millis</span><span class="o">(</span><span class="mi">5000</span><span class="o">)))</span>
+    <span class="o">.</span><span class="na">triggeredBy</span><span class="o">(</span><span class="n">DefaultTrigger</span><span class="o">.</span><span class="na">of</span><span class="o">())</span>
+    <span class="o">.</span><span class="na">discardingFiredPanes</span><span class="o">()</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">//output will contain: [ 10, 26 ]</span>
 </code></pre>
 </div>
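+<p>The arithmetic behind the result above can be sketched in plain Java, without Euphoria or Beam. The sketch assumes each element <code class="highlighter-rouge">i</code> gets the event time <code class="highlighter-rouge">1000L * i</code> milliseconds and that fixed windows start at multiples of the window size; <code class="highlighter-rouge">FixedWindowSketch</code> and <code class="highlighter-rouge">sumPerWindow</code> are hypothetical names.</p>

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration (not Euphoria API) of summing elements per fixed window.
final class FixedWindowSketch {

  // Assigns each element to the fixed window containing its time-stamp and sums per window.
  static List<Integer> sumPerWindow(List<Integer> input, long windowMillis) {
    Map<Long, Integer> sums = new TreeMap<>(); // keyed by window start, in ascending order
    for (int i : input) {
      long timestamp = 1000L * i; // assumed event time of element i
      long windowStart = timestamp - (timestamp % windowMillis);
      sums.merge(windowStart, i, Integer::sum);
    }
    return new ArrayList<>(sums.values());
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    // Elements 1..4 fall into [0, 5000), elements 5..8 into [5000, 10000).
    System.out.println(sumPerWindow(input, 5000L)); // prints [10, 26]
  }
}
```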
 
+<h3 id="sumbykey"><code class="highlighter-rouge">SumByKey</code></h3>
+<p>Sums elements with the same key. The input dataset is mapped to keys by a given key extractor (<code class="highlighter-rouge">UnaryFunction</code>) and to values by a value extractor, also a <code class="highlighter-rouge">UnaryFunction</code>, which outputs <code class="highlighter-rouge">Long</code>. Those values are then grouped by key and summed. Output is emitted as <code class="highlighter-rouge">KV&lt;K, Long&gt;</code> (<code class="highlighter-rouge">K</code> is key type) w [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose input contains: [ 1, 2, 3, 4, 5, 6, 7, 8, 9 ]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">KV</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;&gt;</span> <span class="n">output</span> <span class="o">=</span>
+  <span class="n">SumByKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"sum-odd-and-even"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">input</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="n">e</span> <span class="o">%</span> <span class="mi">2</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">e</span> <span class="o">-&gt;</span> <span class="o">(</span><span class="kt">long</span><span class="o">)</span> <span class="n">e</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// output will contain: [ KV.of(0, 20L), KV.of(1, 25L)]</span>
+</code></pre>
+</div>
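+<p>As a rough plain-Java equivalent of the example above, the following sketch groups the input by parity and sums each group. It does not use Euphoria; <code class="highlighter-rouge">SumByKeySketch</code> and <code class="highlighter-rouge">sumOddAndEven</code> are hypothetical names used only to make the key and value extraction steps concrete.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration (not Euphoria API) of "extract key, extract Long value, sum per key".
final class SumByKeySketch {

  // Key extractor: e % 2. Value extractor: the element itself as a long.
  static Map<Integer, Long> sumOddAndEven(List<Integer> input) {
    return input.stream()
        .collect(Collectors.groupingBy(
            e -> e % 2, TreeMap::new, Collectors.summingLong(Integer::longValue)));
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
    System.out.println(sumOddAndEven(input)); // prints {0=20, 1=25}
  }
}
```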
+
+<h3 id="union"><code class="highlighter-rouge">Union</code></h3>
+<p>Merges at least two datasets of the same type, without any guarantee about the ordering of elements.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">//suppose cats contains: [ "cheetah", "cat", "lynx", "jaguar" ]</span>
+<span class="c1">//suppose rodents contains: [ "squirrel", "mouse", "rat", "lemming", "beaver" ]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">animals</span> <span class="o">=</span>
+  <span class="n">Union</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"to-animals"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">cats</span><span class="o">,</span> <span class="n">rodents</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">// animals will contain: "cheetah", "cat", "lynx", "jaguar", "squirrel", "mouse", "rat", "lemming", "beaver"</span>
+</code></pre>
+</div>
+
+<h3 id="topperkey"><code class="highlighter-rouge">TopPerKey</code></h3>
+<p>Emits one top-rated element per key. Key of type <code class="highlighter-rouge">K</code> is extracted by given <code class="highlighter-rouge">UnaryFunction</code>. Another <code class="highlighter-rouge">UnaryFunction</code> extractor allows for conversion input elements to values of type <code class="highlighter-rouge">V</code>. Selection of top element is based on <em>score</em>, which is obtained from each element by user supplied <code class="highlighter-rouge">UnaryFunction</co [...]
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose animals contains: [ "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar" ]</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">Triple</span><span class="o">&lt;</span><span class="n">Character</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">longestNamesByLetter</span> <span class="o">=</span>
+  <span class="n">TopPerKey</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"longest-animal-names"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">animals</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="n">name</span> <span class="o">-&gt;</span> <span class="n">name</span><span class="o">.</span><span class="na">charAt</span><span class="o">(</span><span class="mi">0</span><span class="o">))</span> <span class="c1">// first character is the key</span>
+    <span class="o">.</span><span class="na">valueBy</span><span class="o">(</span><span class="n">UnaryFunction</span><span class="o">.</span><span class="na">identity</span><span class="o">())</span> <span class="c1">// value type is the same as input element type</span>
+    <span class="o">.</span><span class="na">scoreBy</span><span class="o">(</span><span class="nl">String:</span><span class="o">:</span><span class="n">length</span><span class="o">)</span> <span class="c1">// length defines score, note that Integer implements Comparable&lt;Integer&gt;</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">//longestNamesByLetter will contain: [ ('m', "mouse", 5), ('r', "rat", 3), ('e', "elephant", 8), ('d', "dinosaur", 8), ('c', "caterpillar", 11) ]</span>
+</code></pre>
+</div>
+<p><code class="highlighter-rouge">TopPerKey</code> is a shuffle operator, so it allows windowing to be defined.</p>
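+<p>The selection logic of the example above (key by first letter, score by length, keep the highest-scoring element per key) can be approximated in plain Java as shown below. This is only a sketch that does not use Euphoria: <code class="highlighter-rouge">TopPerKeySketch</code> and <code class="highlighter-rouge">longestNameByFirstLetter</code> are hypothetical names, and unlike the operator it returns only the winning value per key, not the score.</p>

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

// Plain-Java illustration (not Euphoria API) of keeping one top-scored element per key.
final class TopPerKeySketch {

  // Key: first character. Score: name length. The longer of two names with the same key wins.
  static Map<Character, String> longestNameByFirstLetter(List<String> animals) {
    return animals.stream()
        .collect(Collectors.toMap(
            name -> name.charAt(0),
            name -> name,
            BinaryOperator.maxBy(Comparator.comparingInt(String::length))));
  }

  public static void main(String[] args) {
    List<String> animals = Arrays.asList(
        "mouse", "elk", "rat", "mule", "elephant", "dinosaur", "cat", "duck", "caterpillar");
    Map<Character, String> top = longestNameByFirstLetter(animals);
    System.out.println(top.get('c')); // prints caterpillar
    System.out.println(top.get('m')); // prints mouse
  }
}
```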
+
+<h3 id="assigneventtime"><code class="highlighter-rouge">AssignEventTime</code></h3>
+<p>Euphoria needs to know how to extract the time-stamp from elements when <a href="#windowing">windowing</a> is applied. <code class="highlighter-rouge">AssignEventTime</code> tells Euphoria how to do that through a given implementation of the <code class="highlighter-rouge">ExtractEventTime</code> function.</p>
+<div class="language-java highlighter-rouge"><pre class="highlight"><code><span class="c1">// suppose events contains SomeEventObject instances, whose 'getEventTimeInMillis()' method returns the time-stamp</span>
+<span class="n">Dataset</span><span class="o">&lt;</span><span class="n">SomeEventObject</span><span class="o">&gt;</span> <span class="n">timeStampedEvents</span> <span class="o">=</span>
+  <span class="n">AssignEventTime</span><span class="o">.</span><span class="na">named</span><span class="o">(</span><span class="s">"extract-event-time"</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">events</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">using</span><span class="o">(</span><span class="nl">SomeEventObject:</span><span class="o">:</span><span class="n">getEventTimeInMillis</span><span class="o">)</span>
+    <span class="o">.</span><span class="na">output</span><span class="o">();</span>
+<span class="c1">//Euphoria will now know event time for each event</span>
+</code></pre>
+</div>
+
+<h2 id="euphoria-to-beam-translation-advanced-user-section">Euphoria To Beam Translation (advanced user section)</h2>
+<p>The Euphoria API is built on top of the Beam Java SDK. The API is transparently translated into Beam’s <code class="highlighter-rouge">PTransforms</code> in the background. Most of the translation happens in the <code class="highlighter-rouge">org.apache.beam.sdk.extensions.euphoria.core.translate</code> package, where the most interesting classes are:</p>
+<ul>
+  <li><code class="highlighter-rouge">OperatorTranslator</code> - Interface defining the inner API of the Euphoria to Beam translation.</li>
+  <li><code class="highlighter-rouge">TranslatorProvider</code> - Way of supplying custom translators.</li>
+  <li><code class="highlighter-rouge">OperatorTransform</code> - Governs the actual translation and/or expansion of Euphoria’s operators to Beam’s <code class="highlighter-rouge">PTransform</code>.</li>
+  <li><code class="highlighter-rouge">EuphoriaOptions</code> - A <code class="highlighter-rouge">PipelineOptions</code>, allows for setting custom <code class="highlighter-rouge">TranslatorProvider</code>.</li>
+</ul>
+
+<p>The package also contains implementation of <code class="highlighter-rouge">OperatorTranslator</code> for each supported operator type (<code class="highlighter-rouge">JoinTranslator</code>, <code class="highlighter-rouge">FlatMapTranslator</code>, <code class="highlighter-rouge">ReduceByKeyTranslator</code>). Not every operator needs to have translator of its own. Some of them can be composed from other operators. That is why operators may implement <code class="highlighter-rouge">Co [...]
+
+<p>The translation process was designed with flexibility in mind. We wanted to allow different ways of translating higher-level Euphoria operators to Beam SDK primitives. This allows for further performance optimizations based on user choices or on knowledge about the data obtained automatically.</p>
+
+<h3 id="unsupported-features">Unsupported Features</h3>
+<p>The <a href="https://github.com/seznam/euphoria">original Euphoria</a> contained some features and operators not yet supported in the Beam port. The list of not yet supported features follows:</p>
+<ul>
+  <li>Translation of the <code class="highlighter-rouge">ReduceStateByKey</code> (RSBK) operator to Beam is not supported. Therefore <code class="highlighter-rouge">TopPerKey</code>, which is decomposable to RSBK, is also not supported.</li>
+  <li><code class="highlighter-rouge">ReduceByKey</code> in the original Euphoria was allowed to sort output values (per key). This is also not yet translatable into Beam and is therefore not supported.</li>
+</ul>
 
       </div>
     </div>