Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/11 23:15:31 UTC

svn commit: r1738648 [12/12] - in /storm/site: publish/ publish/releases/0.9.6/ publish/releases/0.9.6/images/ publish/releases/0.9.6/images/logocontest/ publish/releases/0.9.6/images/logocontest/abartos/ publish/releases/0.9.6/images/logocontest/cbous...

Added: storm/site/releases/0.9.6/storm-hbase.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hbase.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hbase.md (added)
+++ storm/site/releases/0.9.6/storm-hbase.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,241 @@
+---
+title: Storm HBase Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [Apache HBase](https://hbase.apache.org)
+
+## Usage
+The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper`
+interface:
+
+```java
+public interface HBaseMapper extends Serializable {
+    byte[] rowKey(Tuple tuple);
+
+    ColumnList columns(Tuple tuple);
+}
+```
+
+The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the
+row key.
+
+The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
+to add both standard HBase columns as well as HBase counter columns.
+
+To add a standard column, use one of the `addColumn()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
+```
+
+To add a counter column, use one of the `addCounter()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
+```
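+
+Putting these together, a minimal sketch of a custom `HBaseMapper` (the `WordCountMapper` class name and the
+`word`/`count` field names below are purely illustrative, not part of storm-hbase) could look like:
+
+```java
+public class WordCountMapper implements HBaseMapper {
+    private byte[] columnFamily = "cf".getBytes();
+
+    @Override
+    public byte[] rowKey(Tuple tuple) {
+        // use the "word" field as the row key
+        return tuple.getStringByField("word").getBytes();
+    }
+
+    @Override
+    public ColumnList columns(Tuple tuple) {
+        ColumnList cols = new ColumnList();
+        // store the count as an HBase counter column in family "cf"
+        cols.addCounter(this.columnFamily, "count".getBytes(), tuple.getLongByField("count"));
+        return cols;
+    }
+}
+```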
+
+When the remote HBase cluster is security enabled, a Kerberos keytab and the corresponding principal name need to be
+provided for the storm-hbase connector. Specifically, the `Config` object passed into the topology should contain
+`("storm.keytab.file", "$keytab")` and `("storm.kerberos.principal", "$principal")`. Example:
+
+```java
+Config config = new Config();
+...
+config.put("storm.keytab.file", "$keytab");
+config.put("storm.kerberos.principal", "$principle");
+StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
+```
+
+## Working with Secure HBase using delegation tokens
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase.
+The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have
+multiple topologies on a cluster, each running as a different HBase user, you will have to create multiple keytabs and
+distribute them to all worker hosts. Instead of doing that you could use the following approach:
+
+Your administrator can configure Nimbus to automatically obtain delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of the HBase superuser that can impersonate other users.)
+hbase.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 518400 (6 days. HBase tokens expire every 7 days by default and cannot be renewed;
+if you have a custom setting for hbase.auth.token.max.lifetime in hbase-site.xml, ensure this value is at least 1 hour
+less than that.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"] 
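+
+If you prefer to set this from your topology code rather than in a configuration file, a minimal sketch (using the
+config key exactly as named above; `Arrays` is `java.util.Arrays`) is:
+
+```java
+Config conf = new Config();
+// equivalent to the topology.auto-credentials setting shown above
+conf.put("topology.auto-credentials", Arrays.asList("org.apache.storm.hbase.security.AutoHBase"));
+```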
+
+If Nimbus does not already have the above configuration, you need to add it and then restart Nimbus. Ensure that the HBase configuration
+files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in Nimbus's classpath.
+Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on, for every
+topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on that user's behalf.
+If the topology was started with topology.auto-credentials set to AutoHBase, Nimbus will push the
+delegation tokens to all the workers of your topology and the HBase bolt/state will authenticate with these tokens.
+
+Since Nimbus impersonates the topology submitter user, you need to ensure that the user specified in hbase.kerberos.principal
+has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+
+http://hbase.apache.org/book/security.html#security.rest.gateway
+
+You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html.
+
+### SimpleHBaseMapper
+`storm-hbase` includes a general purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
+tuples to both regular HBase columns as well as counter columns.
+
+To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns.
+
+The following code creates a `SimpleHBaseMapper` instance that:
+
+1. Uses the `word` tuple value as a row key.
+2. Adds a standard HBase column for the tuple field `word`.
+3. Adds an HBase counter column for the tuple field `count`.
+4. Writes values to the `cf` column family.
+
+```java
+SimpleHBaseMapper mapper = new SimpleHBaseMapper() 
+        .withRowKeyField("word")
+        .withColumnFields(new Fields("word"))
+        .withCounterFields(new Fields("count"))
+        .withColumnFamily("cf");
+```
+### HBaseBolt
+To use the `HBaseBolt`, construct it with the name of the table to write to and an `HBaseMapper` implementation:
+
+ ```java
+HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+ ```
+
+The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
+
+### HBaseValueMapper
+This class allows you to transform the HBase lookup result into Storm `Values` that will be emitted by the `HBaseLookupBolt`.
+
+```java
+public interface HBaseValueMapper extends Serializable {
+    public List<Values> toTuples(Result result) throws Exception;
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+The `toTuples` method takes an HBase `Result` instance and returns a list of `Values` instances.
+Each of the values returned by this method will be emitted by the `HBaseLookupBolt`.
+
+The `declareOutputFields` method should be used to declare the output fields of the `HBaseLookupBolt`.
+
+There is an example implementation in the `src/test/java` directory.
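+
+For illustration only (this is a sketch, not necessarily the implementation shipped in the tests), a value mapper
+that emits one `(columnName, columnValue)` tuple per cell of the result could look like:
+
+```java
+public class ColumnNameValueMapper implements HBaseValueMapper {
+    @Override
+    public List<Values> toTuples(Result result) throws Exception {
+        List<Values> values = new ArrayList<Values>();
+        // emit one tuple per cell: (column qualifier, cell value)
+        // note: counter columns store 8-byte longs, so use Bytes.toLong for those instead of Bytes.toString
+        for (KeyValue kv : result.raw()) {
+            values.add(new Values(Bytes.toString(kv.getQualifier()), Bytes.toString(kv.getValue())));
+        }
+        return values;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("columnName", "columnValue"));
+    }
+}
+```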
+
+### HBaseProjectionCriteria
+This class allows you to specify the projection criteria for your HBase Get operation. This is an optional parameter
+for the lookup bolt; if you do not specify it, all columns will be returned by the `HBaseLookupBolt`.
+
+```java
+public class HBaseProjectionCriteria implements Serializable {
+    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
+    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
+}
+```
+`addColumnFamily` takes a column family. Setting this parameter means all columns for this family will be included
+in the projection.
+
+`addColumn` takes a `ColumnMetaData` instance. Setting this parameter means only this column from the column family
+will be part of your projection.
+
+The following code creates a projection criteria that:
+
+1. includes the `count` column from column family `cf`.
+2. includes all columns from column family `cf2`.
+
+```java
+HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
+    .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
+    .addColumnFamily("cf2");
+```
+
+### HBaseLookupBolt
+To use the `HBaseLookupBolt`, construct it with the name of the table to query, an implementation of `HBaseMapper`
+and an implementation of `HBaseValueMapper`. You can optionally specify a `HBaseProjectionCriteria`.
+
+The `HBaseLookupBolt` will use the mapper to get the row key to look up. It will use the `HBaseProjectionCriteria` to
+figure out which columns to include in the result, and it will leverage the `HBaseValueMapper` to get the
+values to be emitted by the bolt.
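+
+A minimal construction sketch (assuming `mapper`, `projectionCriteria` and a value mapper called `valueMapper` have
+been created as shown above; the exact builder method may differ, see LookupWordCount.java for the canonical usage):
+
+```java
+HBaseLookupBolt lookupBolt = new HBaseLookupBolt("WordCount", mapper, valueMapper)
+        .withProjectionCriteria(projectionCriteria);
+```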
+
+You can look at an example topology, LookupWordCount.java, under `src/test/java`.
+
+## Example: Persistent Word Count
+A runnable example can be found in the `src/test/java` directory.
+
+### Setup
+The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
+classpath pointing to your HBase cluster.
+
+Use the `hbase shell` command to create the schema:
+
+```
+> create 'WordCount', 'cf'
+```
+
+### Execution
+Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).
+
+After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class
+to view the counter values stored in HBase. You should see something like the following:
+
+```
+Word: 'apple', Count: 6867
+Word: 'orange', Count: 6645
+Word: 'pineapple', Count: 6954
+Word: 'banana', Count: 6787
+Word: 'watermelon', Count: 6806
+```
+
+For reference, the sample topology is listed below:
+
+```java
+public class PersistentWordCount {
+    private static final String WORD_SPOUT = "WORD_SPOUT";
+    private static final String COUNT_BOLT = "COUNT_BOLT";
+    private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+    public static void main(String[] args) throws Exception {
+        Config config = new Config();
+
+        WordSpout spout = new WordSpout();
+        WordCounter bolt = new WordCounter();
+
+        SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+                .withRowKeyField("word")
+                .withColumnFields(new Fields("word"))
+                .withCounterFields(new Fields("count"))
+                .withColumnFamily("cf");
+
+        HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+
+
+        // wordSpout ==> countBolt ==> HBaseBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(WORD_SPOUT, spout, 1);
+        builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+        builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+        if (args.length == 0) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(10000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            config.setNumWorkers(3);
+            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+        }
+    }
+}
+```
+

Added: storm/site/releases/0.9.6/storm-hdfs.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hdfs.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hdfs.md (added)
+++ storm/site/releases/0.9.6/storm-hdfs.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,368 @@
+---
+title: Storm HDFS Integration
+layout: documentation
+documentation: true
+---
+
+Storm components for interacting with HDFS file systems
+
+
+## Usage
+The following example will write pipe ("|") delimited files to the HDFS path hdfs://localhost:54310/foo. After every
+1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
+reach 5 megabytes in size.
+
+```java
+// use "|" instead of "," for field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+        .withFieldDelimiter("|");
+
+// sync the filesystem after every 1k tuples
+SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/");
+
+HdfsBolt bolt = new HdfsBolt()
+        .withFsUrl("hdfs://localhost:54310")
+        .withFileNameFormat(fileNameFormat)
+        .withRecordFormat(format)
+        .withRotationPolicy(rotationPolicy)
+        .withSyncPolicy(syncPolicy);
+```
+
+### Packaging a Topology
+When packaging your topology, it's important that you use the `maven-shade-plugin` as opposed to the
+`maven-assembly-plugin`.
+
+The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme
+resolution.
+
+If you experience errors such as the following:
+
+```
+java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
+```
+
+it's an indication that your topology jar file isn't packaged properly.
+
+If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
+create your topology jar:
+
+```xml
+<plugin>
+    <groupId>org.apache.maven.plugins</groupId>
+    <artifactId>maven-shade-plugin</artifactId>
+    <version>1.4</version>
+    <configuration>
+        <createDependencyReducedPom>true</createDependencyReducedPom>
+    </configuration>
+    <executions>
+        <execution>
+            <phase>package</phase>
+            <goals>
+                <goal>shade</goal>
+            </goals>
+            <configuration>
+                <transformers>
+                    <transformer
+                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    <transformer
+                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                        <mainClass></mainClass>
+                    </transformer>
+                </transformers>
+            </configuration>
+        </execution>
+    </executions>
+</plugin>
+
+```
+
+### Specifying a Hadoop Version
+By default, storm-hdfs uses the following Hadoop dependencies:
+
+```xml
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-client</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+<dependency>
+    <groupId>org.apache.hadoop</groupId>
+    <artifactId>hadoop-hdfs</artifactId>
+    <version>2.2.0</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
+```
+
+If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
+and add the dependencies for your preferred version in your pom.
+
+Hadoop client version incompatibilities can manifest as errors like:
+
+```
+com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
+```
+
+## Customization
+
+### Record Formats
+Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
+interface:
+
+```java
+public interface RecordFormat extends Serializable {
+    byte[] format(Tuple tuple);
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
+tab-delimited files.
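+
+For example, a comma-delimited format that writes only selected tuple fields might be configured as follows (a sketch;
+the field names are illustrative, and `withFields` is used the same way as in the Trident example later in this document):
+
+```java
+RecordFormat csvFormat = new DelimitedRecordFormat()
+        .withFieldDelimiter(",")
+        .withFields(new Fields("id", "name", "timestamp"));
+```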
+
+
+### File Naming
+File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
+interface:
+
+```java
+public interface FileNameFormat extends Serializable {
+    void prepare(Map conf, TopologyContext topologyContext);
+    String getName(long rotation, long timeStamp);
+    String getPath();
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat`  will create file names with the following format:
+
+     {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
+
+For example:
+
+     MyBolt-5-7-1390579837830.txt
+
+By default, the prefix is empty and the extension is ".txt".
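+
+For example, to control the path, prefix and extension (the same builder methods used in the Trident section below),
+you could write something like:
+
+```java
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+        .withPath("/foo/")
+        .withPrefix("myBolt-")
+        .withExtension(".csv");
+```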
+
+
+
+### Sync Policies
+Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
+to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
+
+```java
+public interface SyncPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+```
+The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
+to perform a sync/flush, after which it will call the `reset()` method.
+
+The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
+been processed.
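+
+As a sketch of a custom policy (purely illustrative, not part of storm-hdfs), the following time-based implementation
+requests a sync at most once per configured interval:
+
+```java
+public class TimedSyncPolicy implements SyncPolicy {
+    private final long intervalMs;
+    private long lastSync = System.currentTimeMillis();
+
+    public TimedSyncPolicy(long intervalMs) {
+        this.intervalMs = intervalMs;
+    }
+
+    @Override
+    public boolean mark(Tuple tuple, long offset) {
+        // request a sync/flush once the configured interval has elapsed
+        return System.currentTimeMillis() - lastSync >= intervalMs;
+    }
+
+    @Override
+    public void reset() {
+        // called by the bolt after it has performed the sync
+        this.lastSync = System.currentTimeMillis();
+    }
+}
+```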
+
+### File Rotation Policies
+Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing a
+`org.apache.storm.hdfs.rotation.FileRotation` interface:
+
+```java
+public interface FileRotationPolicy extends Serializable {
+    boolean mark(Tuple tuple, long offset);
+    void reset();
+}
+``` 
+
+The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
+data files reach a specific file size:
+
+```java
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
+
+### File Rotation Actions
+Both the HDFS bolt and the Trident State implementation allow you to register any number of `RotationAction`s.
+A `RotationAction` provides a hook for performing some action right after a file is rotated, for
+example moving the file to a different location or renaming it.
+
+
+```java
+public interface RotationAction extends Serializable {
+    void execute(FileSystem fileSystem, Path filePath) throws IOException;
+}
+```
+
+Storm-HDFS includes a simple action that will move a file after rotation:
+
+```java
+public class MoveFileAction implements RotationAction {
+    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
+
+    private String destination;
+
+    public MoveFileAction withDestination(String destDir){
+        destination = destDir;
+        return this;
+    }
+
+    @Override
+    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+        Path destPath = new Path(destination, filePath.getName());
+        LOG.info("Moving file {} to {}", filePath, destPath);
+        boolean success = fileSystem.rename(filePath, destPath);
+        return;
+    }
+}
+```
+
+If you are using Trident and sequence files you can do something like this:
+
+```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+
+## Support for HDFS Sequence Files
+
+The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:
+
+```java
+        // sync the filesystem after every 1k tuples
+        SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+        // rotate files when they reach 5MB
+        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                .withExtension(".seq")
+                .withPath("/data/");
+
+        // create sequence format instance.
+        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+        SequenceFileBolt bolt = new SequenceFileBolt()
+                .withFsUrl("hdfs://localhost:54310")
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(format)
+                .withRotationPolicy(rotationPolicy)
+                .withSyncPolicy(syncPolicy)
+                .withCompressionType(SequenceFile.CompressionType.RECORD)
+                .withCompressionCodec("deflate");
+```
+
+The `SequenceFileBolt` requires that you provide an `org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to
+key/value pairs:
+
+```java
+public interface SequenceFormat extends Serializable {
+    Class keyClass();
+    Class valueClass();
+
+    Writable key(Tuple tuple);
+    Writable value(Tuple tuple);
+}
+```
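+
+As an illustration (a sketch only; `DefaultSequenceFormat` shown above is what storm-hdfs ships), an implementation
+that writes a long timestamp key and a text value could look like:
+
+```java
+public class TimestampedTextFormat implements SequenceFormat {
+    @Override
+    public Class keyClass() {
+        return LongWritable.class;
+    }
+
+    @Override
+    public Class valueClass() {
+        return Text.class;
+    }
+
+    @Override
+    public Writable key(Tuple tuple) {
+        // org.apache.hadoop.io.LongWritable
+        return new LongWritable(tuple.getLongByField("timestamp"));
+    }
+
+    @Override
+    public Writable value(Tuple tuple) {
+        // org.apache.hadoop.io.Text
+        return new Text(tuple.getStringByField("sentence"));
+    }
+}
+```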
+
+## Trident API
+storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors
+that of the bolts.
+
+ ```java
+         Fields hdfsFields = new Fields("field1", "field2");
+
+         FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+                 .withPath("/trident")
+                 .withPrefix("trident")
+                 .withExtension(".txt");
+
+         RecordFormat recordFormat = new DelimitedRecordFormat()
+                 .withFields(hdfsFields);
+
+         FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+        HdfsState.Options options = new HdfsState.HdfsFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withRecordFormat(recordFormat)
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310");
+
+         StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+         TridentState state = stream
+                 .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+ ```
+
+ To use the sequence file `State` implementation, use the `HdfsState.SequenceFileOptions`:
+
+ ```java
+        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+                .withFileNameFormat(fileNameFormat)
+                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+                .withRotationPolicy(rotationPolicy)
+                .withFsUrl("hdfs://localhost:54310")
+                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+## Working with Secure HDFS
+If your topology is going to interact with secure HDFS, your bolts/states need to be authenticated by the NameNode. We
+currently have two options to support this:
+
+### Using HDFS delegation tokens 
+Your administrator can configure Nimbus to automatically obtain delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of the HDFS superuser that can impersonate other users.)
+hdfs.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 82800 (23 hours. HDFS tokens need to be renewed every 24 hours, so this value should be
+less than 24 hours.)
+topology.hdfs.uri: "hdfs://host:port" (This is an optional config; by default the value of the "fs.defaultFS" property
+specified in Hadoop's core-site.xml is used.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"] 
+
+If Nimbus does not already have the above configuration, you need to add it and then restart Nimbus. Ensure that the Hadoop configuration
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in Nimbus's classpath.
+Nimbus will use the keytab and principal specified in the config to authenticate with the NameNode. From then on, for every
+topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on that user's behalf.
+If the topology was started with topology.auto-credentials set to AutoHDFS, Nimbus will push the
+delegation tokens to all the workers of your topology and the HDFS bolt/state will authenticate with the NameNode using
+these tokens.
+
+Since Nimbus impersonates the topology submitter user, you need to ensure that the user specified in hdfs.kerberos.principal
+has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
+
+You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for the HDFS user to all potential worker hosts, then you can use this method. You should specify an
+HDFS config key using the method HdfsBolt/State.withConfigKey("somekey"), and the value map of this key should have the following 2 properties:
+
+hdfs.keytab.file: "/path/to/keytab/"
+hdfs.kerberos.principal: "user@EXAMPLE.com"
+
+On worker hosts, the bolt/trident-state code will use the keytab file and principal provided in the config to authenticate with the
+NameNode. This method is a little dangerous, as you need to ensure all workers have the keytab file at the same location, and you need
+to remember this as you bring up new hosts in the cluster. A configuration sketch is shown below.
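+
+A minimal sketch of wiring this up (the config key name "hdfs.config" is an assumption for illustration, and the
+file name format, record format and policies are the ones built in the Usage section above):
+
+```java
+// java.util.Map / java.util.HashMap holding the keytab/principal properties,
+// stored in the topology config under a key of your choosing
+Map<String, Object> hdfsSecurityConf = new HashMap<String, Object>();
+hdfsSecurityConf.put("hdfs.keytab.file", "/path/to/keytab/");
+hdfsSecurityConf.put("hdfs.kerberos.principal", "user@EXAMPLE.com");
+
+Config conf = new Config();
+conf.put("hdfs.config", hdfsSecurityConf);
+
+HdfsBolt bolt = new HdfsBolt()
+        .withFsUrl("hdfs://localhost:54310")
+        .withFileNameFormat(fileNameFormat)
+        .withRecordFormat(format)
+        .withRotationPolicy(rotationPolicy)
+        .withSyncPolicy(syncPolicy)
+        .withConfigKey("hdfs.config"); // tells the bolt which topology config key to read
+```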

Added: storm/site/releases/0.9.6/storm-kafka.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-kafka.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-kafka.md (added)
+++ storm/site/releases/0.9.6/storm-kafka.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,287 @@
+---
+title: Storm Kafka Integration
+layout: documentation
+documentation: true
+---
+
+Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.
+
+## Spouts
+We support both Trident and core Storm spouts. Both spout implementations use a BrokerHosts interface that
+tracks the Kafka broker host to partition mapping and a KafkaConfig object that controls some Kafka-related parameters.
+ 
+### BrokerHosts
+In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. 
+Currently, we support the following two implementations:
+
+#### ZkHosts
+ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses 
+Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
+```java
+    public ZkHosts(String brokerZkStr, String brokerZkPath) 
+    public ZkHosts(String brokerZkStr)
+```
+where brokerZkStr is just ip:port (e.g. localhost:2181) and brokerZkPath is the root directory under which all the topics and
+partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.
+
+By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
+should set host.refreshFreqSecs to your chosen value, as in the sketch below.
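+
+A minimal sketch (assuming `refreshFreqSecs` is a public field on `ZkHosts`, as the text above implies):
+
+```java
+ZkHosts hosts = new ZkHosts("localhost:2181");
+hosts.refreshFreqSecs = 30; // refresh the broker-partition mapping every 30 seconds
+```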
+
+#### StaticHosts
+This is an alternative implementation where broker -> partition information is static. In order to construct an instance
+of this class, you need to first construct an instance of GlobalPartitionInformation.
+
+```java
+    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
+    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
+    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
+    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
+    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
+    StaticHosts hosts = new StaticHosts(partitionInfo);
+```
+
+### KafkaConfig
+The second thing needed for constructing a KafkaSpout is an instance of KafkaConfig.
+```java
+    public KafkaConfig(BrokerHosts hosts, String topic)
+    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+```
+
+The BrokerHosts can be any implementation of the BrokerHosts interface as described above. The topic is the name of the Kafka topic.
+The optional client id is used as a part of the ZooKeeper path where the spout's current consumption offset is stored.
+
+There are 2 extensions of KafkaConfig currently in use.
+
+SpoutConfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
+behavior specific to KafkaSpout. The zkRoot will be used as the root path to store your consumer's offset. The id should uniquely
+identify your spout.
+```java
+public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String id);
+```
+In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
+```java
+    // setting for how often to save the current Kafka offset to ZooKeeper
+    public long stateUpdateIntervalMs = 2000;
+
+    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
+    // calls OutputCollector.fail().
+    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+    // resubmitting the message while still retrying.
+    public long retryInitialDelayMs = 0;
+    public double retryDelayMultiplier = 1.0;
+    public long retryDelayMaxMs = 60 * 1000;
+
+    // if set to true, spout will set Kafka topic as the emitted Stream ID
+    public boolean topicAsStreamId = false;
+```
+Core KafkaSpout only accepts an instance of SpoutConfig.
+
+TridentKafkaConfig is another extension of KafkaConfig.
+TridentKafkaEmitter only accepts TridentKafkaConfig.
+
+The KafkaConfig class also has a bunch of public variables that control your application's behavior. Here are the defaults:
+```java
+    public int fetchSizeBytes = 1024 * 1024;
+    public int socketTimeoutMs = 10000;
+    public int fetchMaxWait = 10000;
+    public int bufferSizeBytes = 1024 * 1024;
+    public MultiScheme scheme = new RawMultiScheme();
+    public boolean ignoreZkOffsets = false;
+    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public long maxOffsetBehind = Long.MAX_VALUE;
+    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+    public int metricsTimeBucketSizeInSecs = 60;
+```
+
+Most of them are self explanatory except MultiScheme.
+
+### MultiScheme
+MultiScheme is an interface that dictates how the byte[] consumed from Kafka gets transformed into a Storm tuple. It
+also controls the naming of your output fields.
+
+```java
+  public Iterable<List<Object>> deserialize(byte[] ser);
+  public Fields getOutputFields();
+```
+
+The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with the `byte[]` as is. The name of the
+output field is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and
+`KeyValueSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
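+
+For illustration, here is a minimal sketch of a custom `MultiScheme` that decodes each message as a UTF-8 string
+(package names are assumed for the 0.9.x `backtype.storm` line; in practice `SchemeAsMultiScheme(new StringScheme())`
+already covers this case):
+
+```java
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.List;
+
+import backtype.storm.spout.MultiScheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+public class Utf8MultiScheme implements MultiScheme {
+    @Override
+    public Iterable<List<Object>> deserialize(byte[] ser) {
+        try {
+            // emit a single one-field tuple containing the decoded message
+            List<Object> tuple = new Values(new String(ser, "UTF-8"));
+            return Arrays.asList(tuple);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("str");
+    }
+}
+```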
+
+
+### Examples
+
+#### Core Spout
+
+```java
+BrokerHosts hosts = new ZkHosts(zkConnString);
+SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
+KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+```
+
+#### Trident Spout
+```java
+TridentTopology topology = new TridentTopology();
+BrokerHosts zk = new ZkHosts("localhost");
+TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
+spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
+OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
+```
+
+
+### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
+
+As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by
+setting `KafkaConfig.startOffsetTime` as follows:
+
+1. `kafka.api.OffsetRequest.EarliestTime()`:  read from the beginning of the topic (i.e. from the oldest messages onwards)
+2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
+3. A Unix timestamp in milliseconds since the epoch (e.g. via `System.currentTimeMillis()`):
+   see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ
+
+As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information
+under the ZooKeeper path `SpoutConfig.zkRoot+ "/" + SpoutConfig.id`.  In the case of failures it recovers from the last
+written offset in ZooKeeper.
+
+> **Important:**  When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id`
+> were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
+> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
+
+This means that when a topology has run once the setting `KafkaConfig.startOffsetTime` will not have an effect for
+subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in
+ZooKeeper to determine from where it should begin (more precisely: resume) reading.
+If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should
+set the parameter `KafkaConfig.ignoreZkOffsets` to `true`.  If `true`, the spout will always begin reading from the
+offset defined by `KafkaConfig.startOffsetTime` as described above.
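+
+For example, to force the spout to discard the consumer state in ZooKeeper and start reading from the latest offset,
+you could set the following on the `spoutConfig` built in the Core Spout example above (a sketch):
+
+```java
+spoutConfig.ignoreZkOffsets = true;                                 // ignore any consumer state stored in ZooKeeper
+spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // begin reading from the end of the topic
+```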
+
+
+## Using storm-kafka with different versions of Scala
+
+Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.
+
+When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
+use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
+
+```xml
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>0.8.1.1</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+```
+
+Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
+
+## Writing to Kafka as part of your topology
+You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology, or, if you
+are using Trident, you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and
+storm.kafka.trident.TridentKafkaUpdater.
+
+You need to provide implementations of the following 2 interfaces:
+
+### TupleToKafkaMapper and TridentTupleToKafkaMapper
+These interfaces have 2 methods defined:
+
+```java
+    K getKeyFromTuple(Tuple/TridentTuple tuple);
+    V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
+as the key and one field as the value, you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, if you use the default constructor of FieldNameBasedTupleToKafkaMapper, the implementation
+always looks for fields named "key" and "message", for backward compatibility reasons. Alternatively, you can
+specify different key and message fields by using the non-default constructor.
+In the TridentKafkaState you must specify the field names for the key and the message, as there is no default constructor.
+These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
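+
+For example, with the bolt (a sketch; the field names are illustrative and match the "Putting it all together" section below):
+
+```java
+// default constructor: expects tuple fields named "key" and "message"
+KafkaBolt bolt = new KafkaBolt()
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+
+// non-default constructor: name the key and message fields explicitly
+KafkaBolt customBolt = new KafkaBolt()
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+```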
+
+### KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method:
+```java
+public interface KafkaTopicSelector {
+    String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null, in which case the message will be ignored. If you have one static topic name then you can use
+DefaultTopicSelector.java and set the name of the topic in the constructor.
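+
+A minimal sketch of a custom selector for the bolt that routes each tuple to a topic named in one of its fields
+(the "topic" field name is illustrative):
+
+```java
+public class FieldBasedTopicSelector implements KafkaTopicSelector {
+    @Override
+    public String getTopics(Tuple tuple) {
+        // route the message to the topic named in the tuple's "topic" field;
+        // returning null would cause the message to be ignored
+        return tuple.getStringByField("topic");
+    }
+}
+```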
+
+### Specifying Kafka producer properties
+You can provide all the producer properties (see the "Important configuration properties for the producer" section of
+http://kafka.apache.org/documentation.html#producerconfigs) in your Storm topology config by setting a properties
+map under the key kafka.broker.properties.
+
+### Putting it all together
+
+For the bolt:
+```java
+        TopologyBuilder builder = new TopologyBuilder();
+    
+        Fields fields = new Fields("key", "message");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                    new Values("storm", "1"),
+                    new Values("trident", "1"),
+                    new Values("needs", "1"),
+                    new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+        builder.setSpout("spout", spout, 5);
+        KafkaBolt bolt = new KafkaBolt()
+                .withTopicSelector(new DefaultTopicSelector("test"))
+                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+        
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        
+        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+```
+
+For Trident:
+
+```java
+        Fields fields = new Fields("word", "count");
+        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+                new Values("storm", "1"),
+                new Values("trident", "1"),
+                new Values("needs", "1"),
+                new Values("javadoc", "1")
+        );
+        spout.setCycle(true);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("spout1", spout);
+
+        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+        Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+```