Posted to commits@storm.apache.org by ka...@apache.org on 2016/09/02 00:32:35 UTC

[1/3] storm git commit: STORM-1459: allow not specifying producer properties in read-only Kafka table in StormSQL

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 0eabff7e6 -> c5e7b927a


STORM-1459: allow not specifying producer properties in read-only Kafka table in StormSQL
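
With this change, the producer config in TBLPROPERTIES is parsed lazily, only
when a table is actually written to, so a read-only Kafka table can omit it.
A minimal DDL sketch, mirroring the updated docs example below (broker address,
topic names, and serializer classes are the placeholders used in the docs):
the first statement defines a read-only table with no producer config, while
the second defines a sink table that still carries one.

```
CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders'

CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
```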


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/386fbba2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/386fbba2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/386fbba2

Branch: refs/heads/1.x-branch
Commit: 386fbba233d456a608234cec25a1137609021bd0
Parents: 0eabff7
Author: manuzhang <ow...@gmail.com>
Authored: Thu Sep 1 06:54:11 2016 +0800
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 2 09:31:48 2016 +0900

----------------------------------------------------------------------
 docs/storm-sql.md                               | 13 +++----
 .../sql/kafka/KafkaDataSourcesProvider.java     | 36 +++++++++++---------
 2 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/386fbba2/docs/storm-sql.md
----------------------------------------------------------------------
diff --git a/docs/storm-sql.md b/docs/storm-sql.md
index 17f0711..104b852 100644
--- a/docs/storm-sql.md
+++ b/docs/storm-sql.md
@@ -44,7 +44,7 @@ CREATE EXTERNAL TABLE table_name field_list
     [ AS select_stmt ]
 ```
 
-You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). For example, the following statement specifies a Kafka spouts and sink:
+You can find detailed explanations of the properties in [Hive Data Definition Language](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL). For example, the following statement specifies a Kafka spout and sink:
 
 ```
 CREATE EXTERNAL TABLE FOO (ID INT PRIMARY KEY) LOCATION 'kafka://localhost:2181/brokers?topic=test' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
@@ -104,21 +104,22 @@ Let's say there is a Kafka stream that represents the transactions of orders. Ea
 The user can specify the following SQL statements in the SQL file:
 
 ```
-CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
+CREATE EXTERNAL TABLE ORDERS (ID INT PRIMARY KEY, UNIT_PRICE INT, QUANTITY INT) LOCATION 'kafka://localhost:2181/brokers?topic=orders'
 CREATE EXTERNAL TABLE LARGE_ORDERS (ID INT PRIMARY KEY, TOTAL INT) LOCATION 'kafka://localhost:2181/brokers?topic=large_orders' TBLPROPERTIES '{"producer":{"bootstrap.servers":"localhost:9092","acks":"1","key.serializer":"org.apache.org.apache.storm.kafka.IntSerializer","value.serializer":"org.apache.org.apache.storm.kafka.ByteBufferSerializer"}}'
 INSERT INTO LARGE_ORDERS SELECT ID, UNIT_PRICE * QUANTITY AS TOTAL FROM ORDERS WHERE UNIT_PRICE * QUANTITY > 50
 ```
 
-The first statement defines the table `ORDER` which represents the input stream. The `LOCATION` clause specifies the ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`) and the topic (`orders`). The `TBLPROPERTIES` clause specifies the configuration of [KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs).
-Current implementation of `storm-sql-kafka` requires specifying both `LOCATION` and `TBLPROPERTIES` clauses even though the table is read-only or write-only.
+The first statement defines the table `ORDERS`, which represents the input stream. The `LOCATION` clause specifies the ZkHost (`localhost:2181`), the path of the brokers in ZooKeeper (`/brokers`), and the topic (`orders`).
 
-Similarly, the second statement specifies the table `LARGE_ORDERS` which represents the output stream. The third statement is a `SELECT` statement which defines the topology: it instructs StormSQL to filter all orders in the external table `ORDERS`, calculates the total price and inserts matching records into the Kafka stream specified by `LARGE_ORDER`.
+Similarly, the second statement specifies the table `LARGE_ORDERS`, which represents the output stream. The `TBLPROPERTIES` clause specifies the configuration of [KafkaProducer](http://kafka.apache.org/documentation.html#producerconfigs) and is required for a Kafka sink table.
+
+The third statement is a `SELECT` statement that defines the topology: it instructs StormSQL to filter all orders in the external table `ORDERS`, calculate the total price, and insert matching records into the Kafka stream specified by `LARGE_ORDERS`.
 
 To run this example, users need to include the data sources (`storm-sql-kafka` in this case) and their dependencies in the
 class path. Dependencies for Storm SQL are automatically handled when users run `storm sql`. Users can include data sources at the submission step as shown below:
 
 ```
-$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2\!org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
+$ bin/storm sql order_filtering.sql order_filtering --artifacts "org.apache.storm:storm-sql-kafka:2.0.0-SNAPSHOT,org.apache.storm:storm-kafka:2.0.0-SNAPSHOT,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12,org.apache.kafka:kafka-clients:0.8.2.2"
 ```
 
 The above command submits the SQL statements to StormSQL. Users need to modify each artifact's version if they are using a different version of Storm or Kafka.

http://git-wip-us.apache.org/repos/asf/storm/blob/386fbba2/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
index 0236948..54e160f 100644
--- a/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
+++ b/external/sql/storm-sql-kafka/src/jvm/org/apache/storm/sql/kafka/KafkaDataSourcesProvider.java
@@ -120,9 +120,9 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     private final String topic;
     private final int primaryKeyIndex;
     private final List<String> fields;
-    private final Properties producerProperties;
+    private final String producerProperties;
     private KafkaTridentDataSource(TridentKafkaConfig conf, String topic, int primaryKeyIndex,
-                                   Properties producerProperties, List<String> fields) {
+                                   String producerProperties, List<String> fields) {
       this.conf = conf;
       this.topic = topic;
       this.primaryKeyIndex = primaryKeyIndex;
@@ -137,7 +137,22 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
 
     @Override
     public Function getConsumer() {
-      return new KafkaTridentSink(topic, primaryKeyIndex, producerProperties, fields);
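+      // Parse the producer config lazily, here at sink-construction time rather
+      // than at table creation, so that read-only tables can omit it entirely.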
+      Preconditions.checkNotNull(producerProperties,
+          "Writable Kafka Table " + topic + " must contain producer config");
+      Properties props = new Properties();
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        @SuppressWarnings("unchecked")
+        HashMap<String, Object> map = mapper.readValue(producerProperties, HashMap.class);
+        @SuppressWarnings("unchecked")
+        HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
+        props.putAll(producerConfig);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Preconditions.checkState(props.containsKey("bootstrap.servers"),
+          "Writable Kafka Table " + topic + " must contain \"bootstrap.servers\" config");
+      return new KafkaTridentSink(topic, primaryKeyIndex, props, fields);
     }
   }
 
@@ -172,19 +187,8 @@ public class KafkaDataSourcesProvider implements DataSourcesProvider {
     }
     Preconditions.checkState(primaryIndex != -1, "Kafka stream table must have a primary key");
     conf.scheme = new SchemeAsMultiScheme(new JsonScheme(fieldNames));
-    ObjectMapper mapper = new ObjectMapper();
-    Properties producerProp = new Properties();
-    try {
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> map = mapper.readValue(properties, HashMap.class);
-      @SuppressWarnings("unchecked")
-      HashMap<String, Object> producerConfig = (HashMap<String, Object>) map.get("producer");
-      Preconditions.checkNotNull(producerConfig, "Kafka Table must contain producer config");
-      producerProp.putAll(producerConfig);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return new KafkaTridentDataSource(conf, topic, primaryIndex, producerProp, fieldNames);
+
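+    // Pass the raw TBLPROPERTIES JSON through unparsed; it is validated and
+    // parsed only when the table is used as a sink (see getConsumer() above).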
+    return new KafkaTridentDataSource(conf, topic, primaryIndex, properties, fieldNames);
   }
 
   private static Map<String, String> parseURIParams(String query) {


[3/3] storm git commit: add STORM-1459 to CHANGELOG

Posted by ka...@apache.org.
add STORM-1459 to CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c5e7b927
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c5e7b927
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c5e7b927

Branch: refs/heads/1.x-branch
Commit: c5e7b927aec2f8958e62468b2ea9b5830b649bda
Parents: c0029ed
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 2 09:32:22 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 2 09:32:22 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c5e7b927/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 296b603..4549c6a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-1459: Allow not specifying producer properties in read-only Kafka table in StormSQL
  * STORM-2052: Kafka Spout New Client API - Log Improvements and Parameter Tuning for Better Performance.
  * STORM-2050: [storm-sql] Support User Defined Aggregate Function for Trident mode
  * STORM-1434: Support the GROUP BY clause in StormSQL


[2/3] storm git commit: Merge branch 'STORM-1459-1.x' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-1459-1.x' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0029ed6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0029ed6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0029ed6

Branch: refs/heads/1.x-branch
Commit: c0029ed684897f93aa1845f47398c17cc76c01f7
Parents: 0eabff7 386fbba
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Sep 2 09:31:58 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Sep 2 09:31:58 2016 +0900

----------------------------------------------------------------------
 docs/storm-sql.md                               | 13 +++----
 .../sql/kafka/KafkaDataSourcesProvider.java     | 36 +++++++++++---------
 2 files changed, 27 insertions(+), 22 deletions(-)
----------------------------------------------------------------------