Posted to commits@storm.apache.org by bo...@apache.org on 2016/04/11 23:15:31 UTC
svn commit: r1738648 [12/12] - in /storm/site: publish/
publish/releases/0.9.6/ publish/releases/0.9.6/images/
publish/releases/0.9.6/images/logocontest/
publish/releases/0.9.6/images/logocontest/abartos/
publish/releases/0.9.6/images/logocontest/cbous...
Added: storm/site/releases/0.9.6/storm-hbase.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hbase.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hbase.md (added)
+++ storm/site/releases/0.9.6/storm-hbase.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,241 @@
+---
+title: Storm HBase Integration
+layout: documentation
+documentation: true
+---
+
+Storm/Trident integration for [Apache HBase](https://hbase.apache.org)
+
+## Usage
+The main API for interacting with HBase is the `org.apache.storm.hbase.bolt.mapper.HBaseMapper`
+interface:
+
+```java
+public interface HBaseMapper extends Serializable {
+ byte[] rowKey(Tuple tuple);
+
+ ColumnList columns(Tuple tuple);
+}
+```
+
+The `rowKey()` method is straightforward: given a Storm tuple, return a byte array representing the
+row key.
+
+The `columns()` method defines what will be written to an HBase row. The `ColumnList` class allows you
+to add both standard HBase columns as well as HBase counter columns.
+
+To add a standard column, use one of the `addColumn()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addColumn(this.columnFamily, field.getBytes(), toBytes(tuple.getValueByField(field)));
+```
+
+To add a counter column, use one of the `addCounter()` methods:
+
+```java
+ColumnList cols = new ColumnList();
+cols.addCounter(this.columnFamily, field.getBytes(), toLong(tuple.getValueByField(field)));
+```
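The `toBytes` and `toLong` helpers used above are not spelled out in this document; a minimal, self-contained sketch of such conversions (assuming string and numeric field values — this is not storm-hbase API code) might look like:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative helpers for turning tuple field values into the byte[]/long
// forms that ColumnList expects. These are sketches, not storm-hbase API.
class FieldBytes {
    static byte[] toBytes(Object value) {
        if (value instanceof String) {
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        if (value instanceof Long) {
            // 8-byte big-endian encoding, analogous to HBase's Bytes.toBytes(long)
            return ByteBuffer.allocate(8).putLong((Long) value).array();
        }
        throw new IllegalArgumentException("Unsupported type: " + value.getClass());
    }

    static long toLong(Object value) {
        return ((Number) value).longValue();
    }
}
```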
+
+When the remote HBase cluster is security enabled, a Kerberos keytab and the corresponding principal name need to be
+provided for the storm-hbase connector. Specifically, the Config object passed into the topology should contain
+("storm.keytab.file", "$keytab") and ("storm.kerberos.principal", "$principal"). Example:
+
+```java
+Config config = new Config();
+...
+config.put("storm.keytab.file", "$keytab");
+config.put("storm.kerberos.principal", "$principal");
+StormSubmitter.submitTopology("$topologyName", config, builder.createTopology());
+```
+
+## Working with Secure HBase using delegation tokens
+If your topology is going to interact with secure HBase, your bolts/states need to be authenticated by HBase.
+The approach described above requires that all potential worker hosts have "storm.keytab.file" on them. If you have
+multiple topologies on a cluster, each running as a different HBase user, you will have to create multiple keytabs and distribute
+them to all workers. Instead of doing that, you can use the following approach:
+
+Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hbase.security.AutoHBase"]
+hbase.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of an HBase superuser that can impersonate other users.)
+hbase.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 518400 (6 days. HBase tokens expire every 7 days by default and cannot be renewed;
+if you have a custom setting for hbase.auth.token.max.lifetime in hbase-site.xml, ensure this value is
+at least 1 hour less than that.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hbase.security.AutoHBase"]
+
+If Nimbus does not already have the above configuration, you need to add it and then restart Nimbus. Ensure the HBase configuration
+files (core-site.xml, hdfs-site.xml and hbase-site.xml) and the storm-hbase jar with all its dependencies are present in Nimbus's classpath.
+Nimbus will use the keytab and principal specified in the config to authenticate with HBase. From then on, for every
+topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on their
+behalf. If the topology was started with topology.auto-credentials set to AutoHBase, Nimbus will push the
+delegation tokens to all the workers for your topology, and the HBase bolt/state will authenticate with these tokens.
+
+As Nimbus is impersonating the topology submitter user, you need to ensure that the user specified in storm.kerberos.principal
+has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+
+http://hbase.apache.org/book/security.html#security.rest.gateway
+
+You can read about setting up secure HBase here: http://hbase.apache.org/book/security.html.
+
+### SimpleHBaseMapper
+`storm-hbase` includes a general purpose `HBaseMapper` implementation called `SimpleHBaseMapper` that can map Storm
+tuples to both regular HBase columns as well as counter columns.
+
+To use `SimpleHBaseMapper`, you simply tell it which fields to map to which types of columns.
+
+The following code creates a `SimpleHBaseMapper` instance that:
+
+1. Uses the `word` tuple value as a row key.
+2. Adds a standard HBase column for the tuple field `word`.
+3. Adds an HBase counter column for the tuple field `count`.
+4. Writes values to the `cf` column family.
+
+```java
+SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+ .withRowKeyField("word")
+ .withColumnFields(new Fields("word"))
+ .withCounterFields(new Fields("count"))
+ .withColumnFamily("cf");
+```
+### HBaseBolt
+To use the `HBaseBolt`, construct it with the name of the table to write to and an `HBaseMapper` implementation:
+
+```java
+HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+```
+
+The `HBaseBolt` will delegate to the `mapper` instance to figure out how to persist tuple data to HBase.
+
+###HBaseValueMapper
+This class allows you to transform the HBase lookup result into Storm `Values` that will be emitted by the `HBaseLookupBolt`.
+
+```java
+public interface HBaseValueMapper extends Serializable {
+ public List<Values> toTuples(Result result) throws Exception;
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
+```
+
+The `toTuples` method takes in an HBase `Result` instance and returns a list of `Values` instances.
+Each of the values returned by this method will be emitted by the `HBaseLookupBolt`.
+
+The `declareOutputFields` method should be used to declare the output fields of the `HBaseLookupBolt`.
+
+There is an example implementation in `src/test/java` directory.
+
+###HBaseProjectionCriteria
+This class allows you to specify the projection criteria for your HBase Get operation. This parameter is optional
+for the lookup bolt; if you do not specify it, all columns will be returned by the `HBaseLookupBolt`.
+
+```java
+public class HBaseProjectionCriteria implements Serializable {
+    public HBaseProjectionCriteria addColumnFamily(String columnFamily);
+    public HBaseProjectionCriteria addColumn(ColumnMetaData column);
+}
+```
+`addColumnFamily` takes in a column family. Setting this parameter means all columns for this family will be included
+in the projection.
+
+`addColumn` takes in a `ColumnMetaData` instance. Setting this parameter means only this column from the column family
+will be part of your projection.
+The following code creates a projection criteria that:
+
+1. includes the count column from column family cf.
+2. includes all columns from column family cf2.
+
+```java
+HBaseProjectionCriteria projectionCriteria = new HBaseProjectionCriteria()
+ .addColumn(new HBaseProjectionCriteria.ColumnMetaData("cf", "count"))
+ .addColumnFamily("cf2");
+```
+
+###HBaseLookupBolt
+To use the `HBaseLookupBolt`, construct it with the name of the table to look up against, an implementation of `HBaseMapper`,
+and an implementation of `HBaseValueMapper`. You can optionally specify an `HBaseProjectionCriteria`.
+
+The `HBaseLookupBolt` will use the mapper to get the rowKey to look up. It will use the `HBaseProjectionCriteria` to
+determine which columns to include in the result, and it will leverage the `HBaseValueMapper` to get the
+values to be emitted by the bolt.
+
+You can look at an example topology, LookupWordCount.java, under `src/test/java`.
+
+## Example: Persistent Word Count
+A runnable example can be found in the `src/test/java` directory.
+
+### Setup
+The following steps assume you are running HBase locally, or there is an `hbase-site.xml` on the
+classpath pointing to your HBase cluster.
+
+Use the `hbase shell` command to create the schema:
+
+```
+> create 'WordCount', 'cf'
+```
+
+### Execution
+Run the `org.apache.storm.hbase.topology.PersistentWordCount` class (it will run the topology for 10 seconds, then exit).
+
+After (or while) the word count topology is running, run the `org.apache.storm.hbase.topology.WordCountClient` class
+to view the counter values stored in HBase. You should see something like the following:
+
+```
+Word: 'apple', Count: 6867
+Word: 'orange', Count: 6645
+Word: 'pineapple', Count: 6954
+Word: 'banana', Count: 6787
+Word: 'watermelon', Count: 6806
+```
+
+For reference, the sample topology is listed below:
+
+```java
+public class PersistentWordCount {
+ private static final String WORD_SPOUT = "WORD_SPOUT";
+ private static final String COUNT_BOLT = "COUNT_BOLT";
+ private static final String HBASE_BOLT = "HBASE_BOLT";
+
+
+ public static void main(String[] args) throws Exception {
+ Config config = new Config();
+
+ WordSpout spout = new WordSpout();
+ WordCounter bolt = new WordCounter();
+
+ SimpleHBaseMapper mapper = new SimpleHBaseMapper()
+ .withRowKeyField("word")
+ .withColumnFields(new Fields("word"))
+ .withCounterFields(new Fields("count"))
+ .withColumnFamily("cf");
+
+ HBaseBolt hbase = new HBaseBolt("WordCount", mapper);
+
+
+ // wordSpout ==> countBolt ==> HBaseBolt
+ TopologyBuilder builder = new TopologyBuilder();
+
+ builder.setSpout(WORD_SPOUT, spout, 1);
+ builder.setBolt(COUNT_BOLT, bolt, 1).shuffleGrouping(WORD_SPOUT);
+ builder.setBolt(HBASE_BOLT, hbase, 1).fieldsGrouping(COUNT_BOLT, new Fields("word"));
+
+
+ if (args.length == 0) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, builder.createTopology());
+ Thread.sleep(10000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else {
+ config.setNumWorkers(3);
+ StormSubmitter.submitTopology(args[0], config, builder.createTopology());
+ }
+ }
+}
+```
+
Added: storm/site/releases/0.9.6/storm-hdfs.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-hdfs.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-hdfs.md (added)
+++ storm/site/releases/0.9.6/storm-hdfs.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,368 @@
+---
+title: Storm HDFS Integration
+layout: documentation
+documentation: true
+---
+
+Storm components for interacting with HDFS file systems
+
+
+## Usage
+The following example will write pipe("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every
+1,000 tuples it will sync the filesystem, making that data visible to other HDFS clients. It will rotate files when they
+reach 5 megabytes in size.
+
+```java
+// use "|" instead of "," for field delimiter
+RecordFormat format = new DelimitedRecordFormat()
+ .withFieldDelimiter("|");
+
+// sync the filesystem after every 1k tuples
+SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+// rotate files when they reach 5MB
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/foo/");
+
+HdfsBolt bolt = new HdfsBolt()
+ .withFsUrl("hdfs://localhost:54310")
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy);
+```
+
+### Packaging a Topology
+When packaging your topology, it's important that you use the [maven-shade-plugin]() as opposed to the
+[maven-assembly-plugin]().
+
+The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme
+resolution.
+
+If you experience errors such as the following:
+
+```
+java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs
+```
+
+it's an indication that your topology jar file isn't packaged properly.
+
+If you are using maven to create your topology jar, you should use the following `maven-shade-plugin` configuration to
+create your topology jar:
+
+```xml
+<plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass></mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+</plugin>
+
+```
+
+### Specifying a Hadoop Version
+By default, storm-hdfs uses the following Hadoop dependencies:
+
+```xml
+<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>2.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+</dependency>
+<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ <version>2.2.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+</dependency>
+```
+
+If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency
+and add the dependencies for your preferred version in your pom.
+
+Hadoop client version incompatibilities can manifest as errors like:
+
+```
+com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)
+```
+
+## Customization
+
+### Record Formats
+Record format can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.RecordFormat`
+interface:
+
+```java
+public interface RecordFormat extends Serializable {
+ byte[] format(Tuple tuple);
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DelimitedRecordFormat` is capable of producing formats such as CSV and
+tab-delimited files.
+
+
+### File Naming
+File naming can be controlled by providing an implementation of the `org.apache.storm.hdfs.format.FileNameFormat`
+interface:
+
+```java
+public interface FileNameFormat extends Serializable {
+ void prepare(Map conf, TopologyContext topologyContext);
+ String getName(long rotation, long timeStamp);
+ String getPath();
+}
+```
+
+The provided `org.apache.storm.hdfs.format.DefaultFileNameFormat` will create file names with the following format:
+
+ {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}
+
+For example:
+
+ MyBolt-5-7-1390579837830.txt
+
+By default, prefix is empty and extension is ".txt".
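As a sketch, the format above amounts to simple string concatenation (this helper is illustrative, not the actual DefaultFileNameFormat source):

```java
// Assembles {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}.
class FileNameSketch {
    static String getName(String prefix, String componentId, int taskId,
                          long rotation, long timeStamp, String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotation + "-" + timeStamp + extension;
    }
}
```

With an empty prefix and the default ".txt" extension this reproduces names like the `MyBolt-5-7-1390579837830.txt` example above.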
+
+
+
+### Sync Policies
+Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available
+to clients reading the data) by implementing the `org.apache.storm.hdfs.sync.SyncPolicy` interface:
+
+```java
+public interface SyncPolicy extends Serializable {
+ boolean mark(Tuple tuple, long offset);
+ void reset();
+}
+```
+The `HdfsBolt` will call the `mark()` method for every tuple it processes. Returning `true` will trigger the `HdfsBolt`
+to perform a sync/flush, after which it will call the `reset()` method.
+
+The `org.apache.storm.hdfs.sync.CountSyncPolicy` class simply triggers a sync after the specified number of tuples have
+been processed.
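The counting behavior can be sketched as follows. This is a self-contained illustration of the described contract, not the CountSyncPolicy source; the Storm Tuple argument of `mark()` is replaced with Object to keep it standalone:

```java
// Sketch of a count-based sync policy: mark() returns true every N tuples,
// signalling the bolt to sync/flush, after which reset() is called.
class CountSyncSketch {
    private final int count;
    private int executed = 0;

    CountSyncSketch(int count) { this.count = count; }

    boolean mark(Object tuple, long offset) {
        return ++executed >= count;
    }

    void reset() { executed = 0; }
}
```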
+
+### File Rotation Policies
+Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an
+implementation of the `org.apache.storm.hdfs.rotation.FileRotationPolicy` interface:
+
+```java
+public interface FileRotationPolicy extends Serializable {
+ boolean mark(Tuple tuple, long offset);
+ void reset();
+}
+```
+
+The `org.apache.storm.hdfs.rotation.FileSizeRotationPolicy` implementation allows you to trigger file rotation when
+data files reach a specific file size:
+
+```java
+FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+```
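The size check itself can be sketched like this (illustrative only; the real policy converts the configured count and Units into a byte threshold and takes a Storm Tuple in `mark()`):

```java
// Sketch of size-based rotation: rotate once the byte offset written to the
// current file reaches the configured maximum.
class SizeRotationSketch {
    private final long maxBytes;

    SizeRotationSketch(float count, long unitBytes) {
        this.maxBytes = (long) (count * unitBytes);
    }

    boolean mark(Object tuple, long offsetBytes) {
        return offsetBytes >= maxBytes;
    }

    void reset() { }
}
```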
+
+### File Rotation Actions
+Both the HDFS bolt and Trident State implementation allow you to register any number of `RotationAction`s.
+A `RotationAction` provides a hook for performing some action right after a file is rotated, for
+example moving the file to a different location or renaming it.
+
+
+```java
+public interface RotationAction extends Serializable {
+ void execute(FileSystem fileSystem, Path filePath) throws IOException;
+}
+```
+
+Storm-HDFS includes a simple action that will move a file after rotation:
+
+```java
+public class MoveFileAction implements RotationAction {
+ private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);
+
+ private String destination;
+
+ public MoveFileAction withDestination(String destDir){
+ destination = destDir;
+ return this;
+ }
+
+ @Override
+ public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+ Path destPath = new Path(destination, filePath.getName());
+ LOG.info("Moving file {} to {}", filePath, destPath);
+        boolean success = fileSystem.rename(filePath, destPath);
+        if (!success) {
+            LOG.warn("Failed to move file {} to {}", filePath, destPath);
+        }
+ }
+}
+```
+
+If you are using Trident and sequence files you can do something like this:
+
+```java
+ HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("hdfs://localhost:54310")
+ .addRotationAction(new MoveFileAction().withDestination("/dest2/"));
+```
+
+
+## Support for HDFS Sequence Files
+
+The `org.apache.storm.hdfs.bolt.SequenceFileBolt` class allows you to write storm data to HDFS sequence files:
+
+```java
+ // sync the filesystem after every 1k tuples
+ SyncPolicy syncPolicy = new CountSyncPolicy(1000);
+
+ // rotate files when they reach 5MB
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withExtension(".seq")
+ .withPath("/data/");
+
+ // create sequence format instance.
+ DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");
+
+ SequenceFileBolt bolt = new SequenceFileBolt()
+ .withFsUrl("hdfs://localhost:54310")
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(format)
+ .withRotationPolicy(rotationPolicy)
+ .withSyncPolicy(syncPolicy)
+ .withCompressionType(SequenceFile.CompressionType.RECORD)
+ .withCompressionCodec("deflate");
+```
+
+The `SequenceFileBolt` requires that you provide a `org.apache.storm.hdfs.bolt.format.SequenceFormat` that maps tuples to
+key/value pairs:
+
+```java
+public interface SequenceFormat extends Serializable {
+ Class keyClass();
+ Class valueClass();
+
+ Writable key(Tuple tuple);
+ Writable value(Tuple tuple);
+}
+```
+
+## Trident API
+storm-hdfs also includes a Trident `state` implementation for writing data to HDFS, with an API that closely mirrors
+that of the bolts.
+
+ ```java
+ Fields hdfsFields = new Fields("field1", "field2");
+
+ FileNameFormat fileNameFormat = new DefaultFileNameFormat()
+ .withPath("/trident")
+ .withPrefix("trident")
+ .withExtension(".txt");
+
+ RecordFormat recordFormat = new DelimitedRecordFormat()
+ .withFields(hdfsFields);
+
+ FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);
+
+ HdfsState.Options options = new HdfsState.HdfsFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withRecordFormat(recordFormat)
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("hdfs://localhost:54310");
+
+ StateFactory factory = new HdfsStateFactory().withOptions(options);
+
+ TridentState state = stream
+ .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());
+ ```
+
+To use the sequence file `State` implementation, use the `HdfsState.SequenceFileOptions`:
+
+```java
+ HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
+ .withFileNameFormat(fileNameFormat)
+ .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
+ .withRotationPolicy(rotationPolicy)
+ .withFsUrl("hdfs://localhost:54310")
+ .addRotationAction(new MoveFileAction().toDestination("/dest2/"));
+```
+
+## Working with Secure HDFS
+If your topology is going to interact with secure HDFS, your bolts/states need to be authenticated by the NameNode. We
+currently have 2 options to support this:
+
+### Using HDFS delegation tokens
+Your administrator can configure Nimbus to automatically get delegation tokens on behalf of the topology submitter user.
+Nimbus needs to be started with the following configuration:
+
+nimbus.autocredential.plugins.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+nimbus.credential.renewers.classes : ["org.apache.storm.hdfs.common.security.AutoHDFS"]
+hdfs.keytab.file: "/path/to/keytab/on/nimbus" (This is the keytab of an HDFS superuser that can impersonate other users.)
+hdfs.kerberos.principal: "superuser@EXAMPLE.com"
+nimbus.credential.renewers.freq.secs : 82800 (23 hours. HDFS tokens need to be renewed every 24 hours, so this value should be
+less than 24 hours.)
+topology.hdfs.uri: "hdfs://host:port" (This is an optional config; by default the value of the "fs.defaultFS" property
+specified in Hadoop's core-site.xml will be used.)
+
+Your topology configuration should have:
+topology.auto-credentials :["org.apache.storm.hdfs.common.security.AutoHDFS"]
+
+If Nimbus does not already have the above configuration, you need to add it and then restart Nimbus. Ensure the Hadoop configuration
+files (core-site.xml and hdfs-site.xml) and the storm-hdfs jar with all its dependencies are present in Nimbus's classpath.
+Nimbus will use the keytab and principal specified in the config to authenticate with the NameNode. From then on, for every
+topology submission, Nimbus will impersonate the topology submitter user and acquire delegation tokens on their
+behalf. If the topology was started with topology.auto-credentials set to AutoHDFS, Nimbus will push the
+delegation tokens to all the workers for your topology, and the HDFS bolt/state will authenticate with the NameNode using
+these tokens.
+
+As Nimbus is impersonating the topology submitter user, you need to ensure that the user specified in hdfs.kerberos.principal
+has permission to acquire tokens on behalf of other users. To achieve this, follow the configuration directions
+listed at this link:
+http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/Superusers.html
+
+You can read about setting up secure HDFS here: http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/SecureMode.html.
+
+### Using keytabs on all worker hosts
+If you have distributed the keytab files for the HDFS user to all potential worker hosts, you can use this method. You should specify an
+HDFS config key using the method HdfsBolt/State.withConfigKey("somekey"), and the value map of this key should have the following 2 properties:
+
+hdfs.keytab.file: "/path/to/keytab/"
+hdfs.kerberos.principal: "user@EXAMPLE.com"
+
+On worker hosts, the bolt/trident-state code will use the keytab file and principal provided in the config to authenticate with
+the NameNode. This method is a little dangerous, as you need to ensure all workers have the keytab file at the same location, and you need
+to remember this as you bring up new hosts in the cluster.
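A sketch of wiring this up in the topology config ("somekey" and the paths are the placeholders from the text, and plain maps stand in for Storm's Config object):

```java
import java.util.HashMap;
import java.util.Map;

// Builds the value map described above and registers it under the key
// passed to HdfsBolt/State.withConfigKey("somekey").
class HdfsCredsSketch {
    static Map<String, Object> topologyConf() {
        Map<String, Object> creds = new HashMap<>();
        creds.put("hdfs.keytab.file", "/path/to/keytab/");
        creds.put("hdfs.kerberos.principal", "user@EXAMPLE.com");

        Map<String, Object> conf = new HashMap<>();
        conf.put("somekey", creds);
        return conf;
    }
}
```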
Added: storm/site/releases/0.9.6/storm-kafka.md
URL: http://svn.apache.org/viewvc/storm/site/releases/0.9.6/storm-kafka.md?rev=1738648&view=auto
==============================================================================
--- storm/site/releases/0.9.6/storm-kafka.md (added)
+++ storm/site/releases/0.9.6/storm-kafka.md Mon Apr 11 21:15:29 2016
@@ -0,0 +1,287 @@
+---
+title: Storm Kafka Integration
+layout: documentation
+documentation: true
+---
+
+Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.
+
+##Spouts
+We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHosts interface that
+tracks the Kafka broker host to partition mapping, and a KafkaConfig that controls some Kafka-related parameters.
+
+###BrokerHosts
+In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
+Currently, we support the following two implementations:
+
+####ZkHosts
+ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
+Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
+```java
+ public ZkHosts(String brokerZkStr, String brokerZkPath)
+ public ZkHosts(String brokerZkStr)
+```
+Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
+partition information is stored. By default this is /brokers which is what the default Kafka implementation uses.
+
+By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
+should set host.refreshFreqSecs to your chosen value.
+
+####StaticHosts
+This is an alternative implementation where broker -> partition information is static. In order to construct an instance
+of this class, you need to first construct an instance of GlobalPartitionInformation.
+
+```java
+ Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
+ Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
+ Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
+ GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
+ partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+ partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+ partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
+ StaticHosts hosts = new StaticHosts(partitionInfo);
+```
+
+###KafkaConfig
+The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig.
+```java
+ public KafkaConfig(BrokerHosts hosts, String topic)
+ public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+```
+
+The BrokerHosts can be any implementation of the BrokerHosts interface as described above. The topic is the name of the Kafka topic.
+The optional clientId is used as a part of the ZooKeeper path where the spout's current consumption offset is stored.
+
+There are 2 extensions of KafkaConfig currently in use.
+
+SpoutConfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
+behavior specific to KafkaSpout. The zkRoot will be used as the root to store your consumer's offset. The id should uniquely
+identify your spout.
+```java
+public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
+public SpoutConfig(BrokerHosts hosts, String topic, String id);
+```
+In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
+```java
+ // setting for how often to save the current Kafka offset to ZooKeeper
+ public long stateUpdateIntervalMs = 2000;
+
+ // Exponential back-off retry settings. These are used when retrying messages after a bolt
+ // calls OutputCollector.fail().
+ // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+ // resubmitting the message while still retrying.
+ public long retryInitialDelayMs = 0;
+ public double retryDelayMultiplier = 1.0;
+ public long retryDelayMaxMs = 60 * 1000;
+
+ // if set to true, spout will set Kafka topic as the emitted Stream ID
+ public boolean topicAsStreamId = false;
+```
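The delay implied by these retry settings is an exponential back-off of roughly delay(i) = min(retryDelayMaxMs, retryInitialDelayMs * retryDelayMultiplier^i). The following is an illustration of that formula, not the exact KafkaSpout retry code:

```java
// Sketch of exponential back-off: each successive retry waits longer,
// capped at retryDelayMaxMs.
class RetryDelaySketch {
    static long delayMs(long initialMs, double multiplier, long maxMs, int retries) {
        double delay = initialMs * Math.pow(multiplier, retries);
        return Math.min(maxMs, (long) delay);
    }
}
```

With the defaults (initial delay 0, multiplier 1.0) every retry is immediate; raising the multiplier above 1.0 spreads retries out.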
+Core KafkaSpout only accepts an instance of SpoutConfig.
+
+TridentKafkaConfig is another extension of KafkaConfig.
+TridentKafkaEmitter only accepts TridentKafkaConfig.
+
+The KafkaConfig class also has a bunch of public variables that control your application's behavior. Here are their default values:
+```java
+ public int fetchSizeBytes = 1024 * 1024;
+ public int socketTimeoutMs = 10000;
+ public int fetchMaxWait = 10000;
+ public int bufferSizeBytes = 1024 * 1024;
+ public MultiScheme scheme = new RawMultiScheme();
+ public boolean ignoreZkOffsets = false;
+ public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ public long maxOffsetBehind = Long.MAX_VALUE;
+ public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+ public int metricsTimeBucketSizeInSecs = 60;
+```
+
+Most of them are self-explanatory except MultiScheme.
+###MultiScheme
+MultiScheme is an interface that dictates how the byte[] consumed from Kafka gets transformed into a Storm tuple. It
+also controls the naming of your output fields.
+
+```java
+ public Iterable<List<Object>> deserialize(byte[] ser);
+ public Fields getOutputFields();
+```
+
+The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with the `byte[]` as is. The name of the
+output field is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and
+`KeyValueSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
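For illustration, the deserialize half of a string-producing scheme might look like this self-contained sketch (the real SchemeAsMultiScheme wraps a single-tuple Scheme; field naming is omitted here):

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Sketch of deserialize(): turn the raw byte[] into one tuple whose single
// value is the UTF-8 decoded string.
class StringMultiSchemeSketch {
    static Iterable<List<Object>> deserialize(byte[] ser) {
        List<Object> tuple = Collections.singletonList((Object) new String(ser, StandardCharsets.UTF_8));
        return Collections.singletonList(tuple);
    }
}
```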
+
+
+### Examples
+
+#### Core Spout
+
+```java
+BrokerHosts hosts = new ZkHosts(zkConnString);
+SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
+spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
+KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+```
+
+#### Trident Spout
+```java
+TridentTopology topology = new TridentTopology();
+BrokerHosts zk = new ZkHosts("localhost");
+TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
+spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
+OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
+```
+
+
+### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
+
+As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by
+setting `KafkaConfig.startOffsetTime` as follows:
+
+1. `kafka.api.OffsetRequest.EarliestTime()`: read from the beginning of the topic (i.e. from the oldest messages onwards)
+2. `kafka.api.OffsetRequest.LatestTime()`: read from the end of the topic (i.e. any new messages that are being written to the topic)
+3. A Unix timestamp in milliseconds since the epoch (e.g. via `System.currentTimeMillis()`):
+ see [How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?](https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?) in the Kafka FAQ
+
+As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information
+under the ZooKeeper path `SpoutConfig.zkRoot + "/" + SpoutConfig.id`. In the case of failures it recovers from the last
+written offset in ZooKeeper.
+
+> **Important:** When re-deploying a topology make sure that the settings for `SpoutConfig.zkRoot` and `SpoutConfig.id`
+> were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
+> offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.
+
+This means that once a topology has run, the setting `KafkaConfig.startOffsetTime` has no effect for
+subsequent runs of the topology, because the topology will now rely on the consumer state information (offsets) in
+ZooKeeper to determine from where it should begin (more precisely: resume) reading.
+If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should
+set the parameter `KafkaConfig.ignoreZkOffsets` to `true`. If `true`, the spout will always begin reading from the
+offset defined by `KafkaConfig.startOffsetTime` as described above.
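+
+Put together, a configuration that deliberately ignores previously stored offsets might look like the
+following sketch (the consumer id is illustrative; keep `zkRoot` and `id` stable across redeployments):
+
+```java
+SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, "my-stable-consumer-id");
+// skip the offsets stored in ZooKeeper and start from startOffsetTime instead
+spoutConfig.ignoreZkOffsets = true;
+spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+```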
+
+
+## Using storm-kafka with different versions of Scala
+
+Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
+as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.
+
+When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
+use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
+
+```xml
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka_2.10</artifactId>
+ <version>0.8.1.1</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+```
+
+Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
+
+## Writing to Kafka as part of your topology
+You can create an instance of `storm.kafka.bolt.KafkaBolt` and attach it as a component to your topology, or, if you
+are using Trident, you can use `storm.kafka.trident.TridentKafkaState`, `storm.kafka.trident.TridentKafkaStateFactory` and
+`storm.kafka.trident.TridentKafkaUpdater`.
+
+You need to provide implementations of the following two interfaces.
+
+### TupleToKafkaMapper and TridentTupleToKafkaMapper
+Each of these interfaces defines two methods:
+
+```java
+ K getKeyFromTuple(Tuple/TridentTuple tuple);
+ V getMessageFromTuple(Tuple/TridentTuple tuple);
+```
+
+As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one
+field as the key and one field as the value, you can use the provided `FieldNameBasedTupleToKafkaMapper`
+implementation. For backward-compatibility reasons, when constructed with its default constructor for use in the
+`KafkaBolt`, this implementation always looks for fields named "key" and "message". Alternatively, you can specify
+different key and message fields by using the non-default constructor. In the `TridentKafkaState` there is no default
+constructor, so you must specify the field names for the key and the message when constructing an instance of
+`FieldNameBasedTupleToKafkaMapper`.
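+
+For example, to map the tuple fields "word" and "count" to the Kafka key and message respectively, you
+could construct the mapper with the non-default constructor:
+
+```java
+KafkaBolt bolt = new KafkaBolt()
+        .withTopicSelector(new DefaultTopicSelector("test"))
+        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+```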
+
+### KafkaTopicSelector and trident KafkaTopicSelector
+This interface has only one method:
+```java
+public interface KafkaTopicSelector {
+ String getTopics(Tuple/TridentTuple tuple);
+}
+```
+The implementation of this interface should return the topic to which the tuple's key/message mapping should be
+published. If you return `null`, the message will be ignored. If you have a single static topic name, you can use
+`DefaultTopicSelector` and set the name of the topic in the constructor.
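+
+If the destination topic depends on the tuple contents, you can implement the interface yourself. A sketch,
+assuming the tuple carries the topic name in a field called "topic" (the field name is illustrative):
+
+```java
+public class FieldBasedTopicSelector implements KafkaTopicSelector {
+    @Override
+    public String getTopics(Tuple tuple) {
+        // returning null causes the message to be ignored
+        return tuple.contains("topic") ? tuple.getStringByField("topic") : null;
+    }
+}
+```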
+
+### Specifying Kafka producer properties
+You can provide all the producer properties (see the "Important configuration properties for the producer" section of
+http://kafka.apache.org/documentation.html#producerconfigs) in your Storm topology config by setting a properties
+map under the key `kafka.broker.properties`.
+
+### Putting it all together
+
+For the bolt:
+```java
+ TopologyBuilder builder = new TopologyBuilder();
+
+ Fields fields = new Fields("key", "message");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", "1"),
+ new Values("trident", "1"),
+ new Values("needs", "1"),
+ new Values("javadoc", "1")
+ );
+ spout.setCycle(true);
+ builder.setSpout("spout", spout, 5);
+ KafkaBolt bolt = new KafkaBolt()
+ .withTopicSelector(new DefaultTopicSelector("test"))
+ .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+ builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+
+ Config conf = new Config();
+ //set producer properties.
+ Properties props = new Properties();
+ props.put("metadata.broker.list", "localhost:9092");
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
+ StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+```
+
+For Trident:
+
+```java
+ Fields fields = new Fields("word", "count");
+ FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
+ new Values("storm", "1"),
+ new Values("trident", "1"),
+ new Values("needs", "1"),
+ new Values("javadoc", "1")
+ );
+ spout.setCycle(true);
+
+ TridentTopology topology = new TridentTopology();
+ Stream stream = topology.newStream("spout1", spout);
+
+ TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+ .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+ .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+ stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+ Config conf = new Config();
+ //set producer properties.
+ Properties props = new Properties();
+ props.put("metadata.broker.list", "localhost:9092");
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+ StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+```