Posted to commits@storm.apache.org by ka...@apache.org on 2016/06/01 03:28:29 UTC

svn commit: r1746370 [26/26] - in /storm/site/releases/2.0.0-SNAPSHOT: ./ images/ javadocs/ javadocs/org/apache/storm/ javadocs/org/apache/storm/class-use/ javadocs/org/apache/storm/cluster/ javadocs/org/apache/storm/cluster/class-use/ javadocs/org/apa...

Modified: storm/site/releases/2.0.0-SNAPSHOT/storm-kafka.md
URL: http://svn.apache.org/viewvc/storm/site/releases/2.0.0-SNAPSHOT/storm-kafka.md?rev=1746370&r1=1746369&r2=1746370&view=diff
==============================================================================
--- storm/site/releases/2.0.0-SNAPSHOT/storm-kafka.md (original)
+++ storm/site/releases/2.0.0-SNAPSHOT/storm-kafka.md Wed Jun  1 03:28:26 2016
@@ -17,13 +17,14 @@ Currently, we support the following two
 ####ZkHosts
 ZkHosts is what you should use if you want to dynamically track the Kafka broker-to-partition mapping. This class uses 
 Kafka's ZooKeeper entries to track the brokerHost -> partition mapping. You can instantiate an object by calling:
+
 ```java
-    public ZkHosts(String brokerZkStr, String brokerZkPath) 
-    public ZkHosts(String brokerZkStr)
+public ZkHosts(String brokerZkStr, String brokerZkPath)
+public ZkHosts(String brokerZkStr)
 ```
+
 Where brokerZkStr is just the ip:port string (e.g. localhost:2181), and brokerZkPath is the root directory under which all the topic and
 partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.
-
 By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
 should set host.refreshFreqSecs to your chosen value.
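+
+For illustration, a minimal sketch of constructing a ZkHosts instance (the ZooKeeper connect string and broker path below are placeholder values) could look like this:
+
+```java
+// "localhost:2181" and "/kafka/brokers" are illustrative; use your own ZooKeeper settings.
+BrokerHosts defaultPathHosts = new ZkHosts("localhost:2181");                   // uses the default /brokers path
+BrokerHosts customPathHosts = new ZkHosts("localhost:2181", "/kafka/brokers");  // explicit broker root path
+```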
 
@@ -32,68 +33,73 @@ This is an alternative implementation wh
 of this class, you need to first construct an instance of GlobalPartitionInformation.
 
 ```java
-    Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
-    Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
-    Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
-    GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
-    partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
-    partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
-    partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
-    StaticHosts hosts = new StaticHosts(partitionInfo);
+Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
+Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
+Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
+GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
+partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
+partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
+partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
+StaticHosts hosts = new StaticHosts(partitionInfo);
 ```
 
 ###KafkaConfig
-The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig. 
+The second thing needed for constructing a KafkaSpout is an instance of KafkaConfig.
+
 ```java
-    public KafkaConfig(BrokerHosts hosts, String topic)
-    public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
+public KafkaConfig(BrokerHosts hosts, String topic)
+public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
 ```
 
 The BrokerHosts can be any implementation of the BrokerHosts interface as described above. The topic is the name of the Kafka topic.
 The optional clientId is used as part of the ZooKeeper path where the spout's current consumption offset is stored.
-
 There are 2 extensions of KafkaConfig currently in use.
-
-Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling
+SpoutConfig is an extension of KafkaConfig that adds fields for the ZooKeeper connection info and for controlling
 behavior specific to KafkaSpout. The zkRoot will be used as the root path to store your consumer's offset. The id should uniquely
 identify your spout.
+
 ```java
 public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
 public SpoutConfig(BrokerHosts hosts, String topic, String id);
 ```
+
+You need to use the correct configuration class for your use case (see the sketch after this list):
+
+- The core KafkaSpout only accepts an instance of SpoutConfig.
+- TridentKafkaConfig is another extension of KafkaConfig.
+- The TridentKafkaEmitter only accepts TridentKafkaConfig.
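+
+As a rough sketch (the topic name, zkRoot, and id below are placeholder values), wiring up the core spout and the Trident config could look like this:
+
+```java
+BrokerHosts hosts = new ZkHosts("localhost:2181");
+
+// Core spout: "/kafkastorm" and "kafka-spout-id" are illustrative; pick values unique to your topology.
+SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafkastorm", "kafka-spout-id");
+KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+
+// Trident: the Trident spout/emitter consumes a TridentKafkaConfig instead.
+TridentKafkaConfig tridentConfig = new TridentKafkaConfig(hosts, "test-topic");
+```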
+
 In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:
+
 ```java
-    // setting for how often to save the current Kafka offset to ZooKeeper
-    public long stateUpdateIntervalMs = 2000;
+// setting for how often to save the current Kafka offset to ZooKeeper
+public long stateUpdateIntervalMs = 2000;
 
-    // Exponential back-off retry settings.  These are used when retrying messages after a bolt
-    // calls OutputCollector.fail().
-    // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
-    // resubmitting the message while still retrying.
-    public long retryInitialDelayMs = 0;
-    public double retryDelayMultiplier = 1.0;
-    public long retryDelayMaxMs = 60 * 1000;
+// Exponential back-off retry settings.  These are used when retrying messages after a bolt
+// calls OutputCollector.fail().
+// Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+// resubmitting the message while still retrying.
+public long retryInitialDelayMs = 0;
+public double retryDelayMultiplier = 1.0;
+public long retryDelayMaxMs = 60 * 1000;
 
-    // if set to true, spout will set Kafka topic as the emitted Stream ID
-    public boolean topicAsStreamId = false;
+// if set to true, spout will set Kafka topic as the emitted Stream ID
+public boolean topicAsStreamId = false;
 ```
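+
+For example, a sketch of tuning these fields on a SpoutConfig instance (the values below are illustrative, not recommendations) might be:
+
+```java
+// spoutConfig is the SpoutConfig instance constructed above; keep the retry ceiling
+// well under your topology message timeout so failed tuples are not re-emitted while
+// they are still pending.
+spoutConfig.stateUpdateIntervalMs = 5000;
+spoutConfig.retryInitialDelayMs = 100;
+spoutConfig.retryDelayMultiplier = 2.0;
+spoutConfig.retryDelayMaxMs = 30 * 1000;
+```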
-Core KafkaSpout only accepts an instance of SpoutConfig.
-
-TridentKafkaConfig is another extension of KafkaConfig.
-TridentKafkaEmitter only accepts TridentKafkaConfig.
 
 The KafkaConfig class also has a bunch of public variables that control your application's behavior. Here are the defaults:
+
 ```java
-    public int fetchSizeBytes = 1024 * 1024;
-    public int socketTimeoutMs = 10000;
-    public int fetchMaxWait = 10000;
-    public int bufferSizeBytes = 1024 * 1024;
-    public MultiScheme scheme = new RawMultiScheme();
-    public boolean ignoreZkOffsets = false;
-    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    public long maxOffsetBehind = Long.MAX_VALUE;
-    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-    public int metricsTimeBucketSizeInSecs = 60;
+public int fetchSizeBytes = 1024 * 1024;
+public int socketTimeoutMs = 10000;
+public int fetchMaxWait = 10000;
+public int bufferSizeBytes = 1024 * 1024;
+public MultiScheme scheme = new RawMultiScheme();
+public boolean ignoreZkOffsets = false;
+public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+public long maxOffsetBehind = Long.MAX_VALUE;
+public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+public int metricsTimeBucketSizeInSecs = 60;
 ```
 
 Most of them are self-explanatory except MultiScheme.
@@ -102,8 +108,8 @@ MultiScheme is an interface that dictate
 also controls the naming of your output field.
 
 ```java
-  public Iterable<List<Object>> deserialize(byte[] ser);
-  public Fields getOutputFields();
+public Iterable<List<Object>> deserialize(byte[] ser);
+public Fields getOutputFields();
 ```
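+
+As an illustrative sketch (following the `byte[]`-based signature shown above; the exact interface signature and package names may differ between versions), a scheme that emits each message as a single UTF-8 string field could look like this:
+
+```java
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class StringMultiScheme implements MultiScheme {
+    @Override
+    public Iterable<List<Object>> deserialize(byte[] ser) {
+        // emit one single-field tuple containing the UTF-8 decoded payload
+        List<Object> tuple = new Values(new String(ser, StandardCharsets.UTF_8));
+        return Arrays.asList(tuple);
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields("str");
+    }
+}
+```
+
+Such a scheme would then be plugged in via the `scheme` field on the config, e.g. `spoutConfig.scheme = new StringMultiScheme();`.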
 
 The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with `byte[]` as is. The name of the
@@ -167,21 +173,21 @@ When building a project with storm-kafka
 use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your `pom.xml`:
 
 ```xml
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
+<dependency>
+    <groupId>org.apache.kafka</groupId>
+    <artifactId>kafka_2.10</artifactId>
+    <version>0.8.1.1</version>
+    <exclusions>
+        <exclusion>
+            <groupId>org.apache.zookeeper</groupId>
+            <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+        </exclusion>
+    </exclusions>
+</dependency>
 ```
 
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
@@ -197,8 +203,8 @@ You need to provide implementation of fo
 These interfaces have 2 methods defined:
 
 ```java
-    K getKeyFromTuple(Tuple/TridentTuple tuple);
-    V getMessageFromTuple(Tuple/TridentTuple tuple);
+K getKeyFromTuple(Tuple/TridentTuple tuple);
+V getMessageFromTuple(Tuple/TridentTuple tuple);
 ```
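+
+As a minimal sketch (the field names below are placeholders, and the generic key/value types are assumed to be strings), a custom bolt-side mapper could look like this:
+
+```java
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.tuple.Tuple;
+
+public class WordCountTupleMapper implements TupleToKafkaMapper<String, String> {
+    @Override
+    public String getKeyFromTuple(Tuple tuple) {
+        // use the "word" field of the tuple as the Kafka key
+        return tuple.getStringByField("word");
+    }
+
+    @Override
+    public String getMessageFromTuple(Tuple tuple) {
+        // use the "count" field of the tuple as the Kafka message
+        return tuple.getStringByField("count");
+    }
+}
+```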
 
 As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
@@ -228,60 +234,63 @@ map with key kafka.broker.properties.
 ###Putting it all together
 
 For the bolt:
+
 ```java
-        TopologyBuilder builder = new TopologyBuilder();
+TopologyBuilder builder = new TopologyBuilder();
     
-        Fields fields = new Fields("key", "message");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                    new Values("storm", "1"),
-                    new Values("trident", "1"),
-                    new Values("needs", "1"),
-                    new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-        builder.setSpout("spout", spout, 5);
-        KafkaBolt bolt = new KafkaBolt()
-                .withTopicSelector(new DefaultTopicSelector("test"))
-                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
-        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
+Fields fields = new Fields("key", "message");
+FixedBatchSpout spout = new FixedBatchSpout(fields,
+                                            4,
+                                            new Values("storm", "1"),
+                                            new Values("trident", "1"),
+                                            new Values("needs", "1"),
+                                            new Values("javadoc", "1")
+);
+spout.setCycle(true);
+builder.setSpout("spout", spout, 5);
+KafkaBolt bolt = new KafkaBolt()
+                     .withTopicSelector(new DefaultTopicSelector("test"))
+                     .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
+builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
         
-        Config conf = new Config();
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+Config conf = new Config();
+//set producer properties.
+Properties props = new Properties();
+props.put("metadata.broker.list", "localhost:9092");
+props.put("request.required.acks", "1");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
         
-        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
+StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 
 For Trident:
 
 ```java
-        Fields fields = new Fields("word", "count");
-        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
-                new Values("storm", "1"),
-                new Values("trident", "1"),
-                new Values("needs", "1"),
-                new Values("javadoc", "1")
-        );
-        spout.setCycle(true);
-
-        TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("spout1", spout);
-
-        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
-        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
-
-        Config conf = new Config();
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
-        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
+Fields fields = new Fields("word", "count");
+FixedBatchSpout spout = new FixedBatchSpout(fields,
+                                            4,
+                                            new Values("storm", "1"),
+                                            new Values("trident", "1"),
+                                            new Values("needs", "1"),
+                                            new Values("javadoc", "1")
+);
+spout.setCycle(true);
+
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("spout1", spout);
+
+TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                                            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+                                            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
+
+Config conf = new Config();
+//set producer properties.
+Properties props = new Properties();
+props.put("metadata.broker.list", "localhost:9092");
+props.put("request.required.acks", "1");
+props.put("serializer.class", "kafka.serializer.StringEncoder");
+conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
+StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
 ```

Modified: storm/site/releases/2.0.0-SNAPSHOT/storm-metrics-profiling-internal-actions.md
URL: http://svn.apache.org/viewvc/storm/site/releases/2.0.0-SNAPSHOT/storm-metrics-profiling-internal-actions.md?rev=1746370&r1=1746369&r2=1746370&view=diff
==============================================================================
--- storm/site/releases/2.0.0-SNAPSHOT/storm-metrics-profiling-internal-actions.md (original)
+++ storm/site/releases/2.0.0-SNAPSHOT/storm-metrics-profiling-internal-actions.md Wed Jun  1 03:28:26 2016
@@ -1,3 +1,9 @@
+---
+title: Storm Metrics for Profiling Various Storm Internal Actions
+layout: documentation
+documentation: true
+---
+
 # Storm Metrics for Profiling Various Storm Internal Actions
 
 With the addition of these metrics, Storm users can collect, view, and analyze the performance of various internal actions. The actions that are profiled include Thrift RPC calls and HTTP requests within the Storm daemons. For instance, in the Storm Nimbus daemon, the following Thrift calls defined in the Nimbus$Iface are profiled:
@@ -67,4 +73,4 @@ Since we shade all of the packages we us
    
 For more information about io.dropwizard.metrics and metrics-clojure packages please reference their original documentation:
 - https://dropwizard.github.io/metrics/3.1.0/
-- http://metrics-clojure.readthedocs.org/en/latest/
\ No newline at end of file
+- http://metrics-clojure.readthedocs.org/en/latest/