Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/29 05:20:14 UTC

[13/15] storm git commit: add external module docs; misc tweaks

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/_site/documentation/storm-hive.html
----------------------------------------------------------------------
diff --git a/_site/documentation/storm-hive.html b/_site/documentation/storm-hive.html
new file mode 100644
index 0000000..0a2dbac
--- /dev/null
+++ b/_site/documentation/storm-hive.html
@@ -0,0 +1,328 @@
+# Storm Hive Integration
+
+Hive offers a streaming API that allows data to be written continuously into Hive. The incoming data
+can be continuously committed in small batches of records to an existing Hive partition or table. Once the data
+is committed, it is immediately visible to all Hive queries. For more information on the Hive Streaming API, see
+https://cwiki.apache.org/confluence/display/Hive/Streaming+Data+Ingest
+
+With the help of the Hive Streaming API, HiveBolt and HiveState allow users to stream data from Storm into Hive directly.
+To use the Hive Streaming API, users need to create a bucketed table stored in the ORC format. For example:
+
+```sql
+create table test_table ( id INT, name STRING, phone STRING, street STRING)
+  partitioned by (city STRING, state STRING)
+  stored as orc tblproperties ("orc.compress"="NONE");
+```
+## HiveBolt (org.apache.storm.hive.bolt.HiveBolt)
+
+HiveBolt streams tuples directly into Hive. Tuples are written using Hive transactions.
+Partitions to which HiveBolt streams can either be pre-created, or optionally
+HiveBolt can create them if they are missing. Fields from tuples are mapped to table columns.
+Users should make sure that tuple field names match the table column names.
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields(colNames));
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper);
+HiveBolt hiveBolt = new HiveBolt(hiveOptions);
+```
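+For context, here is a minimal sketch of wiring the bolt into a topology. The spout name, spout class,
+and parallelism hints are hypothetical; the only requirement is that the spout emits fields matching
+`colNames`:
+
+```java
+// Hypothetical wiring: "rawDataSpout" must emit fields matching colNames.
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("rawDataSpout", new RawDataSpout(), 1);
+builder.setBolt("hiveBolt", hiveBolt, 2).shuffleGrouping("rawDataSpout");
+```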
+### RecordHiveMapper
+
+This class maps tuple field names to Hive table column names.
+There are two implementations available:
+
+- DelimitedRecordHiveMapper (org.apache.storm.hive.bolt.mapper.DelimitedRecordHiveMapper)
+- JsonRecordHiveMapper (org.apache.storm.hive.bolt.mapper.JsonRecordHiveMapper)
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields(colNames))
+        .withPartitionFields(new Fields(partNames));
+// or
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields(colNames))
+        .withTimeAsPartitionField("YYYY/MM/DD");
+```
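+JsonRecordHiveMapper is configured through the same builder methods. A short sketch, assuming the same
+hypothetical `colNames` and `partNames` arrays as above:
+
+```java
+// Same fluent configuration, but tuple values are written as JSON records.
+JsonRecordHiveMapper jsonMapper = new JsonRecordHiveMapper()
+        .withColumnFields(new Fields(colNames))
+        .withPartitionFields(new Fields(partNames));
+```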
+|Arg|Description|Type|
+|---|-----------|----|
+|withColumnFields|field names in a tuple to be mapped to table column names|Fields (required)|
+|withPartitionFields|field names in a tuple that are mapped to Hive table partitions|Fields|
+|withTimeAsPartitionField|users can select the system time as a partition in the Hive table|String (date format)|
+
+### HiveOptions (org.apache.storm.hive.common.HiveOptions)
+
+HiveBolt takes in HiveOptions as a constructor arg.
+```java
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+        .withTxnsPerBatch(10)
+        .withBatchSize(1000)
+        .withIdleTimeout(10);
+```
+HiveOptions params:
+
+|Arg|Description|Type|
+|---|-----------|----|
+|metaStoreURI|Hive metastore URI (can be found in hive-site.xml)|String (required)|
+|dbName|database name|String (required)|
+|tblName|table name|String (required)|
+|mapper|Mapper class to map tuple field names to table column names|DelimitedRecordHiveMapper or JsonRecordHiveMapper (required)|
+|withTxnsPerBatch|Hive grants a *batch of transactions* instead of single transactions to streaming clients like HiveBolt. This setting configures the number of desired transactions per transaction batch. Data from all transactions in a single batch ends up in a single file. HiveBolt will write a maximum of batchSize events in each transaction in the batch. This setting in conjunction with batchSize provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.|Integer, default 100|
+|withMaxOpenConnections|Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.|Integer, default 100|
+|withBatchSize|Max number of events written to Hive in a single Hive transaction|Integer, default 15000|
+|withCallTimeout|(In milliseconds) Timeout for Hive and HDFS I/O operations, such as openTxn, write, commit, abort.|Integer, default 10000|
+|withHeartBeatInterval|(In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.|Integer, default 240|
+|withAutoCreatePartitions|HiveBolt will automatically create the necessary Hive partitions to stream to.|Boolean, default true|
+|withKerberosPrincipal|Kerberos user principal for accessing secure Hive|String|
+|withKerberosKeytab|Kerberos keytab for accessing secure Hive|String|
+|withTickTupleInterval|(In seconds) If > 0, the HiveBolt will periodically flush transaction batches. Enabling this is recommended to avoid tuple timeouts while waiting for a batch to fill up.|Integer, default 0|
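+Putting several of these options together, a configuration sketch for a secured cluster might look like
+the following. The principal, keytab path, and numeric values are illustrative assumptions, not
+recommendations:
+
+```java
+// Illustrative values only; tune batch sizes and intervals for your workload.
+HiveOptions secureHiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+        .withTxnsPerBatch(10)
+        .withBatchSize(1000)
+        .withCallTimeout(10000)
+        .withHeartBeatInterval(240)
+        .withAutoCreatePartitions(true)
+        .withTickTupleInterval(15)
+        .withKerberosPrincipal("storm/node.example.com@EXAMPLE.COM")  // hypothetical principal
+        .withKerberosKeytab("/etc/security/keytabs/storm.keytab");    // hypothetical keytab path
+```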
+
+## HiveState (org.apache.storm.hive.trident.HiveState)
+
+The Hive Trident state follows a similar pattern to HiveBolt; it takes in HiveOptions as an arg:
+```java
+DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
+        .withColumnFields(new Fields(colNames))
+        .withTimeAsPartitionField("YYYY/MM/DD");
+
+HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
+        .withTxnsPerBatch(10)
+        .withBatchSize(1000)
+        .withIdleTimeout(10);
+
+StateFactory factory = new HiveStateFactory().withOptions(hiveOptions);
+TridentState state = stream.partitionPersist(factory, hiveFields, new HiveUpdater(), new Fields());
+```
+

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/_site/documentation/storm-jdbc.html
----------------------------------------------------------------------
diff --git a/_site/documentation/storm-jdbc.html b/_site/documentation/storm-jdbc.html
new file mode 100644
index 0000000..8342cfb
--- /dev/null
+++ b/_site/documentation/storm-jdbc.html
@@ -0,0 +1,428 @@
+# Storm JDBC Integration
+
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to insert storm tuples into a database table or to execute select queries against a database and enrich tuples
+in a storm topology.
+
+**Note**: Throughout the examples below, we make use of com.google.common.collect.Lists and com.google.common.collect.Maps.
+
+## Inserting into a database
+
+The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+
+### ConnectionProvider
+
+`org.apache.storm.jdbc.common.ConnectionProvider` is an interface that should be implemented by different connection pooling mechanisms:
+```java
+public interface ConnectionProvider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}
+```
+Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider`, which is an implementation that uses HikariCP.
+
+### JdbcMapper
+
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
+```java
+public interface JdbcMapper extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+**The order of the returned list is important. The placeholders in the supplied queries are resolved in the same order as the returned list.**
+For example, if the user-supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())`, the 1st item
+of the list returned by `getColumns` will map to the 1st placeholder, the 2nd to the 2nd, and so on. We do not parse
+the supplied queries to try to resolve placeholders by column names. Not making any assumptions about the query syntax allows this connector
+to be used by non-standard SQL frameworks like Phoenix, which only supports `UPSERT INTO`.
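+To make the contract concrete, here is a hypothetical implementation matching the insert query above.
+The class name and tuple field names are assumptions for illustration:
+
+```java
+public class UserMapper implements JdbcMapper {
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        List<Column> columns = new ArrayList<Column>();
+        // Order matters: it must match the placeholders in
+        // "insert into user(user_id, user_name, create_date) values (?,?, now())".
+        columns.add(new Column("user_id", tuple.getIntegerByField("user_id"), java.sql.Types.INTEGER));
+        columns.add(new Column("user_name", tuple.getStringByField("user_name"), java.sql.Types.VARCHAR));
+        return columns;
+    }
+}
+```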
+
+### JdbcInsertBolt
+
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts a storm tuple to a DB row. In addition, you must either supply
+a table name using the `withTableName` method or an insert query using `withInsertQuery`.
+If you specify an insert query, you should ensure that your `JdbcMapper` implementation returns a list of columns in the same order as in your insert query.
+You can optionally specify a query timeout seconds param that specifies the max seconds an insert query can take.
+The default is set to the value of topology.message.timeout.secs, and a value of -1 indicates not to set any query timeout.
+You should set the query timeout value to be <= topology.message.timeout.secs.
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user", "root");
+hikariConfigMap.put("dataSource.password", "password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
+                                    .withTableName("user")
+                                    .withQueryTimeoutSecs(30);
+// or
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
+                                    .withInsertQuery("insert into user values (?,?)")
+                                    .withQueryTimeoutSecs(30);
+```
+### SimpleJdbcMapper
+
+`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the columns in
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a connectionProvider instance.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in the table test.user_details.
+2. Will use the provided HikariCP configuration to establish a connection pool with the specified database configuration and
+automatically figure out the column names and corresponding data types of the table that you intend to write to.
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about HikariCP configuration properties.
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user", "root");
+hikariConfigMap.put("dataSource.password", "password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+```
+The mapper initialized in the example above assumes the storm tuple has values for all the columns of the table you intend to insert data into, and its `getColumns`
+method will return the columns in the order in which the JDBC connection instance's `connection.getMetaData().getColumns();` method returns them.
+
+**If you specified your own insert query to `JdbcInsertBolt`, you must initialize `SimpleJdbcMapper` with an explicit column schema such that the schema has columns in the same order as in your insert query.**
+For example, if your insert query is `Insert into user (user_id, user_name) values (?,?)`, then your `SimpleJdbcMapper` should be initialized with the following statements:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values
+and you want to insert values only for columns with no default values, you can enforce that behavior by initializing the
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`,
+the create_time column has a default value. To ensure only the columns with no default values are inserted,
+you can initialize the `jdbcMapper` as below:
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR),
+    new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
+### JdbcTridentState
+
+We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
+state you need to initialize it with the table name or an insert query, a JdbcMapper instance, and a connection provider instance.
+See the example below:
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withMapper(jdbcMapper)
+        .withTableName("user_details")
+        .withQueryTimeoutSecs(30);
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
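+A sketch of plugging the state factory into a Trident stream follows. `JdbcUpdater` ships with
+storm-jdbc; the stream and the input field names here are assumptions for illustration:
+
+```java
+// Hypothetical input fields; they must match what jdbcMapper expects.
+TridentState state = stream.partitionPersist(jdbcStateFactory,
+        new Fields("user_id", "user_name", "create_date"),
+        new JdbcUpdater(), new Fields());
+```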
+Similar to `JdbcInsertBolt`, you can specify a custom insert query using `withInsertQuery` instead of specifying a table name.
+
+## Lookup from Database
+
+We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for
+executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
+```java
+void declareOutputFields(OutputFieldsDeclarer declarer);
+List<Column> getColumns(ITuple tuple);
+List<Values> toTuple(ITuple input, List<Column> columns);
+```
+The `declareOutputFields` method is used to indicate what fields will be emitted as part of the output tuple when processing a storm tuple.
+
+The `getColumns` method specifies the placeholder columns in a select query, along with their SQL type and the value to use.
+For example, in the user_details table mentioned above, if you were executing a query `select user_name from user_details where user_id = ? and create_time > ?`,
+the `getColumns` method would take a storm input tuple and return a List containing two items.
+The first `Column` instance's `getValue()` method will be used as the value of `user_id` to look up, and the
+second `Column` instance's `getValue()` method will be used as the value of `create_time`.
+**Note: the order in the returned list determines the placeholders' values. In other words, the first item in the list maps
+to the first `?` in the select query, the second item to the second `?`, and so on.**
+
+The `toTuple` method takes in the input tuple and a list of columns representing a DB row resulting from the select query
+and returns a list of values to be emitted.
+**Please note that it returns a list of `Values` and not just a single instance of `Values`.**
+This allows a single DB row to be mapped to multiple output storm tuples.
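+To make that last point concrete, here is a hypothetical `toTuple` body that emits one output tuple per
+DB row; the value accessor on `Column` is assumed here to be `getVal()`:
+
+```java
+@Override
+public List<Values> toTuple(ITuple input, List<Column> columns) {
+    List<Values> rows = new ArrayList<Values>();
+    Values values = new Values();
+    // Copy each looked-up column value into the output tuple.
+    for (Column column : columns) {
+        values.add(column.getVal());
+    }
+    rows.add(values);  // a mapper could add several Values here to fan out
+    return rows;
+}
+```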
+
+### SimpleJdbcLookupMapper
+
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`.
+
+To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will be output by your bolt and the list of
+columns that are used in your select query as placeholders. The following example shows the initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` as the placeholder column in the select query.
+SimpleJdbcLookupMapper assumes the field name in your tuple is equal to the placeholder column name, i.e. in our example
+`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use its value as the placeholder's value in the
+select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first;
+if a field is not found in the input tuple, it looks at the select query's output row for a column with the same name as the field.
+So in the example below, if the input tuple had fields `user_id, create_date` and the select query was
+`select user_name from user_details where user_id = ?`, then for each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)`
+will return the value of `tuple.getValueByField("user_id")`, which will be used as the value of the `?` in the select query.
+For each output row from the DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as
+is, adding only `user_name` from the resulting row, and returning these 3 fields as a single output tuple.
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+```
+### JdbcLookupBolt
+
+To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, a `JdbcLookupMapper` instance, and the select query to execute.
+You can optionally specify a query timeout seconds param that specifies the max seconds the select query can take.
+The default is set to the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
+```java
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
+        .withQueryTimeoutSecs(30);
+```
+### JdbcTridentState for lookup
+
+We also support a trident query state that can be used with trident topologies.
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConnectionProvider(connectionProvider)
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = ?")
+        .withQueryTimeoutSecs(30);
+```
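+A sketch of querying this state from a Trident stream; `JdbcQuery` ships with storm-jdbc, while the
+`topology`, `stream`, and field names are assumptions:
+
+```java
+// Look up user_name for each user_id flowing through the stream.
+TridentState state = topology.newStaticState(new JdbcStateFactory(options));
+stream = stream.stateQuery(state, new Fields("user_id"), new JdbcQuery(), new Fields("user_name"));
+```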
+## Example
+
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+
+- Ensure you have included the JDBC implementation dependency for your chosen database as part of your build configuration.
+- The test topologies execute the following queries, so your intended DB must support them for the test topologies to work:
+
+```sql
+create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
+```
+
+### Execution
+
+Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using the storm jar command. The class expects 5 args:
+
+```
+storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
+```
+
+To make it work with MySQL, you can add the following to your pom.xml:
+```xml
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.31</version>
+</dependency>
+```
+You can generate a single jar with dependencies using the Maven assembly plugin. To use the plugin, add the following to your pom.xml and execute
+`mvn clean compile assembly:single`:
+```xml
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <archive>
+            <manifest>
+                <mainClass>fully.qualified.MainClass</mainClass>
+            </manifest>
+        </archive>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+    </configuration>
+</plugin>
+```
+MySQL example:
+
+```
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
+```
+
+You can execute a select query against the user table, which should show newly inserted rows:
+
+```sql
+select * from user;
+```
+For Trident, see `org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.
+

http://git-wip-us.apache.org/repos/asf/storm/blob/af6d8ffc/_site/documentation/storm-kafka.html
----------------------------------------------------------------------
diff --git a/_site/documentation/storm-kafka.html b/_site/documentation/storm-kafka.html
new file mode 100644
index 0000000..cda7ed7
--- /dev/null
+++ b/_site/documentation/storm-kafka.html
@@ -0,0 +1,438 @@
+    <div class="container-fluid">
+    <h1 class="page-title">Storm Kafka Integration</h1>
+          <div class="row">
+           	<div class="col-md-12">
+	             <!-- Documentation -->
+
+<p class="post-meta"></p>
+
+<p>Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.</p>
+
+<h2 id="spouts">Spouts</h2>
+
+<p>We support both Trident and core Storm spouts. Both spout implementations use a BrokerHosts interface that
+tracks the Kafka broker host to partition mapping, and a KafkaConfig object that controls Kafka-related parameters.</p>
+
+<h3 id="brokerhosts">BrokerHosts</h3>
+
+<p>In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. 
+Currently, we support the following two implementations:</p>
+
+<h4 id="zkhosts">ZkHosts</h4>
+
+<p>ZkHosts is what you should use if you want to dynamically track the Kafka broker to partition mapping. This class uses 
+Kafka&#39;s ZooKeeper entries to track the brokerHost -&gt; partition mapping. You can instantiate an object using one of the
+following constructors:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    public ZkHosts(String brokerZkStr, String brokerZkPath)
+    public ZkHosts(String brokerZkStr)
+</code></pre></div>
+<p>Here brokerZkStr is just ip:port (e.g. localhost:2181), and brokerZkPath is the root directory under which all the topic and
+partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.</p>
+
+<p>By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change that interval, you
+should set the refreshFreqSecs field on your ZkHosts instance to your chosen value.</p>
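+<p>For example, a minimal sketch of constructing a ZkHosts instance and tuning the refresh interval (the ZooKeeper address is illustrative, and refreshFreqSecs is assumed to be a public field as described above):</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">// track the broker -&gt; partition mapping via Kafka&#39;s ZooKeeper entries under the default /brokers path
+ZkHosts hosts = new ZkHosts(&quot;localhost:2181&quot;);
+// refresh the mapping every 30 seconds instead of the default 60
+hosts.refreshFreqSecs = 30;
+</code></pre></div>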
+
+<h4 id="statichosts">StaticHosts</h4>
+
+<p>This is an alternative implementation where broker -&gt; partition information is static. In order to construct an instance
+of this class, you need to first construct an instance of GlobalPartitionInformation.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    <span class="n">Broker</span> <span class="n">brokerForPartition0</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">);</span><span class="c1">//localhost:9092</span>
+    <span class="n">Broker</span> <span class="n">brokerForPartition1</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">,</span> <span class="mi">9092</span><span class="o">);</span><span class="c1">//localhost:9092 but we specified the port explicitly</span>
+    <span class="n">Broker</span> <span class="n">brokerForPartition2</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">&quot;localhost:9092&quot;</span><span class="o">);</span><span class="c1">//localhost:9092 specified as one string.</span>
+    <span class="n">GlobalPartitionInformation</span> <span class="n">partitionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">GlobalPartitionInformation</span><span class="o">();</span>
+    <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">brokerForPartition0</span><span class="o">);</span><span class="c1">//mapping from partition 0 to brokerForPartition0</span>
+    <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">brokerForPartition1</span><span class="o">);</span><span class="c1">//mapping from partition 1 to brokerForPartition1</span>
+    <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="n">brokerForPartition2</span><span class="o">);</span><span class="c1">//mapping from partition 2 to brokerForPartition2</span>
+    <span class="n">StaticHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StaticHosts</span><span class="o">(</span><span class="n">partitionInfo</span><span class="o">);</span>
+</code></pre></div>
+<h3 id="kafkaconfig">KafkaConfig</h3>
+
+<p>The second thing needed to construct a KafkaSpout is an instance of KafkaConfig. Its constructors are:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    public KafkaConfig(BrokerHosts hosts, String topic)
+    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+</code></pre></div>
+<p>The BrokerHosts can be any implementation of the BrokerHosts interface as described above. The topic is the name of the Kafka topic.
+The optional clientId is used as part of the ZooKeeper path where the spout&#39;s current consumption offset is stored.</p>
+
+<p>There are two extensions of KafkaConfig currently in use.</p>
+
+<p>SpoutConfig is an extension of KafkaConfig that adds fields for ZooKeeper connection info and for controlling
+behavior specific to KafkaSpout. The zkRoot will be used as the root path under which your consumer&#39;s offsets are stored. The id should uniquely
+identify your spout.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String id);
+</code></pre></div>
+<p>In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    // setting for how often to save the current Kafka offset to ZooKeeper
+    public long stateUpdateIntervalMs = 2000;
+
+    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
+    // calls OutputCollector.fail().
+    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+    // resubmitting the message while still retrying.
+    public long retryInitialDelayMs = 0;
+    public double retryDelayMultiplier = 1.0;
+    public long retryDelayMaxMs = 60 * 1000;
+
+    // if set to true, the spout will set the Kafka topic as the emitted stream ID
+    public boolean topicAsStreamId = false;
+</code></pre></div>
+<p>The core KafkaSpout only accepts an instance of SpoutConfig.</p>
+
+<p>TridentKafkaConfig is another extension of KafkaConfig, and TridentKafkaEmitter only accepts a TridentKafkaConfig.</p>
+
+<p>The KafkaConfig class also has a number of public fields that control your application&#39;s behavior. Here are their defaults:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    public int fetchSizeBytes = 1024 * 1024;
+    public int socketTimeoutMs = 10000;
+    public int fetchMaxWait = 10000;
+    public int bufferSizeBytes = 1024 * 1024;
+    public MultiScheme scheme = new RawMultiScheme();
+    public boolean ignoreZkOffsets = false;
+    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public long maxOffsetBehind = Long.MAX_VALUE;
+    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+    public int metricsTimeBucketSizeInSecs = 60;
+</code></pre></div>
+<p>Most of these are self-explanatory, except MultiScheme.</p>
+
+<h3 id="multischeme">MultiScheme</h3>
+
+<p>MultiScheme is an interface that dictates how the <code>byte[]</code> consumed from Kafka gets transformed into a Storm tuple. It
+also controls the naming of your output fields.</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">  <span class="kd">public</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="nf">deserialize</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">ser</span><span class="o">);</span>
+  <span class="kd">public</span> <span class="n">Fields</span> <span class="nf">getOutputFields</span><span class="o">();</span>
+</code></pre></div>
+<p>The default <code>RawMultiScheme</code> just takes the <code>byte[]</code> and returns a tuple with the <code>byte[]</code> as is. The name of the
+output field is &quot;bytes&quot;.  There are alternative implementations such as <code>SchemeAsMultiScheme</code> and
+<code>KeyValueSchemeAsMultiScheme</code> which can convert the <code>byte[]</code> to a <code>String</code>.</p>
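+<p>As an illustration, here is a minimal sketch of a custom scheme that decodes each Kafka message as a UTF-8 string and emits it under a field named &quot;line&quot;. The class and field names are illustrative, and the single-tuple Scheme interface is assumed to mirror the MultiScheme methods shown above (taking a <code>byte[]</code>); wrapping it in SchemeAsMultiScheme makes it assignable to <code>KafkaConfig.scheme</code>:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+// decodes each Kafka message payload as a UTF-8 string
+public class Utf8LineScheme implements Scheme {
+    public List&lt;Object&gt; deserialize(byte[] ser) {
+        return new Values(new String(ser, StandardCharsets.UTF_8));
+    }
+
+    public Fields getOutputFields() {
+        return new Fields(&quot;line&quot;);
+    }
+}
+
+// usage: spoutConfig.scheme = new SchemeAsMultiScheme(new Utf8LineScheme());
+</code></pre></div>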
+
+<h3 id="examples">Examples</h3>
+
+<h4 id="core-spout">Core Spout</h4>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">BrokerHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">zkConnString</span><span class="o">);</span>
+<span class="n">SpoutConfig</span> <span class="n">spoutConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">hosts</span><span class="o">,</span> <span class="n">topicName</span><span class="o">,</span> <span class="s">&quot;/&quot;</span> <span class="o">+</span> <span class="n">topicName</span><span class="o">,</span> <span class="n">UUID</span><span class="o">.</span><span class="na">randomUUID</span><span class="o">().</span><span class="na">toString</span><span class="o">());</span>
+<span class="n">spoutConfig</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="nf">StringScheme</span><span class="o">());</span>
+<span class="n">KafkaSpout</span> <span class="n">kafkaSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">KafkaSpout</span><span class="o">(</span><span class="n">spoutConfig</span><span class="o">);</span>
+</code></pre></div>
+<h4 id="trident-spout">Trident Spout</h4>
+<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">TridentTopology</span><span class="o">();</span>
+<span class="n">BrokerHosts</span> <span class="n">zk</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="s">&quot;localhost&quot;</span><span class="o">);</span>
+<span class="n">TridentKafkaConfig</span> <span class="n">spoutConf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">TridentKafkaConfig</span><span class="o">(</span><span class="n">zk</span><span class="o">,</span> <span class="s">&quot;test-topic&quot;</span><span class="o">);</span>
+<span class="n">spoutConf</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="nf">StringScheme</span><span class="o">());</span>
+<span class="n">OpaqueTridentKafkaSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">OpaqueTridentKafkaSpout</span><span class="o">(</span><span class="n">spoutConf</span><span class="o">);</span>
+</code></pre></div>
+<h3 id="how-kafkaspout-stores-offsets-of-a-kafka-topic-and-recovers-in-case-of-failures">How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures</h3>
+
+<p>As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by
+setting <code>KafkaConfig.startOffsetTime</code> as follows:</p>
+
+<ol>
+<li><code>kafka.api.OffsetRequest.EarliestTime()</code>:  read from the beginning of the topic (i.e. from the oldest messages onwards)</li>
+<li><code>kafka.api.OffsetRequest.LatestTime()</code>: read from the end of the topic (i.e. any new messages that are being written to the topic)</li>
+<li>A Unix timestamp in milliseconds since the epoch (e.g. via <code>System.currentTimeMillis()</code>):
+see <a href="https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?">How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?</a> in the Kafka FAQ</li>
+</ol>
+
+<p>As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information
+under the ZooKeeper path <code>SpoutConfig.zkRoot + &quot;/&quot; + SpoutConfig.id</code>.  In the case of failures it recovers from the last
+written offset in ZooKeeper.</p>
+
+<blockquote>
+<p><strong>Important:</strong>  When re-deploying a topology make sure that the settings for <code>SpoutConfig.zkRoot</code> and <code>SpoutConfig.id</code>
+were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
+offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.</p>
+</blockquote>
+
+<p>This means that once a topology has run, the setting <code>KafkaConfig.startOffsetTime</code> will have no effect on
+subsequent runs of the topology, because the topology will rely on the consumer state information (offsets) in
+ZooKeeper to determine from where it should begin (more precisely: resume) reading.
+If you want to force the spout to ignore any consumer state information stored in ZooKeeper, set the parameter
+<code>KafkaConfig.ignoreZkOffsets</code> to <code>true</code>.  If <code>true</code>, the spout will always begin reading from the
+offset defined by <code>KafkaConfig.startOffsetTime</code> as described above.</p>
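+<p>For example, a minimal sketch that forces a spout to re-read its topic from the beginning on the next deployment (the hosts variable, zkRoot and id values are illustrative; the field names are those listed in the KafkaConfig defaults above):</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">SpoutConfig spoutConfig = new SpoutConfig(hosts, &quot;test-topic&quot;, &quot;/kafkastorm&quot;, &quot;my-spout-id&quot;);
+// ignore any offsets previously checkpointed in ZooKeeper...
+spoutConfig.ignoreZkOffsets = true;
+// ...and start from the oldest message still retained by Kafka
+spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+</code></pre></div>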
+
+<h2 id="using-storm-kafka-with-different-versions-of-scala">Using storm-kafka with different versions of Scala</h2>
+
+<p>Storm-kafka&#39;s Kafka dependency is defined with <code>provided</code> scope in Maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.</p>
+
+<p>When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
+use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your <code>pom.xml</code>:</p>
+<div class="highlight"><pre><code class="language-xml" data-lang="xml">        <span class="nt">&lt;dependency&gt;</span>
+            <span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
+            <span class="nt">&lt;artifactId&gt;</span>kafka_2.10<span class="nt">&lt;/artifactId&gt;</span>
+            <span class="nt">&lt;version&gt;</span>0.8.1.1<span class="nt">&lt;/version&gt;</span>
+            <span class="nt">&lt;exclusions&gt;</span>
+                <span class="nt">&lt;exclusion&gt;</span>
+                    <span class="nt">&lt;groupId&gt;</span>org.apache.zookeeper<span class="nt">&lt;/groupId&gt;</span>
+                    <span class="nt">&lt;artifactId&gt;</span>zookeeper<span class="nt">&lt;/artifactId&gt;</span>
+                <span class="nt">&lt;/exclusion&gt;</span>
+                <span class="nt">&lt;exclusion&gt;</span>
+                    <span class="nt">&lt;groupId&gt;</span>log4j<span class="nt">&lt;/groupId&gt;</span>
+                    <span class="nt">&lt;artifactId&gt;</span>log4j<span class="nt">&lt;/artifactId&gt;</span>
+                <span class="nt">&lt;/exclusion&gt;</span>
+            <span class="nt">&lt;/exclusions&gt;</span>
+        <span class="nt">&lt;/dependency&gt;</span>
+</code></pre></div>
+<p>Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm&#39;s dependencies.</p>
+
+<h2 id="writing-to-kafka-as-part-of-your-topology">Writing to Kafka as part of your topology</h2>
+
+<p>You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you 
+are using Trident, you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and
+storm.kafka.trident.TridentKafkaUpdater.</p>
+
+<p>You need to provide implementations of the following two interfaces:</p>
+
+<h3 id="tupletokafkamapper-and-tridenttupletokafkamapper">TupleToKafkaMapper and TridentTupleToKafkaMapper</h3>
+
+<p>These interfaces have 2 methods defined:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">    <span class="n">K</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
+    <span class="n">V</span> <span class="nf">getMessageFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
+</code></pre></div>
+<p>As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as the key and one field as the value, you can use the provided FieldNameBasedTupleToKafkaMapper.java 
+implementation. In the KafkaBolt, if you construct FieldNameBasedTupleToKafkaMapper with the default constructor, the
+implementation always looks for fields named &quot;key&quot; and &quot;message&quot;, for backward compatibility 
+reasons. Alternatively, you can specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for the key and the message, as there is no default constructor;
+these should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.</p>
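+<p>As an illustration, here is a minimal sketch of a custom bolt-side mapper that uses a tuple&#39;s &quot;userId&quot; field as the Kafka key and its &quot;event&quot; field as the message. The class and field names are illustrative, and the generic TupleToKafkaMapper interface is assumed to expose exactly the two methods shown above:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import backtype.storm.tuple.Tuple;
+import storm.kafka.bolt.mapper.TupleToKafkaMapper;
+
+// maps a tuple&#39;s &quot;userId&quot; field to the Kafka key and its &quot;event&quot; field to the Kafka message
+public class UserEventMapper implements TupleToKafkaMapper&lt;String, String&gt; {
+    public String getKeyFromTuple(Tuple tuple) {
+        return tuple.getStringByField(&quot;userId&quot;);
+    }
+
+    public String getMessageFromTuple(Tuple tuple) {
+        return tuple.getStringByField(&quot;event&quot;);
+    }
+}
+</code></pre></div>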
+
+<h3 id="kafkatopicselector-and-trident-kafkatopicselector">KafkaTopicSelector and trident KafkaTopicSelector</h3>
+
+<p>This interface has only one method:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">public interface KafkaTopicSelector {
+    String getTopics(Tuple/TridentTuple tuple);
+}
+</code></pre></div>
+<p>The implementation of this interface should return the topic to which the tuple&#39;s key/message mapping needs to be published.
+You can return null, in which case the message will be ignored. If you have one static topic name, you can use 
+DefaultTopicSelector.java and set the name of the topic in the constructor.</p>
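+<p>As an illustration, a minimal sketch of a dynamic selector that routes tuples to a topic based on a &quot;type&quot; field. The class, field and topic names are illustrative, and the bolt-side interface is assumed to live in storm.kafka.bolt.selector with the single getTopics method shown above:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">import backtype.storm.tuple.Tuple;
+import storm.kafka.bolt.selector.KafkaTopicSelector;
+
+// routes error tuples to an &quot;errors&quot; topic and everything else to an &quot;events&quot; topic
+public class TypeBasedTopicSelector implements KafkaTopicSelector {
+    public String getTopics(Tuple tuple) {
+        return &quot;error&quot;.equals(tuple.getStringByField(&quot;type&quot;)) ? &quot;errors&quot; : &quot;events&quot;;
+    }
+}
+</code></pre></div>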
+
+<h3 id="specifying-kafka-producer-properties">Specifying Kafka producer properties</h3>
+
+<p>You can provide all the producer properties (see the &quot;Important configuration properties for the producer&quot; section of 
+<a href="http://kafka.apache.org/documentation.html#producerconfigs">http://kafka.apache.org/documentation.html#producerconfigs</a>) in your Storm topology config by setting a properties
+map under the key kafka.broker.properties.</p>
+
+<h3 id="putting-it-all-together">Putting it all together</h3>
+
+<p>For the bolt:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">        TopologyBuilder builder = new TopologyBuilder();
+
+        Fields fields = new Fields(&quot;key&quot;, &quot;message&quot;);
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values(&quot;storm&quot;, &quot;1&quot;),
+                new Values(&quot;trident&quot;, &quot;1&quot;),
+                new Values(&quot;needs&quot;, &quot;1&quot;),
+                new Values(&quot;javadoc&quot;, &quot;1&quot;)
+        );
+        spout.setCycle(true);
+        builder.setSpout(&quot;spout&quot;, spout, 5);
+        KafkaBolt bolt = new KafkaBolt()
+                .withTopicSelector(new DefaultTopicSelector(&quot;test&quot;))
+                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt(&quot;forwardToKafka&quot;, bolt, 8).shuffleGrouping(&quot;spout&quot;);
+
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put(&quot;metadata.broker.list&quot;, &quot;localhost:9092&quot;);
+        props.put(&quot;request.required.acks&quot;, &quot;1&quot;);
+        props.put(&quot;serializer.class&quot;, &quot;kafka.serializer.StringEncoder&quot;);
+        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
+        StormSubmitter.submitTopology(&quot;kafkaboltTest&quot;, conf, builder.createTopology());
+</code></pre></div>
+<p>For Trident:</p>
+<div class="highlight"><pre><code class="language-java" data-lang="java">        Fields fields = new Fields(&quot;word&quot;, &quot;count&quot;);
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values(&quot;storm&quot;, &quot;1&quot;),
+                new Values(&quot;trident&quot;, &quot;1&quot;),
+                new Values(&quot;needs&quot;, &quot;1&quot;),
+                new Values(&quot;javadoc&quot;, &quot;1&quot;)
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream(&quot;spout1&quot;, spout);
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withKafkaTopicSelector(new DefaultTopicSelector(&quot;test&quot;))
+                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper(&quot;word&quot;, &quot;count&quot;));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put(&quot;metadata.broker.list&quot;, &quot;localhost:9092&quot;);
+        props.put(&quot;request.required.acks&quot;, &quot;1&quot;);
+        props.put(&quot;serializer.class&quot;, &quot;kafka.serializer.StringEncoder&quot;);
+        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+        StormSubmitter.submitTopology(&quot;kafkaTridentTest&quot;, conf, topology.build());
+</code></pre></div>
+
+
+	          </div>
+	       </div>
+	  </div>
+
+</body>
+
+</html>
+