Posted to commits@storm.apache.org by ka...@apache.org on 2015/10/28 15:13:51 UTC

[12/14] storm git commit: STORM-697: Review feedback: Fixed missing or misplaced licenses. Added a more verbose explanation of MessageMetadataSchemeAsMultiScheme in the README

STORM-697: Review feedback: Fixed missing or misplaced licenses. Added a more verbose explanation of MessageMetadataSchemeAsMultiScheme in the README


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0c830c1
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0c830c1
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0c830c1

Branch: refs/heads/master
Commit: c0c830c1e41059a39a61b6563d77ecab5f333186
Parents: 25e7bc4
Author: matt.tieman <ma...@inin.com>
Authored: Wed Oct 28 09:10:53 2015 -0400
Committer: matt.tieman <ma...@inin.com>
Committed: Wed Oct 28 09:10:53 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/README.md                  | 44 ++++++++++++--------
 .../jvm/storm/kafka/MessageMetadataScheme.java  | 10 ++---
 .../MessageMetadataSchemeAsMultiScheme.java     | 17 ++++++++
 .../kafka/StringMessageAndMetadataScheme.java   | 17 ++++++++
 4 files changed, 65 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 1bf14b7..04fb96c 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -6,16 +6,16 @@ Provides core Storm and Trident spout implementations for consuming data from Ap
 ##Spouts
 We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHosts interface that
 tracks the Kafka broker host to partition mapping and a KafkaConfig that controls some Kafka-related parameters.
- 
+
 ###BrokerHosts
-In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. 
+In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
 Currently, we support the following two implementations:
 
 ####ZkHosts
-ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses 
+ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
 Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling
 ```java
-    public ZkHosts(String brokerZkStr, String brokerZkPath) 
+    public ZkHosts(String brokerZkStr, String brokerZkPath)
     public ZkHosts(String brokerZkStr)
 ```
 Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and
@@ -40,7 +40,7 @@ of this class, you need to first construct an instance of GlobalPartitionInforma
 ```
 
 ###KafkaConfig
-The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig. 
+The second thing needed for constructing a KafkaSpout is an instance of KafkaConfig.
 ```java
     public KafkaConfig(BrokerHosts hosts, String topic)
     public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
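    // Illustrative sketch (all values are placeholders), assuming the core Storm spout:
    // SpoutConfig extends KafkaConfig and adds a ZooKeeper root path and a spout id.
    BrokerHosts hosts = new ZkHosts("localhost:2181");
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/my-topic", "my-spout-id");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);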
@@ -103,9 +103,17 @@ also controls the naming of your output field.
   public Fields getOutputFields();
 ```
 
-The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with `byte[]` as is. The name of the
-outputField is "bytes".  There are alternative implementations like `SchemeAsMultiScheme`,
-`KeyValueSchemeAsMultiScheme`, and `MessageMetadataSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
+The default `RawMultiScheme` just takes the `byte[]` and returns a tuple with `byte[]` as is. The name of the outputField is "bytes". There are alternative implementations like `SchemeAsMultiScheme` and `KeyValueSchemeAsMultiScheme` which can convert the `byte[]` to `String`.
+
+There is also an extension of `SchemeAsMultiScheme`, `MessageMetadataSchemeAsMultiScheme`,
+which has an additional deserialize method that accepts the message `byte[]` in addition to the `Partition` and `offset` associated with the message.
+
+```java
+public Iterable<List<Object>> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset)
+```
+
+This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message in a discrete stream instead of persisting the entire message.
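For example, assuming a `SpoutConfig` named `spoutConfig` (as in the sketch further above), the metadata-aware scheme could be wired into the core spout like this; `StringMessageAndMetadataScheme` is the provided `String`-based implementation:

```java
// Emit each message as a String together with the Partition and offset it was read from.
spoutConfig.scheme = new MessageMetadataSchemeAsMultiScheme(new StringMessageAndMetadataScheme());
```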
 
 
 ### Examples
@@ -184,7 +192,7 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependen
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
 
 ##Writing to Kafka as part of your topology
-You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you 
+You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
 are using Trident, you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and
 storm.kafka.trident.TridentKafkaUpdater.
 
@@ -199,9 +207,9 @@ These interfaces have 2 methods defined:
 ```
 
 As the name suggests, these methods are called to map a tuple to Kafka key and Kafka message. If you just want one field
-as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java 
-implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you 
-use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility 
+as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java
+implementation. In the KafkaBolt, the implementation always looks for fields named "key" and "message" if you
+use the default constructor to construct FieldNameBasedTupleToKafkaMapper, for backward compatibility
 reasons. Alternatively, you can specify different key and message fields by using the non-default constructor.
 In the TridentKafkaState you must specify the field names for the key and message, as there is no default constructor.
 These should be specified while constructing an instance of FieldNameBasedTupleToKafkaMapper.
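For example, a bolt that publishes the tuple fields "word" and "count" as the Kafka key and message could use the non-default constructor; a minimal sketch (the field names are placeholders):

```java
KafkaBolt bolt = new KafkaBolt()
        .withTopicSelector(new DefaultTopicSelector("test"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
```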
@@ -213,12 +221,12 @@ public interface KafkaTopicSelector {
     String getTopics(Tuple/TridentTuple tuple);
 }
 ```
-The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published 
-You can return a null and the message will be ignored. If you have one static topic name then you can use 
+The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published.
+You can return null and the message will be ignored. If you have one static topic name then you can use
 DefaultTopicSelector.java and set the name of the topic in the constructor.
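If the topic varies per tuple, a custom selector can be supplied instead. A minimal sketch, assuming each tuple carries its destination topic in a field named "topic" (the class and field name are illustrative):

```java
import backtype.storm.tuple.Tuple;
import storm.kafka.bolt.selector.KafkaTopicSelector;

// Route each tuple to the topic named in its "topic" field; returning null drops the message.
public class FieldBasedTopicSelector implements KafkaTopicSelector {
    @Override
    public String getTopics(Tuple tuple) {
        return tuple.getStringByField("topic");
    }
}
```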
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties , see http://kafka.apache.org/documentation.html#newproducerconfigs 
+You can provide all the producer properties (see http://kafka.apache.org/documentation.html#newproducerconfigs,
 section "Important configuration properties for the producer") in your Storm topology config by setting the properties
 map with the key kafka.broker.properties.
 
@@ -227,7 +235,7 @@ map with key kafka.broker.properties.
 For the bolt :
 ```java
         TopologyBuilder builder = new TopologyBuilder();
-    
+
         Fields fields = new Fields("key", "message");
         FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                     new Values("storm", "1"),
@@ -241,7 +249,7 @@ For the bolt :
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
         builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-        
+
         Config conf = new Config();
         //set producer properties.
         Properties props = new Properties();
@@ -250,7 +258,7 @@ For the bolt :
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        
+
         StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
index da7acbf..92a5598 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataScheme.java
@@ -1,8 +1,3 @@
-package storm.kafka;
-
-import java.util.List;
-import backtype.storm.spout.Scheme;
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -20,6 +15,11 @@ import backtype.storm.spout.Scheme;
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package storm.kafka;
+
+import java.util.List;
+import backtype.storm.spout.Scheme;
+
 public interface MessageMetadataScheme extends Scheme {
     public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset);
 }
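For reference, a custom implementation only needs to add the metadata-aware method on top of the usual Scheme methods. A hypothetical sketch (the class name and output fields are illustrative, not part of this commit):

```java
package storm.kafka;

import java.util.List;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical scheme emitting the payload size plus the message's partition id and offset.
public class PayloadSizeWithMetadataScheme implements MessageMetadataScheme {
    @Override
    public List<Object> deserialize(byte[] ser) {
        // No partition/offset is available on this path; emit only the size.
        return new Values(ser.length);
    }

    @Override
    public List<Object> deserializeMessageWithMetadata(byte[] message, Partition partition, long offset) {
        return new Values(message.length, partition.partition, offset);
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("size", "partition", "offset");
    }
}
```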

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
index e89e391..0567809 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/storm/blob/c0c830c1/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
index 2dc4c02..031d497 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringMessageAndMetadataScheme.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package storm.kafka;
 
 import java.util.List;