Posted to commits@apex.apache.org by th...@apache.org on 2016/11/27 01:15:16 UTC

[02/15] apex-site git commit: Adding malhar- documentation

http://git-wip-us.apache.org/repos/asf/apex-site/blob/357a5a07/docs/malhar-3.6/operators/jsonParser/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.6/operators/jsonParser/index.html b/docs/malhar-3.6/operators/jsonParser/index.html
new file mode 100644
index 0000000..17f83b4
--- /dev/null
+++ b/docs/malhar-3.6/operators/jsonParser/index.html
@@ -0,0 +1,404 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Json Parser - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Json Parser";
+    var mkdocs_page_input_path = "operators/jsonParser.md";
+    var mkdocs_page_url = "/operators/jsonParser/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">Csv Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Json Parser</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#json-parser">Json Parser</a></li>
+                
+                    <li><a class="toctree-l4" href="#operator-objective">Operator Objective</a></li>
+                
+                    <li><a class="toctree-l4" href="#class-diagram">Class Diagram</a></li>
+                
+                    <li><a class="toctree-l4" href="#operator-information">Operator Information</a></li>
+                
+                    <li><a class="toctree-l4" href="#properties-attributes-and-ports">Properties, Attributes and Ports</a></li>
+                
+                    <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#example">Example</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Json Parser</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="json-parser">Json Parser</h1>
+<h2 id="operator-objective">Operator Objective</h2>
+<p>The purpose of the JsonParser operator is to parse JSON records and construct a Plain Old Java Object ("POJO") from each of them. The operator also emits each record as a JSONObject if the relevant output port is connected. The user can also provide a schema describing the JSON data to validate incoming records. Valid records are emitted as POJO / JSONObject, while invalid ones are emitted on the error port with an error message if that port is connected.</p>
+<p>Json Parser is <strong>idempotent</strong>, <strong>fault-tolerant</strong> &amp; <strong>statically/dynamically partitionable</strong>.</p>
+<h2 id="class-diagram">Class Diagram</h2>
+<p><img alt="" src="../images/jsonParser/JsonParser.png" /></p>
+<h2 id="operator-information">Operator Information</h2>
+<ol>
+<li>Operator location: <strong><em>malhar-contrib</em></strong></li>
+<li>Available since: <strong><em>3.2.0</em></strong></li>
+<li>Operator state: <strong><em>Evolving</em></strong></li>
+<li>Java Package: <a href="https://github.com/apache/apex-malhar/blob/master/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java">com.datatorrent.contrib.parser.JsonParser</a></li>
+</ol>
+<h2 id="properties-attributes-and-ports">Properties, Attributes and Ports</h2>
+<h3 id="properties-of-json-parser"><a name="props"></a>Properties of Json Parser</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Property</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+<th><strong>Default Value</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>jsonSchema</em></td>
+<td><a href="http://json-schema.org/">Schema</a>  describing JSON data. Incoming records can be validated using the jsonSchema. If the data is not as per the requirements specified in jsonSchema, they are emitted on the error port.This is an optional property. If the schema is not provided, incoming tuples are simply converted to POJO or JSONObject without any validations</td>
+<td>String</td>
+<td>NO</td>
+<td>N/A</td>
+</tr>
+</tbody>
+</table>
+<h3 id="platform-attributes-that-influences-operator-behavior">Platform Attributes that influences operator behavior</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Attribute</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>out.TUPLE_CLASS</em></td>
+<td>TUPLE_CLASS attribute on the output port which tells the operator the class of POJO to emit. The names of the field members of the class must match the field names in the incoming JSON record. The operator ignores unknown properties.</td>
+<td>Class or FQCN</td>
+<td>Yes</td>
+</tr>
+</tbody>
+</table>
+<h3 id="ports">Ports</h3>
+<table>
+<thead>
+<tr>
+<th><strong>Port</strong></th>
+<th><strong>Description</strong></th>
+<th><strong>Type</strong></th>
+<th><strong>Mandatory</strong></th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td><em>in</em></td>
+<td>Tuples that need to be parsed are received on this port</td>
+<td>byte[]</td>
+<td>Yes</td>
+</tr>
+<tr>
+<td><em>out</em></td>
+<td>Valid tuples are emitted as POJOs on this port. Tuples are converted to POJOs only if the port is connected.</td>
+<td>Object (POJO)</td>
+<td>No</td>
+</tr>
+<tr>
+<td><em>parsedOutput</em></td>
+<td>Valid tuples are emitted as JSONObjects on this port. Tuples are converted to JSONObjects only if the port is connected.</td>
+<td>JSONObject</td>
+<td>No</td>
+</tr>
+<tr>
+<td><em>err</em></td>
+<td>Invalid tuples are emitted with an error message. Invalid tuples are discarded if the port is not connected.</td>
+<td>KeyValPair &lt;String, String></td>
+<td>No</td>
+</tr>
+</tbody>
+</table>
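+<p>To show how the jsonSchema property, the TUPLE_CLASS attribute and the ports described above fit together, below is a minimal sketch of wiring JsonParser in the populateDAG() method. The POJO class <code>Person</code> and the downstream console operator are illustrative placeholders, and the setter <code>setJsonSchema()</code> is assumed to follow the usual naming convention for the <code>jsonSchema</code> property.</p>
+<pre><code class="java">JsonParser jsonParser = dag.addOperator(&quot;jsonParser&quot;, JsonParser.class);
+// Optional: provide a JSON schema so that invalid records go to the error port.
+jsonParser.setJsonSchema(&quot;{\&quot;type\&quot;:\&quot;object\&quot;}&quot;);
+// Tell the operator which POJO class to emit on the out port.
+dag.setOutputPortAttribute(jsonParser.out, Context.PortContext.TUPLE_CLASS, Person.class);
+// Connect the POJO output to a downstream operator (here a console writer).
+ConsoleOutputOperator console = dag.addOperator(&quot;console&quot;, new ConsoleOutputOperator());
+dag.addStream(&quot;parsedPojos&quot;, jsonParser.out, console.input);
+</code></pre>
+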
+<h2 id="partitioning">Partitioning</h2>
+<p>JSON Parser is both statically and dynamically partitionable.</p>
+<h3 id="static-partitioning">Static Partitioning</h3>
+<p>This can be achieved in two ways:</p>
+<ol>
+<li>Specifying the partitioner and number of partitions in the populateDAG() method</li>
+</ol>
+<pre><code class="java">JsonParser jsonParser = dag.addOperator(&quot;jsonParser&quot;, JsonParser.class);
+StatelessPartitioner&lt;JsonParser&gt; partitioner1 = new StatelessPartitioner&lt;JsonParser&gt;(2);
+dag.setAttribute(jsonParser, Context.OperatorContext.PARTITIONER, partitioner1 );
+</code></pre>
+
+<ol>
+<li>Specifying the partitioner in the properties file.</li>
+</ol>
+<pre><code class="xml"> &lt;property&gt;
+   &lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+   &lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:2&lt;/value&gt;
+ &lt;/property&gt;
+</code></pre>
+
+<p>where {OperatorName} is the name of the JsonParser operator.
+ The above lines create 2 static partitions of JsonParser; change the value accordingly to change the number of static partitions.</p>
+<h3 id="dynamic-paritioning">Dynamic Paritioning</h3>
+<p>JsonParser can be dynamically partitioned using an out-of-the-box partitioner:</p>
+<h4 id="throughput-based">Throughput based</h4>
+<p>The following code can be added to the populateDAG method of the application to dynamically partition JsonParser:</p>
+<pre><code class="java">JsonParser jsonParser = dag.addOperator(&quot;jsonParser&quot;, JsonParser.class);
+StatelessThroughputBasedPartitioner&lt;JsonParser&gt; partitioner = new StatelessThroughputBasedPartitioner&lt;&gt;();
+partitioner.setCooldownMillis(conf.getLong(COOL_DOWN_MILLIS, 10000));
+partitioner.setMaximumEvents(conf.getLong(MAX_THROUGHPUT, 30000));
+partitioner.setMinimumEvents(conf.getLong(MIN_THROUGHPUT, 10000));
+dag.setAttribute(jsonParser, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+dag.setAttribute(jsonParser, OperatorContext.PARTITIONER, partitioner);
+</code></pre>
+
+<p>The above code will dynamically partition JsonParser when the throughput changes.
+If the overall throughput of JsonParser goes above 30000 or below 10000, the platform will repartition JsonParser
+to keep the throughput of a single partition between 10000 and 30000.
+The cooldownMillis value of 10000 is used as the threshold time for which the throughput change must be observed before repartitioning.</p>
+<h2 id="example">Example</h2>
+<p>An example for Json Parser can be found at: <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/parser">https://github.com/DataTorrent/examples/tree/master/tutorials/parser</a></p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+        <a href="../jsonFormatter/" class="btn btn-neutral float-right" title="Json Formatter">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../windowedOperator/" class="btn btn-neutral" title="Windowed Operator"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../windowedOperator/" style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../jsonFormatter/" style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/357a5a07/docs/malhar-3.6/operators/kafkaInputOperator/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.6/operators/kafkaInputOperator/index.html b/docs/malhar-3.6/operators/kafkaInputOperator/index.html
new file mode 100644
index 0000000..6b83c03
--- /dev/null
+++ b/docs/malhar-3.6/operators/kafkaInputOperator/index.html
@@ -0,0 +1,695 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Kafka Input - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Kafka Input";
+    var mkdocs_page_input_path = "operators/kafkaInputOperator.md";
+    var mkdocs_page_url = "/operators/kafkaInputOperator/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Kafka Input</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#kafka-input-operator">KAFKA INPUT OPERATOR</a></li>
+                
+                    <li><a class="toctree-l4" href="#introduction">Introduction</a></li>
+                
+                    <li><a class="toctree-l4" href="#kafka-input-operator-for-kafka-08x">Kafka Input Operator for Kafka 0.8.x</a></li>
+                
+                    <li><a class="toctree-l4" href="#abstractkafkainputoperator">AbstractKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" href="#kafkaconsumer">KafkaConsumer</a></li>
+                
+                    <li><a class="toctree-l4" href="#pre-requisites">Pre-requisites</a></li>
+                
+                    <li><a class="toctree-l4" href="#offsetmanager">OffsetManager</a></li>
+                
+                    <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
+                
+                    <li><a class="toctree-l4" href="#abstractsingleportkafkainputoperator">AbstractSinglePortKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" href="#concrete-classes">Concrete Classes</a></li>
+                
+                    <li><a class="toctree-l4" href="#application-example">Application Example</a></li>
+                
+                    <li><a class="toctree-l4" href="#kafka-input-operator-for-kafka-09x">Kafka Input Operator for Kafka 0.9.x</a></li>
+                
+                    <li><a class="toctree-l4" href="#abstractkafkainputoperator_1">AbstractKafkaInputOperator</a></li>
+                
+                    <li><a class="toctree-l4" href="#concrete-classes_1">Concrete Classes</a></li>
+                
+                    <li><a class="toctree-l4" href="#application-example_1">Application Example</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">Csv Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../transform/">Transform Operator</a>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Kafka Input</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="kafka-input-operator">KAFKA INPUT OPERATOR</h1>
+<h3 id="introduction">Introduction</h3>
+<p><a href="http://kafka.apache.org">Apache Kafka</a> is a pull-based and distributed publish subscribe messaging system,
+topics are partitioned and replicated across nodes. </p>
+<p>The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. 
+The operator has the ability to automatically scale with the Kafka partitioning for high throughput. 
+It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.</p>
+<p>For more information about the operator design see this <a href="http://www.slideshare.net/ApacheApex/apache-apex-kafka-input-operator">presentation</a>
+and for processing guarantees this <a href="https://www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/">blog</a>.</p>
+<p>There are two separate implementations of the input operator,
+one built against Kafka 0.8 client and a newer version for the
+Kafka 0.9 consumer API that also works with MapR Streams.
+These reside in different packages and are described separately below.</p>
+<h3 id="kafka-input-operator-for-kafka-08x">Kafka Input Operator for Kafka 0.8.x</h3>
+<p>Package: <code>com.datatorrent.contrib.kafka</code></p>
+<p>Maven artifact: <a href="https://mvnrepository.com/artifact/org.apache.apex/malhar-contrib">malhar-contrib</a></p>
+<h3 id="abstractkafkainputoperator">AbstractKafkaInputOperator</h3>
+<p>This is the abstract implementation that serves as the base class for consuming messages from the Kafka messaging system. This class doesn't have any ports.</p>
+<p><img alt="AbstractKafkaInput.png" src="../images/kafkainput/image00.png" /></p>
+<h4 id="configuration-parameters">Configuration Parameters</h4>
+<p><table>
+<col width="25%" />
+<col width="75%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>maxTuplesPerWindow</p></td>
+<td align="left"><p>Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. Default value = MAX_VALUE </p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>idempotentStorageManager</p></td>
+<td align="left"><p>This is an instance of IdempotentStorageManager. Idempotency ensures that the operator will process the same set of messages in a window before and after a failure. For example, let's say the operator completed window 10 and failed somewhere between window 11. If the operator gets restored at window 10 then it will process the same messages again in window 10 which it did in the previous run before the failure. Idempotency is important but comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window. Default Value = com.datatorrent.lib.io.IdempotentStorageManager.<br>NoopIdempotentStorageManager</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>strategy</p></td>
+<td align="left"><p>Operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.</p>
+<p>ONE_TO_ONE: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.</p>
+<p>ONE_TO_MANY: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+Default Value = ONE_TO_ONE</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>msgRateUpperBound</p></td>
+<td align="left"><p>Maximum messages upper bound. Operator repartitions when the <em>msgProcessedPS</em> exceeds this bound. <em>msgProcessedPS</em> is the average number of messages processed per second by this operator.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>byteRateUpperBound</p></td>
+<td align="left"><p>Maximum bytes upper bound. Operator repartitions when the <em>bytesPS</em> exceeds this bound. <em>bytesPS</em> is the average number of bytes processed per second by this operator.</p>
+<p></p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>offsetManager</p></td>
+<td align="left"><p>This is an optional parameter that is useful when the application restarts or start at specific offsets (offsets are explained below)</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>repartitionInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. Default Value = 30 Seconds</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>repartitionCheckInterval</p></td>
+<td align="left"><p>Interval specified in milliseconds. This value specifies the minimum interval between two offset updates. Default Value = 5 Seconds</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>initialPartitionCount</p></td>
+<td align="left"><p>When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. Default Value = 1</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>consumer</p></td>
+<td align="left"><p>This is an instance of com.datatorrent.contrib.kafka.KafkaConsumer. Default Value = Instance of SimpleKafkaConsumer.</p></td>
+</tr>
+</tbody>
+</table></p>
+<h4 id="abstract-methods">Abstract Methods</h4>
+<p><code>void emitTuple(Message message)</code>: Abstract method that emits tuples extracted from Kafka message.</p>
+<h3 id="kafkaconsumer">KafkaConsumer</h3>
+<p>This is an abstract implementation of a Kafka consumer. It sends fetch
+requests to the leading brokers of the Kafka partitions. For each request,
+it receives a set of messages and stores them into a buffer which is an
+ArrayBlockingQueue. SimpleKafkaConsumer extends
+KafkaConsumer and provides the functionality of the Simple Consumer API, and
+HighLevelKafkaConsumer extends KafkaConsumer and provides the
+functionality of the High Level Consumer API.</p>
+<h3 id="pre-requisites">Pre-requisites</h3>
+<p>This operator uses the Kafka 0.8.2.1 client consumer API
+and will work with 0.8.x and 0.7.x versions of Kafka broker.</p>
+<h4 id="configuration-parameters_1">Configuration Parameters</h4>
+<table>
+<col width="15%" />
+<col width="15%" />
+<col width="15%" />
+<col width="55%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>zookeeper</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p></p></td>
+<td align="left"><p>Specifies the zookeeper quorum of Kafka clusters that you want to consume messages from. zookeeper �is a string in the form of hostname1:port1,hostname2:port2,hostname3:port3 �where hostname1,hostname2,hostname3 are hosts and port1,port2,port3 are ports of zookeeper server. �If the topic name is the same across the Kafka clusters and want to consume data from these clusters, then configure the zookeeper as follows: c1::hs1:p1,hs2:p2,hs3:p3;c2::hs4:p4,hs5:p5,c3::hs6:p6</p>
+<p>where</p>
+<p>c1,c2,c3 indicates the cluster names, hs1,hs2,hs3,hs4,hs5,hs6 are zookeeper hosts and p1,p2,p3,p4,p5,p6 are corresponding ports. Here, cluster name is optional in case of single cluster</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>cacheSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1024</p></td>
+<td align="left"><p>Maximum of buffered messages hold in memory.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>topic</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>default_topic</p></td>
+<td align="left"><p>Indicates the name of the topic.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>initialOffset</p></td>
+<td align="left"><p>String</p></td>
+<td align="left"><p>latest</p></td>
+<td align="left"><p>Indicates the type of offset i.e, \u201cearliest or latest\u201d. If initialOffset is \u201clatest\u201d, then the operator consumes messages from latest point of Kafka queue. If initialOffset is \u201cearliest\u201d, then the operator consumes messages starting from message queue. This can be overridden by OffsetManager.</p></td>
+</tr>
+</tbody>
+</table>
+
+<h4 id="abstract-methods_1">Abstract Methods</h4>
+<ol>
+<li>void commitOffset(): Commit the offsets at checkpoint.</li>
+<li>Map &lt;KafkaPartition, Long&gt; getCurrentOffsets(): Return the current
+    offset status.</li>
+<li>resetPartitionsAndOffset(Set &lt;KafkaPartition&gt; partitionIds,
+    Map &lt;KafkaPartition, Long&gt; startOffset): Reset the partitions with
+    partitionIds and offsets with startOffset.</li>
+</ol>
+<h4 id="configuration-parameters-for-simplekafkaconsumer">Configuration Parameters�for SimpleKafkaConsumer</h4>
+<table>
+<col width="25%" />
+<col width="15%" />
+<col width="15%" />
+<col width="45%" />
+<tbody>
+<tr class="odd">
+<td align="left"><p>Parameter</p></td>
+<td align="left"><p>Type</p></td>
+<td align="left"><p>Default</p></td>
+<td align="left"><p>Description</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>bufferSize</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>1 MB</p></td>
+<td align="left"><p>Specifies the maximum total size of messages for each fetch request.</p></td>
+</tr>
+<tr class="odd">
+<td align="left"><p>metadataRefreshInterval</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>30 Seconds</p></td>
+<td align="left"><p>Interval in between refresh the metadata change(broker change) in milliseconds. Enabling metadata refresh guarantees an automatic reconnect when a new broker is elected as the host. A value of -1 disables this feature.</p></td>
+</tr>
+<tr class="even">
+<td align="left"><p>metadataRefreshRetryLimit</p></td>
+<td align="left"><p>int</p></td>
+<td align="left"><p>-1</p></td>
+<td align="left"><p>Specifies the maximum brokers' metadata refresh retry limit. -1 means unlimited retry.</p></td>
+</tr>
+</tbody>
+</table>
+
+<h3 id="offsetmanager">OffsetManager</h3>
+<p>This is an interface for offset management and is useful when consuming data
+from specified offsets. It updates the offsets for all the Kafka partitions
+periodically. Below is the code snippet:</p>
+<pre><code class="java">public interface OffsetManager
+{
+  public Map&lt;KafkaPartition, Long&gt; loadInitialOffsets();
+  public void updateOffsets(Map&lt;KafkaPartition, Long&gt; offsetsOfPartitions);
+}
+</code></pre>
+
+<h4 id="abstract-methods_2">Abstract Methods</h4>
+<p><code>Map &lt;KafkaPartition, Long&gt; loadInitialOffsets()</code>: Specifies the initial offset for consuming messages; called at the activation stage.</p>
+<p><code>updateOffsets(Map&lt;KafkaPartition, Long&gt; offsetsOfPartitions)</code>: This
+method is called at every repartitionCheckInterval to update offsets.</p>
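+<p>As an illustration, below is a minimal, in-memory sketch of the interface shown above (the class name is hypothetical). It starts every configured partition from a fixed offset and simply remembers the latest offsets it is given; a real implementation would persist the offsets to durable storage (for example HDFS) so that they survive an application restart.</p>
+<pre><code class="java">public class FixedStartOffsetManager implements OffsetManager
+{
+  // Offsets to start from, keyed by Kafka partition (populated by the application).
+  private final Map&lt;KafkaPartition, Long&gt; startOffsets = new HashMap&lt;&gt;();
+  // Latest offsets reported by the operator, updated every repartitionCheckInterval.
+  private final Map&lt;KafkaPartition, Long&gt; currentOffsets = new HashMap&lt;&gt;();
+
+  @Override
+  public Map&lt;KafkaPartition, Long&gt; loadInitialOffsets()
+  {
+    return new HashMap&lt;&gt;(startOffsets);
+  }
+
+  @Override
+  public void updateOffsets(Map&lt;KafkaPartition, Long&gt; offsetsOfPartitions)
+  {
+    currentOffsets.putAll(offsetsOfPartitions);
+  }
+
+  public void setStartOffset(KafkaPartition partition, long offset)
+  {
+    startOffsets.put(partition, offset);
+  }
+}
+</code></pre>
+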
+<h3 id="partitioning">Partitioning</h3>
+<p>The logical instance of the KafkaInputOperator acts as the Partitioner
+as well as a StatsListener. This is because the
+AbstractKafkaInputOperator implements both the
+com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener
+interfaces and provides an implementation of definePartitions(...) and
+processStats(...) which makes it auto-scalable.</p>
+<h4 id="response-processstatsbatchedoperatorstats-stats">Response processStats(BatchedOperatorStats stats)</h4>
+<p>The application master invokes this method on the logical instance with
+the stats (tuplesProcessedPS, bytesPS, etc.) of each partition.
+Re-partitioning happens when new Kafka partitions are added for
+the topic or when bytesPS and msgPS cross their respective upper bounds.</p>
+<h4 id="definepartitions">DefinePartitions</h4>
+<p>Based on the repartitionRequired field of the Response object which is
+returned by the processStats(...) method, the application master invokes
+definePartitions(...) on the logical instance, which is also the
+partitioner instance. Dynamic partitioning can be disabled by setting the
+parameter repartitionInterval to a negative value.</p>
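+<p>For example, ONE_TO_MANY partitioning with dynamic repartitioning disabled could be configured in populateDAG() roughly as shown below. This is only a sketch: it assumes setters named after the configuration parameters listed above (setStrategy, setInitialPartitionCount, setRepartitionInterval); consult the operator Javadoc for the exact signatures.</p>
+<pre><code class="java">KafkaSinglePortStringInputOperator input =
+    dag.addOperator(&quot;MessageReader&quot;, new KafkaSinglePortStringInputOperator());
+input.setStrategy(&quot;ONE_TO_MANY&quot;);   // assumed setter for the 'strategy' parameter
+input.setInitialPartitionCount(4);       // at most 4 operator instances
+input.setRepartitionInterval(-1);        // a negative value disables dynamic repartitioning
+</code></pre>
+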
+<h3 id="abstractsingleportkafkainputoperator">AbstractSinglePortKafkaInputOperator</h3>
+<p>This class extends AbstractKafkaInputOperator to emit messages through a single output port.</p>
+<h4 id="ports">Ports</h4>
+<p><code>outputPort &lt;T&gt;</code>: Tuples extracted from Kafka messages are emitted through this port.</p>
+<h4 id="abstract-methods_3">Abstract Methods</h4>
+<p><code>T getTuple(Message msg)</code>: Converts the Kafka message to a tuple.</p>
+<h3 id="concrete-classes">Concrete Classes</h3>
+<ol>
+<li>KafkaSinglePortStringInputOperator: extends <code>AbstractSinglePortKafkaInputOperator</code>, extracts string from Kafka message.</li>
+<li>KafkaSinglePortByteArrayInputOperator: extends <code>AbstractSinglePortKafkaInputOperator</code>, extracts byte array from Kafka message.</li>
+</ol>
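+<p>If neither of these fits, a custom concrete class only needs to implement <code>getTuple(Message)</code>. The sketch below is a hypothetical variant that decodes the payload as an upper-cased UTF-8 string; it assumes the Kafka 0.8 <code>Message.payload()</code> API, which returns a ByteBuffer.</p>
+<pre><code class="java">public class KafkaSinglePortUpperCaseInputOperator extends AbstractSinglePortKafkaInputOperator&lt;String&gt;
+{
+  @Override
+  public String getTuple(Message message)
+  {
+    // Copy the payload bytes out of the ByteBuffer and decode them as UTF-8.
+    java.nio.ByteBuffer payload = message.payload();
+    byte[] bytes = new byte[payload.remaining()];
+    payload.get(bytes);
+    return new String(bytes, java.nio.charset.StandardCharsets.UTF_8).toUpperCase();
+  }
+}
+</code></pre>
+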
+<h3 id="application-example">Application Example</h3>
+<p>This section builds an Apex application using the Kafka input operator.
+Below is the code snippet:</p>
+<pre><code class="java">@ApplicationAnnotation(name = &quot;KafkaApp&quot;)
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortByteArrayInputOperator input =  dag.addOperator(&quot;MessageReader&quot;, new KafkaSinglePortByteArrayInputOperator());
+    ConsoleOutputOperator output = dag.addOperator(&quot;Output&quot;, new ConsoleOutputOperator());
+    dag.addStream(&quot;MessageData&quot;, input.outputPort, output.input);
+  }
+}
+</code></pre>
+
+<p>Below is the configuration for the Kafka topic name "test", where
+"localhost:2181" is the zookeeper quorum:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topic&lt;/name&gt;
+  &lt;value&gt;test&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.zookeeper&lt;/name&gt;
+  &lt;value&gt;localhost:2181&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<h3 id="kafka-input-operator-for-kafka-09x">Kafka Input Operator for Kafka 0.9.x</h3>
+<p>Package: <code>org.apache.apex.malhar.kafka</code></p>
+<p>Maven Artifact: <a href="https://mvnrepository.com/artifact/org.apache.apex/malhar-kafka">malhar-kafka</a></p>
+<p>This version uses the new 0.9 version of the consumer API and works with Kafka broker version 0.9 and later.
+The operator is fault-tolerant, scalable and supports input from multiple clusters and multiple topics in a single operator instance.</p>
+<h4 id="pre-requisites_1">Pre-requisites</h4>
+<p>This operator requires version 0.9.0 or later of the Kafka Consumer API.</p>
+<h3 id="abstractkafkainputoperator_1">AbstractKafkaInputOperator</h3>
+<h4 id="ports_1">Ports</h4>
+<hr />
+<p>This abstract class doesn't have any ports.</p>
+<h4 id="configuration-properties">Configuration properties</h4>
+<hr />
+<ul>
+<li>
+<p><strong><em>clusters</em></strong> - String[]</p>
+<ul>
+<li>Mandatory Parameter.</li>
+<li>Specifies the Kafka clusters that you want to consume messages from. To configure multi-cluster support, you need to specify the clusters separated by ";".</li>
+</ul>
+</li>
+<li>
+<p><strong><em>topics</em></strong> - String[]</p>
+<ul>
+<li>Mandatory Parameter.</li>
+<li>Specifies the Kafka topics that you want to consume messages from. If you want multi-topic support, then specify the topics separated by ",".</li>
+</ul>
+</li>
+<li>
+<p><strong><em>strategy</em></strong> - PartitionStrategy</p>
+<ul>
+<li>
+<p>Operator supports two types of partitioning strategies, <code>ONE_TO_ONE</code> and <code>ONE_TO_MANY</code>.</p>
+<p><code>ONE_TO_ONE</code>: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition. So the number of Kafka topic partitions equals the number of operator instances.
+<code>ONE_TO_MANY</code>: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount, the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and number of Kafka partitions(N) = 2 then AppMaster creates 2 Kafka input operator instances.
+Default Value = <code>PartitionStrategy.ONE_TO_ONE</code>.</p>
+</li>
+</ul>
+</li>
+<li>
+<p><strong><em>initialPartitionCount</em></strong> - Integer</p>
+<ul>
+<li>When the ONE_TO_MANY partition strategy is enabled, this value indicates the number of Kafka input operator instances. 
+    Default Value = 1.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>repartitionInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum time required between two repartition actions. 
+    Default Value = 30 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>repartitionCheckInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum interval between two stat checks.
+    Default Value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>maxTuplesPerWindow</em></strong> - Integer</p>
+<ul>
+<li>Controls the maximum number of messages emitted in each streaming window from this operator. Minimum value is 1. 
+    Default value = <code>MAX_VALUE</code> </li>
+</ul>
+</li>
+<li>
+<p><strong><em>initialOffset</em></strong> - InitialOffset</p>
+<ul>
+<li>Indicates the type of offset, i.e., <code>EARLIEST</code> or <code>LATEST</code> or <code>APPLICATION_OR_EARLIEST</code> or <code>APPLICATION_OR_LATEST</code>. 
+    <code>LATEST</code> =&gt; Consume new messages from latest offset in the topic. 
+    <code>EARLIEST</code> =&gt; Consume all messages available in the topic.
+    <code>APPLICATION_OR_EARLIEST</code> =&gt; Consume messages from committed position from last run. If there is no committed offset, then start consuming from beginning.
+    <code>APPLICATION_OR_LATEST</code> =&gt; Consumes messages from committed position from last run. If a committed offset is unavailable, then start consuming from latest position.
+    Default value = <code>InitialOffset.APPLICATION_OR_LATEST</code></li>
+</ul>
+</li>
+<li>
+<p><strong><em>metricsRefreshInterval</em></strong> - Long</p>
+<ul>
+<li>Interval specified in milliseconds. This value specifies the minimum interval between two metric stat updates.
+    Default value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>consumerTimeout</em></strong> - Long</p>
+<ul>
+<li>Indicates the <a href="http://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#poll">time waiting in poll</a> when data is not available.
+    Default value = 5 Seconds.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>holdingBufferSize</em></strong> - Long</p>
+<ul>
+<li>Indicates the maximum number of messages kept in memory for emitting.
+    Default value = 1024.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>consumerProps</em></strong> - Properties</p>
+<ul>
+<li>Specify additional <a href="http://kafka.apache.org/090/documentation.html#newconsumerconfigs">consumer properties</a> that are not explicitly set by the operator.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>windowDataManager</em></strong> - WindowDataManager</p>
+<ul>
+<li>If set to a value other than the default, such as <code>FSWindowDataManager</code>, specifies that the operator will process the same set of messages in a window before and after a failure. This is important but it comes with higher cost because at the end of each window the operator needs to persist some state with respect to that window.
+    Default value = <code>WindowDataManager.NoopWindowDataManager</code>.</li>
+</ul>
+</li>
+</ul>
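+<p>A minimal programmatic configuration might look like the sketch below. The setter names are assumed to mirror the property names above (setClusters, setTopics, setInitialOffset, setConsumerProps); the property-file approach shown in the Application Example below is equivalent.</p>
+<pre><code class="java">KafkaSinglePortInputOperator input =
+    dag.addOperator(&quot;MessageReader&quot;, new KafkaSinglePortInputOperator());
+input.setClusters(&quot;localhost:9092&quot;);   // multiple clusters are separated by &quot;;&quot;
+input.setTopics(&quot;test&quot;);               // multiple topics are separated by &quot;,&quot;
+input.setInitialOffset(&quot;EARLIEST&quot;);    // consume all messages available in the topic
+Properties props = new Properties();
+props.put(&quot;group.id&quot;, &quot;apex-example&quot;); // extra consumer properties not set by the operator
+input.setConsumerProps(props);
+</code></pre>
+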
+<h4 id="abstract-methods_4">Abstract Methods</h4>
+<p><code>void emitTuple(String cluster, ConsumerRecord&lt;byte[], byte[]&gt; message)</code>: Abstract method that emits tuples
+extracted from Kafka message.</p>
+<h3 id="concrete-classes_1">Concrete Classes</h3>
+<h4 id="kafkasingleportinputoperator">KafkaSinglePortInputOperator</h4>
+<p>This class extends from AbstractKafkaInputOperator and defines the <code>getTuple()</code> method which extracts byte array from Kafka message.</p>
+<h4 id="ports_2">Ports</h4>
+<p><code>outputPort &lt;byte[]&gt;</code>: Tuples extracted from Kafka messages are emitted through this port.</p>
+<h3 id="application-example_1">Application Example</h3>
+<p>This section builds an Apex application using the Kafka input operator.
+Below is the code snippet:</p>
+<pre><code class="java">@ApplicationAnnotation(name = &quot;KafkaApp&quot;)
+public class ExampleKafkaApplication implements StreamingApplication
+{
+  @Override
+  public void populateDAG(DAG dag, Configuration entries)
+  {
+    KafkaSinglePortInputOperator input =  dag.addOperator(&quot;MessageReader&quot;, new KafkaSinglePortInputOperator());
+    ConsoleOutputOperator output = dag.addOperator(&quot;Output&quot;, new ConsoleOutputOperator());
+    dag.addStream(&quot;MessageData&quot;, input.outputPort, output.input);
+  }
+}
+</code></pre>
+
+<p>Below is the configuration for the Kafka topic name "test", where
+"localhost:9092" is the broker:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topics&lt;/name&gt;
+  &lt;value&gt;test&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+  &lt;value&gt;localhost:9092&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>By adding the following lines to the properties file, the Kafka Input Operator supports multiple topics and multiple clusters:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.topics&lt;/name&gt;
+  &lt;value&gt;test1, test2&lt;/value&gt;
+&lt;/property&gt;
+
+&lt;property&gt;
+  &lt;name&gt;dt.operator.MessageReader.prop.clusters&lt;/name&gt;
+  &lt;value&gt;localhost:9092; localhost:9093; localhost:9094&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<p>For a full example application project, refer to <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/kafka">https://github.com/DataTorrent/examples/tree/master/tutorials/kafka</a>.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+        <a href="../jmsInputOperator/" class="btn btn-neutral float-right" title="JMS Input">Next <span class="icon icon-circle-arrow-right"></span></a>
+      
+      
+        <a href="../.." class="btn btn-neutral" title="Apache Apex Malhar"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../.." style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+        <span style="margin-left: 15px"><a href="../jmsInputOperator/" style="color: #fcfcfc">Next &raquo;</a></span>
+      
+    </span>
+</div>
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/apex-site/blob/357a5a07/docs/malhar-3.6/operators/transform/index.html
----------------------------------------------------------------------
diff --git a/docs/malhar-3.6/operators/transform/index.html b/docs/malhar-3.6/operators/transform/index.html
new file mode 100644
index 0000000..7e39a2c
--- /dev/null
+++ b/docs/malhar-3.6/operators/transform/index.html
@@ -0,0 +1,516 @@
+<!DOCTYPE html>
+<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
+<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
+<head>
+  <meta charset="utf-8">
+  <meta http-equiv="X-UA-Compatible" content="IE=edge">
+  <meta name="viewport" content="width=device-width, initial-scale=1.0">
+  
+  
+  
+  <title>Transform Operator - Apache Apex Malhar Documentation</title>
+  
+
+  <link rel="shortcut icon" href="../../favicon.ico">
+  
+
+  
+  <link href='https://fonts.googleapis.com/css?family=Lato:400,700|Roboto+Slab:400,700|Inconsolata:400,700' rel='stylesheet' type='text/css'>
+
+  <link rel="stylesheet" href="../../css/theme.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/theme_extra.css" type="text/css" />
+  <link rel="stylesheet" href="../../css/highlight.css">
+
+  
+  <script>
+    // Current page data
+    var mkdocs_page_name = "Transform Operator";
+    var mkdocs_page_input_path = "operators/transform.md";
+    var mkdocs_page_url = "/operators/transform/";
+  </script>
+  
+  <script src="../../js/jquery-2.1.1.min.js"></script>
+  <script src="../../js/modernizr-2.8.3.min.js"></script>
+  <script type="text/javascript" src="../../js/highlight.pack.js"></script>
+  <script src="../../js/theme.js"></script> 
+
+  
+</head>
+
+<body class="wy-body-for-nav" role="document">
+
+  <div class="wy-grid-for-nav">
+
+    
+    <nav data-toggle="wy-nav-shift" class="wy-nav-side stickynav">
+      <div class="wy-side-nav-search">
+        <a href="../.." class="icon icon-home"> Apache Apex Malhar Documentation</a>
+        <div role="search">
+  <form id ="rtd-search-form" class="wy-form" action="../../search.html" method="get">
+    <input type="text" name="q" placeholder="Search docs" />
+  </form>
+</div>
+      </div>
+
+      <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
+        <ul class="current">
+          
+            <li>
+    <li class="toctree-l1 ">
+        <a class="" href="../..">Apache Apex Malhar</a>
+        
+    </li>
+<li>
+          
+            <li>
+    <ul class="subnav">
+    <li><span>Operators</span></li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../kafkaInputOperator/">Kafka Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jmsInputOperator/">JMS Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_splitter/">File Splitter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../block_reader/">Block Reader</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../fsInputOperator/">File Input</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../csvParserOperator/">Csv Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../file_output/">File Output</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../enricher/">Enricher</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../filter/">Filter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../deduper/">Deduper</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../windowedOperator/">Windowed Operator</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonParser/">Json Parser</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 ">
+        <a class="" href="../jsonFormatter/">Json Formatter</a>
+        
+    </li>
+
+        
+            
+    <li class="toctree-l1 current">
+        <a class="current" href="./">Transform Operator</a>
+        
+            <ul>
+            
+                <li class="toctree-l3"><a href="#transform-operator-documentation">Transform - Operator Documentation</a></li>
+                
+                    <li><a class="toctree-l4" href="#about-transform-operator">About Transform operator</a></li>
+                
+                    <li><a class="toctree-l4" href="#use-case">Use Case</a></li>
+                
+                    <li><a class="toctree-l4" href="#configuration-parameters">Configuration Parameters</a></li>
+                
+                    <li><a class="toctree-l4" href="#configuration-example">Configuration Example</a></li>
+                
+                    <li><a class="toctree-l4" href="#ports">Ports</a></li>
+                
+                    <li><a class="toctree-l4" href="#attributes">Attributes</a></li>
+                
+                    <li><a class="toctree-l4" href="#application-example">Application Example</a></li>
+                
+                    <li><a class="toctree-l4" href="#partitioning">Partitioning</a></li>
+                
+            
+            </ul>
+        
+    </li>
+
+        
+    </ul>
+<li>
+          
+        </ul>
+      </div>
+      &nbsp;
+    </nav>
+
+    <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
+
+      
+      <nav class="wy-nav-top" role="navigation" aria-label="top navigation">
+        <i data-toggle="wy-nav-top" class="fa fa-bars"></i>
+        <a href="../..">Apache Apex Malhar Documentation</a>
+      </nav>
+
+      
+      <div class="wy-nav-content">
+        <div class="rst-content">
+          <div role="navigation" aria-label="breadcrumbs navigation">
+  <ul class="wy-breadcrumbs">
+    <li><a href="../..">Docs</a> &raquo;</li>
+    
+      
+        
+          <li>Operators &raquo;</li>
+        
+      
+    
+    <li>Transform Operator</li>
+    <li class="wy-breadcrumbs-aside">
+      
+    </li>
+  </ul>
+  <hr/>
+</div>
+          <div role="main">
+            <div class="section">
+              
+                <h1 id="transform-operator-documentation">Transform - Operator Documentation</h1>
+<h3 id="about-transform-operator">About Transform operator</h3>
+<hr />
+<p>Transform means mapping field expressions from input to output, or converting fields from one type to another.
+This operator is stateless. It receives objects on its input port; for each such input object, it creates a new output object whose fields are computed as expressions involving fields of the input object. 
+The types of the input and output objects are configurable, as are the expressions used to compute the output fields. </p>
+<p>The operator class is <code>TransformOperator</code> located in the package <code>com.datatorrent.lib.transform</code>.
+Please refer to <a href="https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java">github URL</a> for <code>TransformOperator</code>.</p>
+<h3 id="use-case">Use Case</h3>
+<hr />
+<p>Consider the data that needs to be transformed as per output schema.</p>
+<p>Consider input objects with these fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>FirstName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>LastName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Phone</td>
+<td>String</td>
+</tr>
+<tr>
+<td>DateOfBirth</td>
+<td>java.util.Date</td>
+</tr>
+<tr>
+<td>Address</td>
+<td>String</td>
+</tr>
+</tbody>
+</table>
+<p>and output objects with fields: </p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>Name</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Phone</td>
+<td>String</td>
+</tr>
+<tr>
+<td>Age</td>
+<td>Integer</td>
+</tr>
+<tr>
+<td>Address</td>
+<td>String</td>
+</tr>
+</tbody>
+</table>
+<p>Suppose <code>Name</code> is a concatenation of <code>FirstName</code> and <code>LastName</code>, and 
+        <code>Age</code> is computed by subtracting the year of <code>DateOfBirth</code> from the current year.</p>
+<p>These simple computations can be expressed as Java expressions where the input object is
+represented by $ and provided as configuration parameters as follows:</p>
+<pre><code>Name =&gt; {$.FirstName}.concat(\&quot; \&quot;).concat({$.LastName})
+Age =&gt; (new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()
+</code></pre>
+
+<h3 id="configuration-parameters">Configuration Parameters</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>expressionMap</em></strong> -   Map&lt;String, String&gt;</p>
+<ul>
+<li>Mandatory Parameter</li>
+<li>Specifies the map between the output field (key) and the expression used to compute it (value) using fields of the input Java object.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>expressionFunctions</em></strong> -   List&lt;String&gt;</p>
+<ul>
+<li>List of imported classes or methods to be made available to the expressions. It overrides the default list.</li>
+<li>Default Value = {java.lang.Math.*, org.apache.commons.lang3.StringUtils.*, org.apache.commons.lang3.StringEscapeUtils.*, org.apache.commons.lang3.time.DurationFormatUtils.*, org.apache.commons.lang3.time.DateFormatUtils.*}</li>
+</ul>
+</li>
+<li>
+<p><strong><em>copyMatchingFields</em></strong> -   boolean</p>
+<ul>
+<li>Specifies whether matching fields should be copied; here matching means the name and type of an input field is the same as the name and type of an output field. 
+    If a matching field also appears in <code>expressionMap</code>, the expression takes precedence and the field is not copied to the output object.</li>
+<li>Default Value = true.</li>
+</ul>
+</li>
+</ul>
+<h3 id="configuration-example">Configuration Example</h3>
+<hr />
+<p>Consider input object with fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>FirstName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>LastName</td>
+<td>String</td>
+</tr>
+<tr>
+<td>StartDate</td>
+<td>org.joda.time.DateTime</td>
+</tr>
+</tbody>
+</table>
+<p>and output objects with fields:</p>
+<table>
+<thead>
+<tr>
+<th>Name</th>
+<th>Type</th>
+</tr>
+</thead>
+<tbody>
+<tr>
+<td>Name</td>
+<td>String</td>
+</tr>
+<tr>
+<td>isLeapYear</td>
+<td>Boolean</td>
+</tr>
+</tbody>
+</table>
+<p>Note: The <code>org.joda.time.DateTime</code> class is not present in the default list, so we need to add this class to <code>expressionFunctions</code> as shown below in the populateDAG method:</p>
+<pre><code class="java">TransformOperator operator = dag.addOperator(&quot;transform&quot;, new TransformOperator());
+operator.setExpressionFunctions(Arrays.asList(&quot;org.joda.time.DateTime&quot;, org.apache.commons.lang3.StringUtils));
+Map&lt;String,String&gt; expressionMap = new HashMap&lt;&gt;();
+expressionMap.put(isLeapYear, {$.StartDate}.year().isLeap());
+expressionMap.put(Name, org.apache.commons.lang3.StringUtils.joinWith(\&quot; \&quot;, {$.FirstName},{$.LastName});
+operator.setExpressionMap(expressionMap);
+</code></pre>
+
+<p>The above properties can also be set in the properties file as follows:</p>
+<pre><code class="xml">&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionFunctions[0]&lt;/name&gt;
+  &lt;value&gt;org.joda.time.DateTime&lt;/value&gt;
+&lt;/property&gt;     
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionFunctions[1]&lt;/name&gt;
+  &lt;value&gt;org.apache.commons.lang3.StringUtils&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionMap(isLeapYear)&lt;/name&gt;
+  &lt;value&gt;{$.StartDate}.year().isLeap()&lt;/value&gt;
+&lt;/property&gt;
+&lt;property&gt;
+  &lt;name&gt;dt.operator.transform.expressionMap(Name)&lt;/name&gt;
+  &lt;value&gt;org.apache.commons.lang3.StringUtils.joinWith(\&quot; \&quot;, {$.FirstName}, {$.LastName})&lt;/value&gt;
+&lt;/property&gt;
+</code></pre>
+
+<h3 id="ports">Ports</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>input</em></strong> -   Port for input tuples.</p>
+<ul>
+<li>Mandatory input port</li>
+</ul>
+</li>
+<li>
+<p><strong><em>output</em></strong>    -   Port for transformed output tuples.</p>
+<ul>
+<li>Mandatory output port</li>
+</ul>
+</li>
+</ul>
+<h3 id="attributes">Attributes</h3>
+<hr />
+<ul>
+<li>
+<p><strong><em>Input port Attribute - input.TUPLE_CLASS</em></strong> - Fully qualified class name; the class should be Kryo serializable.</p>
+<ul>
+<li>Mandatory attribute</li>
+<li>Type of input tuple.</li>
+</ul>
+</li>
+<li>
+<p><strong><em>Output port Attribute - output.TUPLE_CLASS</em></strong> - Fully qualified class name; the class should be Kryo serializable (see the sketch after this list).</p>
+<ul>
+<li>Mandatory attribute</li>
+<li>Type of output tuple.</li>
+</ul>
+</li>
+</ul>
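+<p>These attributes are set on the operator ports in populateDAG(). The sketch below assumes hypothetical Kryo-serializable classes InputPojo and OutputPojo standing in for your own tuple types.</p>
+<pre><code class="java">TransformOperator transform = dag.addOperator(&quot;transform&quot;, new TransformOperator());
+// Tell the platform which concrete classes flow through the ports.
+dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, InputPojo.class);
+dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, OutputPojo.class);
+</code></pre>
+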
+<h3 id="application-example">Application Example</h3>
+<hr />
+<p>Please refer to the <a href="https://github.com/DataTorrent/examples/tree/master/tutorials/transform">Example</a> for a sample transform application.</p>
+<h3 id="partitioning">Partitioning</h3>
+<hr />
+<p>Being stateless, this operator can be partitioned using any of the built-in partitioners present in the Malhar library by setting a few properties as follows:</p>
+<h4 id="stateless-partitioning">Stateless partitioning</h4>
+<p>Stateless partitioning ensures that TransformOperator is partitioned right at the start of the application and remains partitioned throughout the lifetime of the DAG.
+TransformOperator can be partitioned statically by adding the following lines to properties.xml:</p>
+<pre><code class="xml">  &lt;property&gt;
+    &lt;name&gt;dt.operator.{OperatorName}.attr.PARTITIONER&lt;/name&gt;
+    &lt;value&gt;com.datatorrent.common.partitioner.StatelessPartitioner:{N}&lt;/value&gt;
+  &lt;/property&gt;
+</code></pre>
+
+<p>where {OperatorName} is the name of the TransformOperator operator and
+      {N} is the number of static partitions.
+The above lines will create {N} static partitions of TransformOperator. </p>
+<h4 id="dynamic-partitioning">Dynamic Partitioning</h4>
+<p>Dynamic partitioning is a feature of the Apex platform which changes the number of partitions of an operator at runtime based on certain conditions.
+TransformOperator can be dynamically partitioned using an out-of-the-box partitioner:</p>
+<h5 id="throughput-based">Throughput based</h5>
+<p>The following code can be added to the populateDAG(DAG dag, Configuration conf) method of the application to dynamically partition TransformOperator:</p>
+<pre><code class="java">StatelessThroughputBasedPartitioner&lt;TransformOperator&gt; partitioner = new StatelessThroughputBasedPartitioner&lt;&gt;();
+partitioner.setCooldownMillis(10000);
+partitioner.setMaximumEvents(30000);
+partitioner.setMinimumEvents(10000);
+dag.setAttribute(transform, OperatorContext.STATS_LISTENERS, Arrays.asList(new StatsListener[]{partitioner}));
+dag.setAttribute(transform, OperatorContext.PARTITIONER, partitioner);
+</code></pre>
+
+<p>The above code will dynamically partition TransformOperator when the throughput changes.
+If the overall throughput of TransformOperator goes above 30000 or below 10000, the platform will repartition TransformOperator 
+to keep the throughput of a single partition between 10000 and 30000.
+The cooldownMillis value of 10000 is used as the threshold time for which the throughput change must be observed before repartitioning.</p>
+<p>Source code for this dynamic application can be found <a href="https://github.com/DataTorrent/examples/blob/master/tutorials/transform/src/main/java/com/example/transform/DynamicTransformApplication.java">here</a>.</p>
+              
+            </div>
+          </div>
+          <footer>
+  
+    <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation">
+      
+      
+        <a href="../jsonFormatter/" class="btn btn-neutral" title="Json Formatter"><span class="icon icon-circle-arrow-left"></span> Previous</a>
+      
+    </div>
+  
+
+  <hr/>
+
+  <div role="contentinfo">
+    <!-- Copyright etc -->
+    
+  </div>
+
+  Built with <a href="http://www.mkdocs.org">MkDocs</a> using a <a href="https://github.com/snide/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
+</footer>
+	  
+        </div>
+      </div>
+
+    </section>
+
+  </div>
+
+<div class="rst-versions" role="note" style="cursor: pointer">
+    <span class="rst-current-version" data-toggle="rst-current-version">
+      
+      
+        <span><a href="../jsonFormatter/" style="color: #fcfcfc;">&laquo; Previous</a></span>
+      
+      
+    </span>
+</div>
+
+</body>
+</html>