Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:21:56 UTC

[01/21] storm git commit: STORM-616: Jdbc connector for storm.

Repository: storm
Updated Branches:
  refs/heads/master d7334849b -> 64d7ac6b2


STORM-616: Jdbc connector for storm.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5b160168
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5b160168
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5b160168

Branch: refs/heads/master
Commit: 5b160168c75c0e8c4c402a5e24f606dab697fbef
Parents: 65e9f0c
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Jan 5 22:14:18 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Jan 5 22:22:46 2015 -0500

----------------------------------------------------------------------
 external/storm-jdbc/LICENSE                     | 202 ++++++++++++++++++
 external/storm-jdbc/README.md                   | 117 ++++++++++
 external/storm-jdbc/pom.xml                     | 120 +++++++++++
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  57 +++++
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    |  85 ++++++++
 .../org/apache/storm/jdbc/common/Column.java    |  81 +++++++
 .../apache/storm/jdbc/common/JDBCClient.java    | 211 +++++++++++++++++++
 .../java/org/apache/storm/jdbc/common/Util.java |  74 +++++++
 .../apache/storm/jdbc/mapper/JdbcMapper.java    |  33 +++
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  87 ++++++++
 .../storm/jdbc/trident/state/JdbcState.java     | 101 +++++++++
 .../jdbc/trident/state/JdbcStateFactory.java    |  40 ++++
 .../storm/jdbc/trident/state/JdbcUpdater.java   |  32 +++
 .../storm/jdbc/common/JdbcClientTest.java       |  86 ++++++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |  90 ++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  78 +++++++
 .../UserPersistanceTridentTopology.java         |  76 +++++++
 external/storm-jdbc/src/test/sql/test.sql       |   1 +
 18 files changed, 1571 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/LICENSE
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/LICENSE b/external/storm-jdbc/LICENSE
new file mode 100644
index 0000000..e06d208
--- /dev/null
+++ b/external/storm-jdbc/LICENSE
@@ -0,0 +1,202 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "{}"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright {yyyy} {name of copyright owner}
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
new file mode 100644
index 0000000..36db3ef
--- /dev/null
+++ b/external/storm-jdbc/README.md
@@ -0,0 +1,117 @@
+# Storm JDBC
+
+Storm/Trident integration for JDBC.
+
+## Usage
+The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper`
+interface:
+
+```java
+public interface JdbcMapper  extends Serializable {
+    List<Column> getColumns(ITuple tuple);
+}
+```
+
+The `getColumns()` method defines how a Storm tuple maps to a list of columns representing a row in a database table.
+
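+For example, a hand-written mapper for a hypothetical `user_details` table with `id` and `user_name` columns
+might look like the following sketch (the column names and SQL types here are illustrative, not part of the API):
+
+```java
+public class UserDetailsMapper implements JdbcMapper {
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        List<Column> columns = new ArrayList<Column>();
+        // Each Column carries the column name, the value, and its java.sql.Types constant.
+        columns.add(new Column<Integer>("id", tuple.getIntegerByField("id"), Types.INTEGER));
+        columns.add(new Column<String>("user_name", tuple.getStringByField("user_name"), Types.VARCHAR));
+        return columns;
+    }
+}
+```
+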
+### SimpleJdbcMapper
+`storm-jdbc` includes a general-purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map a Storm
+tuple to a database row. `SimpleJdbcMapper` assumes that the tuple has fields with the same names as the columns of
+the database table that you intend to write to.
+
+To use `SimpleJdbcMapper`, you simply tell it the name of the table that you want to write to and provide a HikariCP configuration map.
+
+The following code creates a `SimpleJdbcMapper` instance that:
+
+1. Transforms a Storm tuple to a list of columns mapping to a row in the table `test.user_details`.
+2. Uses the provided HikariCP configuration to establish a connection pool to the specified database and
+automatically figures out the column names of the table that you intend to write to.
+
+```java
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+String tableName = "user_details";
+JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
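+
+Note that `SimpleJdbcMapper` opens a database connection in its constructor to look up the table's column schema,
+so the database must be reachable wherever the mapper is instantiated.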
+### JdbcBolt
+To use the `JdbcBolt`, construct it with the name of the table to write to and a `JdbcMapper` implementation. In addition,
+you must specify the topology configuration key that holds the HikariCP configuration map.
+
+```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+
+JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
+        .withConfigKey("jdbc.conf");
+```
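+
+Wiring the bolt into a topology might then look like this (a minimal sketch; the spout and component names are
+illustrative, and `UserSpout` is the test spout shipped in this module's test sources):
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("userSpout", new UserSpout());
+// The bolt looks up the HikariCP configuration map in the topology config under "jdbc.conf".
+builder.setBolt("jdbcBolt", bolt, 1).shuffleGrouping("userSpout");
+StormSubmitter.submitTopology("user-persistence", config, builder.createTopology());
+```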
+### JdbcTridentState
+We also support a Trident persistent state that can be used with Trident topologies. To create a JDBC persistent Trident
+state, you need to initialize it with the table name, the `JdbcMapper` instance, and the configuration key that holds the
+HikariCP configuration map. See the example below:
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConfigKey("jdbc.conf")
+        .withMapper(jdbcMapper)
+        .withTableName("user");
+
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+```
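+
+The factory is then typically passed to Trident's `partitionPersist` along with the `JdbcUpdater` state updater
+included in this module. A minimal sketch, assuming a stream whose fields match the table's columns:
+
+```java
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("userSpout", new UserSpout());
+// Persist each batch of tuples to the configured table.
+stream.partitionPersist(jdbcStateFactory,
+        new Fields("id", "user_name", "create_date"),
+        new JdbcUpdater(), new Fields());
+```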
+ 
+## Example: Persistent User details
+A runnable example can be found in the `src/test/java/org/apache/storm/jdbc/topology` directory.
+
+### Setup
+* Ensure you have included the JDBC driver dependency for your chosen database as part of your build configuration.
+* Start the database and log in to it.
+* Create the user table using the following statements:
+
+```
+> use test;
+> create table user (id integer, user_name varchar(100), create_date date);
+```
+
+### Execution
+Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using the `storm jar` command. The class expects 5 args:
+
+`storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name]`
+
+MySQL example:
+```
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar \
+org.apache.storm.jdbc.topology.UserPersistanceTridentTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource \
+jdbc:mysql://localhost/test root password user UserPersistenceTopology
+```
+
+You can execute a select query against the user table, which should show the newly inserted rows:
+
+```
+select * from user;
+```
+
+For a Trident example, see `org.apache.storm.jdbc.topology.UserPersistanceTridentTopology`.
+
+## License
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+## Committer
+
+* Parth Brahmbhatt ([brahmbhatt.parth@gmail.com](mailto:brahmbhatt.parth@gmail.com))
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml
new file mode 100644
index 0000000..9130908
--- /dev/null
+++ b/external/storm-jdbc/pom.xml
@@ -0,0 +1,120 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.10.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-jdbc</artifactId>
+
+    <developers>
+        <developer>
+            <id>Parth-Brahmbhatt</id>
+            <name>Parth Brahmbhatt</name>
+            <email>brahmbhatt.parth@gmail.com</email>
+        </developer>
+    </developers>
+
+    <properties>
+        <hikari.version>2.2.5</hikari.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP-java6</artifactId>
+            <version>${hikari.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hsqldb</groupId>
+            <artifactId>hsqldb</artifactId>
+            <version>2.3.1</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>sql-maven-plugin</artifactId>
+                <version>1.5</version>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.hsqldb</groupId>
+                        <artifactId>hsqldb</artifactId>
+                        <version>2.3.2</version>
+                    </dependency>
+                </dependencies>
+                <executions>
+                    <execution>
+                        <id>create-db</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>execute</goal>
+                        </goals>
+                        <configuration>
+                            <driver>org.hsqldb.jdbcDriver</driver>
+                            <url>jdbc:hsqldb:mem:test;shutdown=false</url>
+                            <username>SA</username>
+                            <password></password>
+                            <autocommit>true</autocommit>
+                            <srcFiles>
+                                <srcFile>src/test/sql/test.sql</srcFile>
+                            </srcFiles>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
new file mode 100644
index 0000000..8dacc2d
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public abstract class AbstractJdbcBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcBolt.class);
+
+    protected OutputCollector collector;
+
+    protected transient JDBCClient jdbcClient;
+    protected String tableName;
+    protected JdbcMapper mapper;
+    protected String configKey;
+
+    public AbstractJdbcBolt(String tableName, JdbcMapper mapper) {
+        Validate.notEmpty(tableName, "Table name can not be blank or null");
+        Validate.notNull(mapper, "mapper can not be null");
+        this.tableName = tableName;
+        this.mapper = mapper;
+    }
+
+    @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
+        this.collector = collector;
+
+        Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
+        Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
+
+        this.jdbcClient = new JDBCClient(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
new file mode 100644
index 0000000..e5df1ae
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
+
+    public JdbcBolt(String tableName, JdbcMapper mapper) {
+        super(tableName, mapper);
+    }
+
+    public JdbcBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = mapper.getColumns(tuple);
+            List<List<Column>> columnLists = new ArrayList<List<Column>>();
+            columnLists.add(columns);
+            this.jdbcClient.insert(this.tableName, columnLists);
+        } catch (Exception e) {
+            LOG.warn("Failing tuple.", e);
+            this.collector.fail(tuple);
+            return;
+        }
+
+        this.collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        // This bolt is a sink; it emits no tuples, so there are no output fields to declare.
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
new file mode 100644
index 0000000..0346bf7
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+
+public class Column<T> {
+
+    private String columnName;
+    private T val;
+    private int sqlType;
+
+    public Column(String columnName, T val, int sqlType) {
+        this.columnName = columnName;
+        this.val = val;
+        this.sqlType = sqlType;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public T getVal() {
+        return val;
+    }
+
+    public int getSqlType() {
+        return sqlType;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Column)) return false;
+
+        Column column = (Column) o;
+
+        if (sqlType != column.sqlType) return false;
+        if (!columnName.equals(column.columnName)) return false;
+        if (!val.equals(column.val)) return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = columnName.hashCode();
+        result = 31 * result + val.hashCode();
+        result = 31 * result + sqlType;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Column{" +
+                "columnName='" + columnName + '\'' +
+                ", val=" + val +
+                ", sqlType=" + sqlType +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
new file mode 100644
index 0000000..5b63d2d
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JDBCClient {
+    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+
+    private HikariDataSource dataSource;
+
+    public JDBCClient(Map<String, Object> map) {
+        Properties properties = new Properties();
+        properties.putAll(map);
+        HikariConfig config = new HikariConfig(properties);
+        this.dataSource = new HikariDataSource(config);
+    }
+
+    public int insert(String tableName, List<List<Column>> columnLists) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            StringBuilder sb = new StringBuilder();
+            sb.append("Insert into ").append(tableName).append(" (");
+            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+                @Override
+                public String apply(Column input) {
+                    return input.getColumnName();
+                }
+            });
+            String columns = Joiner.on(",").join(columnNames);
+            sb.append(columns).append(") values ( ");
+
+            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+            sb.append(placeHolders).append(")");
+
+            String query = sb.toString();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Executing query " + query);
+            }
+
+            PreparedStatement preparedStatement = connection.prepareStatement(query);
+            // Bind and batch each row; binding alone would overwrite the previous row's parameters.
+            for(List<Column> columnList : columnLists) {
+                setPreparedStatementParams(preparedStatement, columnList);
+                preparedStatement.addBatch();
+            }
+
+            int updateCount = 0;
+            for(int count : preparedStatement.executeBatch()) {
+                updateCount += count;
+            }
+            return updateCount;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to insert in table " + tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+            setPreparedStatementParams(preparedStatement, queryParams);
+            ResultSet resultSet = preparedStatement.executeQuery();
+            List<List<Column>> rows = Lists.newArrayList();
+            while(resultSet.next()){
+                ResultSetMetaData metaData = resultSet.getMetaData();
+                int columnCount = metaData.getColumnCount();
+                List<Column> row = Lists.newArrayList();
+                for(int i=1 ; i <= columnCount; i++) {
+                    String columnLabel = metaData.getColumnLabel(i);
+                    int columnType = metaData.getColumnType(i);
+                    Class columnJavaType = Util.getJavaType(columnType);
+                    if (columnJavaType == String.class) {
+                        row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
+                    } else if (columnJavaType == Integer.class) {
+                        row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
+                    } else if (columnJavaType == Double.class) {
+                        row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
+                    } else if (columnJavaType == Float.class) {
+                        row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
+                    } else if (columnJavaType == Short.class) {
+                        row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
+                    } else if (columnJavaType == Boolean.class) {
+                        row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
+                    } else if (columnJavaType == byte[].class) {
+                        row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
+                    } else if (columnJavaType == Long.class) {
+                        row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
+                    } else if (columnJavaType == Date.class) {
+                        row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
+                    } else if (columnJavaType == Time.class) {
+                        row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
+                    } else if (columnJavaType == Timestamp.class) {
+                        row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
+                    } else {
+                        throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
+                    }
+                }
+                rows.add(row);
+            }
+            return rows;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute select query " + sqlQuery, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public Map<String, Integer> getColumnSchema(String tableName) {
+        Connection connection = null;
+        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+        try {
+            connection = this.dataSource.getConnection();
+            DatabaseMetaData metaData = connection.getMetaData();
+            ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
+            while (resultSet.next()) {
+                columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"));
+            }
+            return columnSchemaMap;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to get schema for table " + tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public void executeSql(String sql) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            Statement statement = connection.createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute SQL", e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    private void setPreparedStatementParams(PreparedStatement preparedStatement, List<Column> columnList) throws SQLException {
+        int index = 1;
+        for (Column column : columnList) {
+            Class columnJavaType = Util.getJavaType(column.getSqlType());
+            if (column.getVal() == null) {
+                preparedStatement.setNull(index, column.getSqlType());
+            } else if (columnJavaType == String.class) {
+                preparedStatement.setString(index, (String) column.getVal());
+            } else if (columnJavaType == Integer.class) {
+                preparedStatement.setInt(index, (Integer) column.getVal());
+            } else if (columnJavaType == Double.class) {
+                preparedStatement.setDouble(index, (Double) column.getVal());
+            } else if (columnJavaType == Float.class) {
+                preparedStatement.setFloat(index, (Float) column.getVal());
+            } else if (columnJavaType == Short.class) {
+                preparedStatement.setShort(index, (Short) column.getVal());
+            } else if (columnJavaType == Boolean.class) {
+                preparedStatement.setBoolean(index, (Boolean) column.getVal());
+            } else if (columnJavaType == byte[].class) {
+                preparedStatement.setBytes(index, (byte[]) column.getVal());
+            } else if (columnJavaType == Long.class) {
+                preparedStatement.setLong(index, (Long) column.getVal());
+            } else if (columnJavaType == Date.class) {
+                preparedStatement.setDate(index, (Date) column.getVal());
+            } else if (columnJavaType == Time.class) {
+                preparedStatement.setTime(index, (Time) column.getVal());
+            } else if (columnJavaType == Timestamp.class) {
+                preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
+            } else {
+                throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());
+            }
+            ++index;
+        }
+    }
+
+    private void closeConnection(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                throw new RuntimeException("Failed to close connection", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
new file mode 100644
index 0000000..cc723c3
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import java.lang.reflect.Field;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.sql.Types;
+
+public class Util {
+    public static String getSqlTypeName(int sqlType) {
+        try {
+            for (Field field : Types.class.getFields()) {
+                if (sqlType == field.getInt(null)) {
+                    return field.getName();
+                }
+            }
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException("Could not get sqlTypeName ", e);
+        }
+        throw new RuntimeException("Unknown sqlType " + sqlType);
+    }
+
+    public static Class getJavaType(int sqlType) {
+        switch (sqlType) {
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGVARCHAR:
+                return String.class;
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+                return byte[].class;
+            case Types.BIT:
+                return Boolean.class;
+            case Types.TINYINT:
+            case Types.SMALLINT:
+                return Short.class;
+            case Types.INTEGER:
+                return Integer.class;
+            case Types.BIGINT:
+                return Long.class;
+            case Types.REAL:
+                return Float.class;
+            case Types.DOUBLE:
+            case Types.FLOAT:
+                return Double.class;
+            case Types.DATE:
+                return Date.class;
+            case Types.TIME:
+                return Time.class;
+            case Types.TIMESTAMP:
+                return Timestamp.class;
+            default:
+                throw new RuntimeException("We do not support tables with SqlType: " + getSqlTypeName(sqlType));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
new file mode 100644
index 0000000..c8c80bc
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcMapper.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.jdbc.common.Column;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface JdbcMapper extends Serializable {
+    /**
+     * Maps a tuple to the columns of a single database row.
+     *
+     * @param tuple the input tuple
+     * @return list of columns that represents one row in a DB table.
+     */
+    List<Column> getColumns(ITuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
new file mode 100644
index 0000000..7011a72
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.tuple.ITuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.common.Util;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class SimpleJdbcMapper implements JdbcMapper {
+
+    private Map<String, Integer> columnNameToType;
+
+    public SimpleJdbcMapper(String tableName, Map map) {
+        JDBCClient client = new JDBCClient(map);
+        this.columnNameToType = client.getColumnSchema(tableName);
+    }
+
+    @Override
+    public List<Column> getColumns(ITuple tuple) {
+        List<Column> columns = new ArrayList<Column>();
+        for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) {
+            String columnName = entry.getKey();
+            Integer columnSqlType = entry.getValue();
+            // Resolve the Java type for this column once instead of once per branch.
+            Class columnJavaType = Util.getJavaType(columnSqlType);
+
+            if(columnJavaType.equals(String.class)) {
+                String value = tuple.getStringByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Short.class)) {
+                Short value = tuple.getShortByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Integer.class)) {
+                Integer value = tuple.getIntegerByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Long.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Double.class)) {
+                Double value = tuple.getDoubleByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Float.class)) {
+                Float value = tuple.getFloatByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Boolean.class)) {
+                Boolean value = tuple.getBooleanByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(byte[].class)) {
+                byte[] value = tuple.getBinaryByField(columnName);
+                columns.add(new Column(columnName, value, columnSqlType));
+            } else if(columnJavaType.equals(Date.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Date(value), columnSqlType));
+            } else if(columnJavaType.equals(Time.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Time(value), columnSqlType));
+            } else if(columnJavaType.equals(Timestamp.class)) {
+                Long value = tuple.getLongByField(columnName);
+                columns.add(new Column(columnName, new Timestamp(value), columnSqlType));
+            } else {
+                throw new RuntimeException("Unsupported java type in tuple " + columnJavaType);
+            }
+        }
+        return columns;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
new file mode 100644
index 0000000..fec2ee4
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.topology.FailedException;
+import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.State;
+import storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcState implements State {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcState.class);
+
+    private Options options;
+    private JDBCClient jdbcClient;
+    private Map map;
+
+    protected JdbcState(Map map, int partitionIndex, int numPartitions, Options options) {
+        this.options = options;
+        this.map = map;
+    }
+
+    public static class Options implements Serializable {
+        private JdbcMapper mapper;
+        private String configKey;
+        private String tableName;
+
+        public Options withConfigKey(String configKey) {
+            this.configKey = configKey;
+            return this;
+        }
+
+        public Options withTableName(String tableName) {
+            this.tableName = tableName;
+            return this;
+        }
+
+        public Options withMapper(JdbcMapper mapper) {
+            this.mapper = mapper;
+            return this;
+        }
+    }
+
+    protected void prepare() {
+        Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
+        Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
+
+        this.jdbcClient = new JDBCClient(conf);
+    }
+
+    @Override
+    public void beginCommit(Long aLong) {
+        LOG.debug("beginCommit is noop.");
+    }
+
+    @Override
+    public void commit(Long aLong) {
+        LOG.debug("commit is noop.");
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        List<List<Column>> columnsLists = new ArrayList<List<Column>>();
+
+        for (TridentTuple tuple : tuples) {
+            columnsLists.add(options.mapper.getColumns(tuple));
+        }
+
+        try {
+            jdbcClient.insert(options.tableName, columnsLists);
+        } catch (Exception e) {
+            LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
new file mode 100644
index 0000000..a1bbdef
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcStateFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.task.IMetricsContext;
+import storm.trident.state.State;
+import storm.trident.state.StateFactory;
+
+import java.util.Map;
+
+public class JdbcStateFactory implements StateFactory {
+
+    private JdbcState.Options options;
+
+    public JdbcStateFactory(JdbcState.Options options) {
+        this.options = options;
+    }
+
+    @Override
+    public State makeState(Map map, IMetricsContext iMetricsContext, int partitionIndex, int numPartitions) {
+        JdbcState state = new JdbcState(map, partitionIndex, numPartitions, options);
+        state.prepare();
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
new file mode 100644
index 0000000..b76e230
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseStateUpdater;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class JdbcUpdater extends BaseStateUpdater<JdbcState>  {
+
+    @Override
+    public void updateState(JdbcState jdbcState, List<TridentTuple> tuples, TridentCollector collector) {
+        jdbcState.updateState(tuples, collector);
+    }
+}
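
Taken together, the three Trident classes above are wired into a topology via partitionPersist; a minimal sketch, assuming a Trident `stream` and a `jdbcMapper` are already in scope (the same wiring appears in UserPersistanceTridentTopology later in this patch):

```java
JdbcState.Options options = new JdbcState.Options()
        .withConfigKey("jdbc.conf")   // Hikari config map stored in the topology Config under this key
        .withMapper(jdbcMapper)
        .withTableName("user_details");

JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);

// Trident hands each batch to JdbcUpdater, which delegates to
// JdbcState.updateState(); a FailedException there triggers a batch replay.
stream.partitionPersist(jdbcStateFactory, new Fields("id", "user_name", "create_date"),
        new JdbcUpdater(), new Fields());
```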

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
new file mode 100644
index 0000000..432d9f8
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.Date;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class JdbcClientTest {
+
+    private JDBCClient client;
+
+    private static final String tableName = "user_details";
+    @Before
+    public void setup() {
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName","org.hsqldb.jdbc.JDBCDataSource");//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
+        map.put("dataSource.user","SA");//root
+        map.put("dataSource.password","");//password
+
+        this.client = new JDBCClient(map);
+        client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
+    }
+
+    @Test
+    public void testInsertAndSelect() {
+        int id = 1;
+        String name = "bob";
+        Date createDate = new Date(System.currentTimeMillis());
+
+        List<Column> columns = Lists.newArrayList(
+                new Column("id",id, Types.INTEGER),
+                new Column("user_name",name, Types.VARCHAR),
+                new Column("create_date", createDate , Types.DATE)
+                );
+
+        List<List<Column>> columnList = new ArrayList<List<Column>>();
+        columnList.add(columns);
+        client.insert(tableName, columnList);
+
+        List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER)));
+        for(List<Column> row : rows) {
+            for(Column column : row) {
+                if(column.getColumnName().equalsIgnoreCase("id")) {
+                    Assert.assertEquals(id, column.getVal());
+                } else if(column.getColumnName().equalsIgnoreCase("user_name")) {
+                    Assert.assertEquals(name, column.getVal());
+                } else if(column.getColumnName().equalsIgnoreCase("create_date")) {
+                    Assert.assertEquals(createDate.toString(), column.getVal().toString());
+                } else {
+                    throw new AssertionError("Unknown column " + column);
+                }
+            }
+        }
+    }
+
+    @After
+    public void cleanup() {
+        client.executeSql("drop table " + tableName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
new file mode 100644
index 0000000..39fde59
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.spout;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
+
+import java.util.*;
+
+public class UserSpout implements IRichSpout {
+    boolean isDistributed;
+    SpoutOutputCollector collector;
+    public static final List<Values> rows = Lists.newArrayList(
+            new Values(1,"peter",System.currentTimeMillis()),
+            new Values(2,"bob",System.currentTimeMillis()),
+            new Values(3,"alice",System.currentTimeMillis()));
+
+    public UserSpout() {
+        this(true);
+    }
+
+    public UserSpout(boolean isDistributed) {
+        this.isDistributed = isDistributed;
+    }
+
+    public boolean isDistributed() {
+        return this.isDistributed;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        this.collector = collector;
+    }
+
+    public void close() {
+
+    }
+
+    public void nextTuple() {
+        final Random rand = new Random();
+        // Bound must be rows.size(); rows.size() - 1 would never select the last row.
+        final Values row = rows.get(rand.nextInt(rows.size()));
+        this.collector.emit(row);
+        Thread.yield();
+    }
+
+    public void ack(Object msgId) {
+
+    }
+
+    public void fail(Object msgId) {
+
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("id","user_name","create_date"));
+    }
+
+    @Override
+    public void activate() {
+    }
+
+    @Override
+    public void deactivate() {
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
new file mode 100644
index 0000000..21e4639
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.topology.TopologyBuilder;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.bolt.JdbcBolt;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+
+import java.util.Map;
+
+
+public class UserPersistanceTopology {
+    private static final String USER_SPOUT = "USER_SPOUT";
+    private static final String USER_BOLT = "USER_BOLT";
+
+    public static void main(String[] args) throws Exception {
+        if(args.length < 5) {
+            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+            // Exit early; the code below indexes args[0..4] and would otherwise throw.
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user",args[2]);//root
+        map.put("dataSource.password",args[3]);//password
+        String tableName = args[4];//database table name
+        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+
+        Config config = new Config();
+
+        config.put("jdbc.conf", map);
+
+        UserSpout spout = new UserSpout();
+        JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper)
+                .withConfigKey("jdbc.conf");
+
+        // userSpout ==> jdbcBolt
+        TopologyBuilder builder = new TopologyBuilder();
+
+        builder.setSpout(USER_SPOUT, spout, 1);
+        builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT);
+
+        if (args.length == 5) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.createTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 6) {
+            StormSubmitter.submitTopology(args[5], config, builder.createTopology());
+        } else {
+            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
new file mode 100644
index 0000000..3b2ee66
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcState;
+import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
+import org.apache.storm.jdbc.trident.state.JdbcUpdater;
+import storm.trident.Stream;
+import storm.trident.TridentTopology;
+
+import java.util.Map;
+
+public class UserPersistanceTridentTopology {
+
+    public static void main(String[] args) throws Exception {
+        if (args.length < 5) {
+            System.out.println("Usage: UserPersistanceTridentTopology <dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]); // e.g. com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]); // e.g. jdbc:mysql://localhost/test
+        map.put("dataSource.user",args[2]); // e.g. root
+        map.put("dataSource.password",args[3]); // e.g. password
+        String tableName = args[4]; // database table name
+        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+
+        Config config = new Config();
+
+        config.put("jdbc.conf", map);
+
+        TridentTopology topology = new TridentTopology();
+        Stream stream = topology.newStream("userSpout", new UserSpout());
+
+        JdbcState.Options options = new JdbcState.Options()
+                .withConfigKey("jdbc.conf")
+                .withMapper(jdbcMapper)
+                .withTableName("user");
+
+        JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+        stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"),  new JdbcUpdater(), new Fields());
+        if (args.length == 5) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, topology.build());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else if (args.length == 6) {
+            StormSubmitter.submitTopology(args[5], config, topology.build());
+        } else {
+            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
+                    "<user> <password> <tableName> [topology name]");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/5b160168/external/storm-jdbc/src/test/sql/test.sql
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/sql/test.sql b/external/storm-jdbc/src/test/sql/test.sql
new file mode 100644
index 0000000..a402a68
--- /dev/null
+++ b/external/storm-jdbc/src/test/sql/test.sql
@@ -0,0 +1 @@
+create table user_details (id integer, user_name varchar(100), create_date date);
\ No newline at end of file


[18/21] storm git commit: STORM-616: Making all the required params part of constructor args, changing executeUpdate to executeBatch, and adding a test case.

Posted by sr...@apache.org.
STORM-616: Making all the required params part of constructor args, changing executeUpdate to executeBatch, and adding a test case.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04fccb1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04fccb1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04fccb1b

Branch: refs/heads/master
Commit: 04fccb1b152bdd3454adcfda8c4d71502ad2c6db
Parents: 017360b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 10 15:17:39 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Feb 18 12:21:21 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 13 ++---
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 10 +---
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  | 12 +----
 .../apache/storm/jdbc/common/JdbcClient.java    | 25 +++++++--
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |  2 +-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  4 +-
 .../storm/jdbc/common/JdbcClientTest.java       | 55 ++++++++++----------
 .../jdbc/topology/AbstractUserTopology.java     |  3 ++
 .../jdbc/topology/UserPersistanceTopology.java  |  8 +--
 9 files changed, 64 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index cfe449d..6fb1d41 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -63,9 +63,7 @@ to be <= topology.message.timeout.secs.
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf")
-                                    .withTableName("user_details")
-                                    .withJdbcMapper(simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf","user_details",simpleJdbcMapper)
                                     .withQueryTimeoutSecs(30);
  ```
 ### JdbcTridentState
@@ -135,9 +133,9 @@ You can optionally specify a query timeout seconds param that specifies max seco
 The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
-        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
-        .withSelectSql("select user_name from user_details where user_id = ?")
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
         .withQueryTimeoutSecs(30);
 ```
 
@@ -208,8 +206,7 @@ mvn clean compile assembly:single.
 
 Mysql Example:
 ```
-storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
-org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
 You can execute a select query against the user table which should show newly inserted rows:
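
For instance, using the JdbcClient from this patch (an illustrative sketch, not the README's exact text; `client` is assumed to be constructed as in JdbcClientTest, and `user` is the table created by the test topologies):

```java
List<List<Column>> insertedRows = client.select("select * from user", Lists.<Column>newArrayList());
```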

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 9abd553..f7be7ad 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -37,18 +37,10 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
     private String tableName;
     private JdbcMapper jdbcMapper;
 
-    public JdbcInsertBolt(String configKey) {
+    public JdbcInsertBolt(String configKey, String tableName, JdbcMapper jdbcMapper) {
         super(configKey);
-    }
-
-    public JdbcInsertBolt withTableName(String tableName) {
         this.tableName = tableName;
-        return this;
-    }
-
-    public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
         this.jdbcMapper = jdbcMapper;
-        return this;
     }
 
     public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 8232c2f..e1b1553 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,18 +37,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt(String configKey) {
+    public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
         super(configKey);
-    }
-
-    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
-        this.jdbcLookupMapper = jdbcLookupMapper;
-        return this;
-    }
-
-    public JdbcLookupBolt withSelectSql(String selectQuery) {
         this.selectQuery = selectQuery;
-        return this;
+        this.jdbcLookupMapper = jdbcLookupMapper;
     }
 
     public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 4992ed7..4ad108c 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -37,18 +37,23 @@ public class JdbcClient {
     private HikariDataSource dataSource;
     private int queryTimeoutSecs;
 
-    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+    public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
         Properties properties = new Properties();
-        properties.putAll(map);
+        properties.putAll(hikariConfigMap);
         HikariConfig config = new HikariConfig(properties);
         this.dataSource = new HikariDataSource(config);
         this.queryTimeoutSecs = queryTimeoutSecs;
     }
 
-    public int insert(String tableName, List<List<Column>> columnLists) {
+    public void insert(String tableName, List<List<Column>> columnLists) {
         Connection connection = null;
         try {
             connection = this.dataSource.getConnection();
+            boolean autoCommit = connection.getAutoCommit();
+            if(autoCommit) {
+                connection.setAutoCommit(false);
+            }
+
             StringBuilder sb = new StringBuilder();
             sb.append("Insert into ").append(tableName).append(" (");
             Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
@@ -67,14 +72,24 @@ public class JdbcClient {
 
             LOG.debug("Executing query {}", query);
 
-
             PreparedStatement preparedStatement = connection.prepareStatement(query);
             preparedStatement.setQueryTimeout(queryTimeoutSecs);
             for(List<Column> columnList : columnLists) {
                 setPreparedStatementParams(preparedStatement, columnList);
+                preparedStatement.addBatch();
             }
 
-            return preparedStatement.executeUpdate();
+            int[] results = preparedStatement.executeBatch();
+            // Arrays.asList(int[]) yields a single-element List<int[]>, so scan the primitive array directly.
+            if(org.apache.commons.lang.ArrayUtils.contains(results, Statement.EXECUTE_FAILED)) {
+                connection.rollback();
+                throw new RuntimeException("Failed at least one SQL statement in the batch; operation rolled back.");
+            } else {
+                try {
+                    connection.commit();
+                } catch (SQLException e) {
+                    throw new RuntimeException("Failed to commit inserts in table " + tableName, e);
+                }
+            }
         } catch (SQLException e) {
             throw new RuntimeException("Failed to insert in table " + tableName, e);
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
index 77852f4..f8c79a3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -10,7 +10,7 @@ import java.util.List;
 public interface JdbcLookupMapper extends JdbcMapper {
 
     /**
-     * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single
+     * Converts a DB row to a list of storm values that can be emitted. This is done to allow a single
      * storm input tuple and a single DB row to result in multiple output values.
      * @param input the input tuple.
      * @param columns list of columns that represents a row

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index ad7f1c0..841d5d6 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -33,9 +33,9 @@ public class SimpleJdbcMapper implements JdbcMapper {
 
     private List<Column> schemaColumns;
 
-    public SimpleJdbcMapper(String tableName, Map map) {
+    public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) {
         int queryTimeoutSecs = 30;
-        JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
+        JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 6423e8f..787b887 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -24,7 +24,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Date;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
@@ -45,39 +46,39 @@ public class JdbcClientTest {
 
         int queryTimeoutSecs = 60;
         this.client = new JdbcClient(map, queryTimeoutSecs);
-        client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
+        client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
     }
 
     @Test
     public void testInsertAndSelect() {
-        int id = 1;
-        String name = "bob";
-        Date createDate = new Date(System.currentTimeMillis());
+        int id1 = 1;
+        String name1 = "bob";
+        Timestamp createDate1 = new Timestamp(System.currentTimeMillis());
 
-        List<Column> columns = Lists.newArrayList(
-                new Column("id",id, Types.INTEGER),
-                new Column("user_name",name, Types.VARCHAR),
-                new Column("create_date", createDate , Types.DATE)
-                );
+        List<Column> row1 = Lists.newArrayList(
+                new Column("ID",id1, Types.INTEGER),
+                new Column("USER_NAME",name1, Types.VARCHAR),
+                new Column("CREATED_TIMESTAMP", createDate1 , Types.TIMESTAMP));
 
-        List<List<Column>> columnList = new ArrayList<List<Column>>();
-        columnList.add(columns);
-        client.insert(tableName, columnList);
+        int id2 = 2;
+        String name2 = "alice";
+        Timestamp createDate2 = new Timestamp(System.currentTimeMillis());
+        List<Column> row2 = Lists.newArrayList(
+                new Column("ID",id2, Types.INTEGER),
+                new Column("USER_NAME",name2, Types.VARCHAR),
+                new Column("CREATED_TIMESTAMP", createDate2 , Types.TIMESTAMP));
 
-        List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER)));
-        for(List<Column> row : rows) {
-            for(Column column : row) {
-                if(column.getColumnName().equalsIgnoreCase("id")) {
-                    Assert.assertEquals(id, column.getVal());
-                } else if(column.getColumnName().equalsIgnoreCase("user_name")) {
-                    Assert.assertEquals(name, column.getVal());
-                } else if(column.getColumnName().equalsIgnoreCase("create_date")) {
-                    Assert.assertEquals(createDate.toString(), column.getVal().toString());
-                } else {
-                    throw new AssertionError("Unknown column" + column);
-                }
-            }
-        }
+        List<List<Column>> rows = Lists.newArrayList(row1, row2);
+        client.insert(tableName, rows);
+
+        List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id1, Types.INTEGER)));
+        List<List<Column>> expectedRows = Lists.newArrayList();
+        expectedRows.add(row1);
+
+        Assert.assertEquals(expectedRows, selectedRows);
+
+        selectedRows = client.select("select * from user_details order by id", Lists.<Column>newArrayList());
+        Assert.assertEquals(rows, selectedRows);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 9cb0bfa..e94aca2 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -38,6 +38,9 @@ import java.util.Map;
 
 public abstract class AbstractUserTopology {
     private static final List<String> setupSqls = Lists.newArrayList(
+            "drop table if exists user",
+            "drop table if exists department",
+            "drop table if exists user_department",
             "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
             "create table if not exists department (dept_id integer, dept_name varchar(100))",
             "create table if not exists user_department (user_id integer, dept_id integer)",

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 32c012e..0b96f4d 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,12 +34,8 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
-                .withJdbcLookupMapper(this.jdbcLookupMapper)
-                .withSelectSql(SELECT_QUERY);
-        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF)
-                .withTableName(TABLE_NAME)
-                .withJdbcMapper(this.jdbcMapper);
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, TABLE_NAME, this.jdbcMapper);
 
         // userSpout ==> jdbcBolt
         TopologyBuilder builder = new TopologyBuilder();


[14/21] storm git commit: STORM-616: Renamed the file to match the casing of the class name.

Posted by sr...@apache.org.
STORM-616: Renamed the file to match the casing of the class name.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/93bb2337
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/93bb2337
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/93bb2337

Branch: refs/heads/master
Commit: 93bb2337cee4ab7c010a7ec46246dc2df219743e
Parents: d00d18e
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 21 14:20:19 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 21 14:20:19 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/jdbc/common/JDBCClient.java    | 213 -------------------
 .../apache/storm/jdbc/common/JdbcClient.java    | 213 +++++++++++++++++++
 2 files changed, 213 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/93bb2337/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
deleted file mode 100644
index d11d1b3..0000000
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.common;
-
-import com.google.common.base.Function;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.*;
-import java.sql.Date;
-import java.util.*;
-
-public class JdbcClient {
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
-
-    private HikariDataSource dataSource;
-    private int queryTimeoutSecs;
-
-    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
-        Properties properties = new Properties();
-        properties.putAll(map);
-        HikariConfig config = new HikariConfig(properties);
-        this.dataSource = new HikariDataSource(config);
-        this.queryTimeoutSecs = queryTimeoutSecs;
-    }
-
-    public int insert(String tableName, List<List<Column>> columnLists) {
-        Connection connection = null;
-        try {
-            connection = this.dataSource.getConnection();
-            StringBuilder sb = new StringBuilder();
-            sb.append("Insert into ").append(tableName).append(" (");
-            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
-                @Override
-                public String apply(Column input) {
-                    return input.getColumnName();
-                }
-            });
-            String columns = Joiner.on(",").join(columnNames);
-            sb.append(columns).append(") values ( ");
-
-            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
-            sb.append(placeHolders).append(")");
-
-            String query = sb.toString();
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Executing query " + query);
-            }
-
-            PreparedStatement preparedStatement = connection.prepareStatement(query);
-            preparedStatement.setQueryTimeout(queryTimeoutSecs);
-            for(List<Column> columnList : columnLists) {
-                setPreparedStatementParams(preparedStatement, columnList);
-            }
-
-            return preparedStatement.executeUpdate();
-        } catch (SQLException e) {
-            throw new RuntimeException("Failed to insert in table " + tableName, e);
-        } finally {
-            closeConnection(connection);
-        }
-    }
-
-    public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
-        Connection connection = null;
-        try {
-            connection = this.dataSource.getConnection();
-            PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
-            preparedStatement.setQueryTimeout(queryTimeoutSecs);
-            setPreparedStatementParams(preparedStatement, queryParams);
-            ResultSet resultSet = preparedStatement.executeQuery();
-            List<List<Column>> rows = Lists.newArrayList();
-            while(resultSet.next()){
-                ResultSetMetaData metaData = resultSet.getMetaData();
-                int columnCount = metaData.getColumnCount();
-                List<Column> row = Lists.newArrayList();
-                for(int i=1 ; i <= columnCount; i++) {
-                    String columnLabel = metaData.getColumnLabel(i);
-                    int columnType = metaData.getColumnType(i);
-                    Class columnJavaType = Util.getJavaType(columnType);
-                    if (columnJavaType.equals(String.class)) {
-                        row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Integer.class)) {
-                        row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Double.class)) {
-                        row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Float.class)) {
-                        row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Short.class)) {
-                        row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Boolean.class)) {
-                        row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
-                    } else if (columnJavaType.equals(byte[].class)) {
-                        row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Long.class)) {
-                        row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Date.class)) {
-                        row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Time.class)) {
-                        row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
-                    } else if (columnJavaType.equals(Timestamp.class)) {
-                        row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
-                    } else {
-                        throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
-                    }
-                }
-                rows.add(row);
-            }
-            return rows;
-        } catch (SQLException e) {
-            throw new RuntimeException("Failed to execute select query " + sqlQuery, e);
-        } finally {
-            closeConnection(connection);
-        }
-    }
-
-    public List<Column> getColumnSchema(String tableName) {
-        Connection connection = null;
-        List<Column> columns = new ArrayList<Column>();
-        try {
-            connection = this.dataSource.getConnection();
-            DatabaseMetaData metaData = connection.getMetaData();
-            ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
-            while (resultSet.next()) {
-                columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")));
-            }
-            return columns;
-        } catch (SQLException e) {
-            throw new RuntimeException("Failed to get schema for table " + tableName, e);
-        } finally {
-            closeConnection(connection);
-        }
-    }
-
-    public void executeSql(String sql) {
-        Connection connection = null;
-        try {
-            connection = this.dataSource.getConnection();
-            Statement statement = connection.createStatement();
-            statement.execute(sql);
-        } catch (SQLException e) {
-            throw new RuntimeException("Failed to execute SQL", e);
-        } finally {
-            closeConnection(connection);
-        }
-    }
-
-    private void setPreparedStatementParams(PreparedStatement preparedStatement, List<Column> columnList) throws SQLException {
-        int index = 1;
-        for (Column column : columnList) {
-            Class columnJavaType = Util.getJavaType(column.getSqlType());
-            if (column.getVal() == null) {
-                preparedStatement.setNull(index, column.getSqlType());
-            } else if (columnJavaType.equals(String.class)) {
-                preparedStatement.setString(index, (String) column.getVal());
-            } else if (columnJavaType.equals(Integer.class)) {
-                preparedStatement.setInt(index, (Integer) column.getVal());
-            } else if (columnJavaType.equals(Double.class)) {
-                preparedStatement.setDouble(index, (Double) column.getVal());
-            } else if (columnJavaType.equals(Float.class)) {
-                preparedStatement.setFloat(index, (Float) column.getVal());
-            } else if (columnJavaType.equals(Short.class)) {
-                preparedStatement.setShort(index, (Short) column.getVal());
-            } else if (columnJavaType.equals(Boolean.class)) {
-                preparedStatement.setBoolean(index, (Boolean) column.getVal());
-            } else if (columnJavaType.equals(byte[].class)) {
-                preparedStatement.setBytes(index, (byte[]) column.getVal());
-            } else if (columnJavaType.equals(Long.class)) {
-                preparedStatement.setLong(index, (Long) column.getVal());
-            } else if (columnJavaType.equals(Date.class)) {
-                preparedStatement.setDate(index, (Date) column.getVal());
-            } else if (columnJavaType.equals(Time.class)) {
-                preparedStatement.setTime(index, (Time) column.getVal());
-            } else if (columnJavaType.equals(Timestamp.class)) {
-                preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
-            } else {
-                throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());
-            }
-            ++index;
-        }
-    }
-
-    private void closeConnection(Connection connection) {
-        if (connection != null) {
-            try {
-                connection.close();
-            } catch (SQLException e) {
-                throw new RuntimeException("Failed to close connection", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/93bb2337/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
new file mode 100644
index 0000000..d11d1b3
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.common;
+
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Date;
+import java.util.*;
+
+public class JdbcClient {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
+
+    private HikariDataSource dataSource;
+    private int queryTimeoutSecs;
+
+    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+        Properties properties = new Properties();
+        properties.putAll(map);
+        HikariConfig config = new HikariConfig(properties);
+        this.dataSource = new HikariDataSource(config);
+        this.queryTimeoutSecs = queryTimeoutSecs;
+    }
+
+    public int insert(String tableName, List<List<Column>> columnLists) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            StringBuilder sb = new StringBuilder();
+            sb.append("Insert into ").append(tableName).append(" (");
+            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+                @Override
+                public String apply(Column input) {
+                    return input.getColumnName();
+                }
+            });
+            String columns = Joiner.on(",").join(columnNames);
+            sb.append(columns).append(") values ( ");
+
+            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+            sb.append(placeHolders).append(")");
+
+            String query = sb.toString();
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Executing query " + query);
+            }
+
+            PreparedStatement preparedStatement = connection.prepareStatement(query);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            for(List<Column> columnList : columnLists) {
+                setPreparedStatementParams(preparedStatement, columnList);
+            }
+
+            return preparedStatement.executeUpdate();
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to insert in table " + tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            setPreparedStatementParams(preparedStatement, queryParams);
+            ResultSet resultSet = preparedStatement.executeQuery();
+            List<List<Column>> rows = Lists.newArrayList();
+            while(resultSet.next()){
+                ResultSetMetaData metaData = resultSet.getMetaData();
+                int columnCount = metaData.getColumnCount();
+                List<Column> row = Lists.newArrayList();
+                for(int i=1 ; i <= columnCount; i++) {
+                    String columnLabel = metaData.getColumnLabel(i);
+                    int columnType = metaData.getColumnType(i);
+                    Class columnJavaType = Util.getJavaType(columnType);
+                    if (columnJavaType.equals(String.class)) {
+                        row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Integer.class)) {
+                        row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Double.class)) {
+                        row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Float.class)) {
+                        row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Short.class)) {
+                        row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Boolean.class)) {
+                        row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
+                    } else if (columnJavaType.equals(byte[].class)) {
+                        row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Long.class)) {
+                        row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Date.class)) {
+                        row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Time.class)) {
+                        row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
+                    } else if (columnJavaType.equals(Timestamp.class)) {
+                        row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
+                    } else {
+                        throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
+                    }
+                }
+                rows.add(row);
+            }
+            return rows;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute select query " + sqlQuery, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public List<Column> getColumnSchema(String tableName) {
+        Connection connection = null;
+        List<Column> columns = new ArrayList<Column>();
+        try {
+            connection = this.dataSource.getConnection();
+            DatabaseMetaData metaData = connection.getMetaData();
+            ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
+            while (resultSet.next()) {
+                columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")));
+            }
+            return columns;
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to get schema for table " + tableName, e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    public void executeSql(String sql) {
+        Connection connection = null;
+        try {
+            connection = this.dataSource.getConnection();
+            Statement statement = connection.createStatement();
+            statement.execute(sql);
+        } catch (SQLException e) {
+            throw new RuntimeException("Failed to execute SQL", e);
+        } finally {
+            closeConnection(connection);
+        }
+    }
+
+    private void setPreparedStatementParams(PreparedStatement preparedStatement, List<Column> columnList) throws SQLException {
+        int index = 1;
+        for (Column column : columnList) {
+            Class columnJavaType = Util.getJavaType(column.getSqlType());
+            if (column.getVal() == null) {
+                preparedStatement.setNull(index, column.getSqlType());
+            } else if (columnJavaType.equals(String.class)) {
+                preparedStatement.setString(index, (String) column.getVal());
+            } else if (columnJavaType.equals(Integer.class)) {
+                preparedStatement.setInt(index, (Integer) column.getVal());
+            } else if (columnJavaType.equals(Double.class)) {
+                preparedStatement.setDouble(index, (Double) column.getVal());
+            } else if (columnJavaType.equals(Float.class)) {
+                preparedStatement.setFloat(index, (Float) column.getVal());
+            } else if (columnJavaType.equals(Short.class)) {
+                preparedStatement.setShort(index, (Short) column.getVal());
+            } else if (columnJavaType.equals(Boolean.class)) {
+                preparedStatement.setBoolean(index, (Boolean) column.getVal());
+            } else if (columnJavaType.equals(byte[].class)) {
+                preparedStatement.setBytes(index, (byte[]) column.getVal());
+            } else if (columnJavaType.equals(Long.class)) {
+                preparedStatement.setLong(index, (Long) column.getVal());
+            } else if (columnJavaType.equals(Date.class)) {
+                preparedStatement.setDate(index, (Date) column.getVal());
+            } else if (columnJavaType.equals(Time.class)) {
+                preparedStatement.setTime(index, (Time) column.getVal());
+            } else if (columnJavaType.equals(Timestamp.class)) {
+                preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
+            } else {
+                throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());
+            }
+            ++index;
+        }
+    }
+
+    private void closeConnection(Connection connection) {
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                throw new RuntimeException("Failed to close connection", e);
+            }
+        }
+    }
+}
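
For readers skimming the patch, a minimal usage sketch of the client above may help. It is not part of the commit; the Hikari settings mirror the README's MySQL example and the table and column names are illustrative.

```java
// A sketch only: assumes a local MySQL instance and a user_details table.
Map<String, Object> hikariConfigMap = Maps.newHashMap();
hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user", "root");
hikariConfigMap.put("dataSource.password", "password");

JdbcClient client = new JdbcClient(hikariConfigMap, 30); // 30 sec query timeout

// insert() takes a list of rows, each row being a list of Columns.
List<Column> row = Lists.newArrayList(
        new Column("user_id", 1, java.sql.Types.INTEGER),
        new Column("user_name", "peter", java.sql.Types.VARCHAR));
client.insert("user_details", Lists.<List<Column>>newArrayList(row));

// select() binds each query param to the matching '?' place holder and
// returns one List<Column> per result row.
List<List<Column>> rows = client.select(
        "select user_name from user_details where user_id = ?",
        Lists.<Column>newArrayList(new Column("user_id", 1, java.sql.Types.INTEGER)));
```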


[03/21] storm git commit: STORM-616 : removing unintended changes.

Posted by sr...@apache.org.
STORM-616 : removing unintended changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d260759a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d260759a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d260759a

Branch: refs/heads/master
Commit: d260759ac203383e27668a7cb7090926029f7406
Parents: ab9f778
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Jan 5 22:31:05 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Jan 5 22:31:05 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 +++----
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 +----
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +---------
 3 files changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 3165189..918da74 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,11 +180,10 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
+                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]";
-                LOG.warn(msg);
-                throw new UpdateOffsetException(msg);
+                        "configured start offset time: [" + config.startOffsetTime + "]");
+                throw new UpdateOffsetException();
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 5c366ec..1be7312 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,9 +17,6 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends FailedFetchException {
+public class UpdateOffsetException extends RuntimeException {
 
-    public UpdateOffsetException(String message) {
-        super(message);
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d260759a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 34566c5..94bf134 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,7 +33,6 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
-import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -130,14 +129,7 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        } catch (UpdateOffsetException e) {
-            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
-            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
-            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
-        }
+        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);


[17/21] storm git commit: Merge remote-tracking branch 'upstream/master' into storm-616

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into storm-616


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/017360bd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/017360bd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/017360bd

Branch: refs/heads/master
Commit: 017360bdeeccd907d86b36e686f1379d2907ce97
Parents: e157edb 8491151
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 3 15:22:35 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Feb 3 15:22:35 2015 -0800

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 CHANGELOG.md                                    |  21 +-
 README.markdown                                 |   6 +
 SECURITY.md                                     |  14 +-
 STORM-UI-REST-API.md                            |  21 +
 bin/storm                                       |  36 +-
 bin/storm-config.cmd                            |  32 +-
 bin/storm.cmd                                   |  15 +-
 conf/defaults.yaml                              |   3 +-
 docs/README.md                                  |   9 +
 docs/documentation/Contributing-to-Storm.md     |   2 +-
 docs/documentation/Home.md                      |   2 +-
 docs/documentation/Powered-By.md                |  15 +-
 docs/downloads.html                             |   3 +
 .../hbase/trident/state/HBaseMapState.java      |   2 +-
 .../storm/hbase/trident/state/HBaseState.java   |   2 +-
 external/storm-kafka/README.md                  |  24 +-
 external/storm-kafka/pom.xml                    |  21 +-
 .../ExponentialBackoffMsgRetryManager.java      | 167 ++++
 .../jvm/storm/kafka/FailedMsgRetryManager.java  |  26 +
 .../src/jvm/storm/kafka/PartitionManager.java   |  42 +-
 .../src/jvm/storm/kafka/SpoutConfig.java        |   8 +
 .../ExponentialBackoffMsgRetryManagerTest.java  | 194 ++++
 pom.xml                                         |  19 +-
 storm-buildtools/storm-maven-plugins/pom.xml    |  81 ++
 .../storm/maven/plugin/util/CommandExec.java    |  89 ++
 .../plugin/versioninfo/VersionInfoMojo.java     | 304 ++++++
 storm-core/pom.xml                              |  41 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |  38 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  37 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   5 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   6 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  49 +-
 storm-core/src/clj/backtype/storm/util.clj      |  11 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   6 +
 .../storm/generated/GetInfoOptions.java         | 350 +++++++
 .../jvm/backtype/storm/generated/Nimbus.java    | 974 +++++++++++++++++++
 .../storm/generated/NumErrorsChoice.java        |  64 ++
 .../storm/grouping/PartialKeyGrouping.java      |  57 ++
 .../backtype/storm/messaging/netty/Server.java  |   2 +
 .../src/jvm/backtype/storm/utils/Monitor.java   |   8 +-
 .../jvm/backtype/storm/utils/VersionInfo.java   | 131 +++
 storm-core/src/py/storm/DistributedRPC-remote   |   0
 .../py/storm/DistributedRPCInvocations-remote   |   0
 storm-core/src/py/storm/Nimbus-remote           |   7 +
 storm-core/src/py/storm/Nimbus.py               | 226 +++++
 storm-core/src/py/storm/ttypes.py               |  80 ++
 .../storm-core-version-info.properties          |  24 +
 storm-core/src/storm.thrift                     |  11 +
 .../templates/topology-page-template.html       |   8 +-
 .../clj/backtype/storm/integration_test.clj     |  10 +-
 .../scheduler/multitenant_scheduler_test.clj    |   2 +
 .../test/clj/backtype/storm/supervisor_test.clj |  34 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |  46 +
 storm-dist/binary/src/main/assembly/binary.xml  |   7 -
 56 files changed, 3249 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/017360bd/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/017360bd/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------


[13/21] storm git commit: Storm-616: defaulting query time out to topology message timeout seconds.

Posted by sr...@apache.org.
Storm-616: defaulting query time out to topology message timeout seconds.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d00d18e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d00d18e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d00d18e2

Branch: refs/heads/master
Commit: d00d18e24ee8287803a9508e46f4028222edff84
Parents: 2d6c5ed
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 15:16:11 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 15:16:11 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                                 | 6 +++---
 .../java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java     | 7 ++++++-
 .../java/org/apache/storm/jdbc/trident/state/JdbcState.java   | 7 ++++++-
 3 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 948ba23..1139450 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -57,8 +57,8 @@ List<Column> columnSchema = Lists.newArrayList(
 To use the `JdbcBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the 
 hikari configuration map. In addition you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and 
 the table name in which the rows will be inserted. You can optionally specify a query timeout seconds param that specifies 
-max seconds an insert query can take. The default is set to 30 seconds which is equal to topology.message.timeout.secs. 
-You should set this value to be <= topology.message.timeout.secs.
+max seconds an insert query can take. The default is set to the value of topology.message.timeout.secs. You should set this value 
+to be <= topology.message.timeout.secs.
 
  ```java
 Config config = new Config();
@@ -133,7 +133,7 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that holds the 
 hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
 You can optionally specify a query timeout seconds param that specifies max seconds the select query can take. 
-The default is set to 30 seconds which is equal to topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
+The default is set to the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
 JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")

http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 4b93d4d..436ad00 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.jdbc.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
@@ -34,7 +35,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
     protected transient JdbcClient jdbcClient;
     protected String configKey;
-    protected int queryTimeoutSecs = 30;
+    protected Integer queryTimeoutSecs;
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
@@ -43,6 +44,10 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
         Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
 
+        if(queryTimeoutSecs == null) {
+            queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+        }
+
         this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 48fde4e..01da5cd 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.jdbc.trident.state;
 
+import backtype.storm.Config;
 import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
@@ -55,7 +56,7 @@ public class JdbcState implements State {
         private String configKey;
         private String tableName;
         private String selectQuery;
-        private int queryTimeoutSecs = 30;
+        private Integer queryTimeoutSecs;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -92,6 +93,10 @@ public class JdbcState implements State {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
+        if(options.queryTimeoutSecs == null) {
+            options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+        }
+
         this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
     }
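
In effect, after this commit a topology that sets no explicit query timeout gets the message timeout instead. A minimal sketch, reusing the `jdbc.conf` key, `hikariConfigMap`, and `simpleJdbcMapper` names from the README examples:

```java
Config config = new Config();
config.put("jdbc.conf", hikariConfigMap); // hikari settings, as in the README
config.setMessageTimeoutSecs(60);         // topology.message.timeout.secs

// No explicit query timeout is configured here, so with this change
// prepare() falls back to the 60 second message timeout set above when
// constructing the JdbcClient.
JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
        .withTableName("user_details")
        .withJdbcMapper(simpleJdbcMapper);
```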
 


[15/21] storm git commit: STORM-616: added Harsha as a committer sponsor.

Posted by sr...@apache.org.
STORM-616: added Harsha as a committer sponsor.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bb927c86
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bb927c86
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bb927c86

Branch: refs/heads/master
Commit: bb927c865b92126caaa874550f478501086ab08d
Parents: 93bb233
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 21 14:40:56 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 21 14:40:56 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bb927c86/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 1139450..190fb98 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -211,7 +211,6 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 
-## Committer
-
+## Committer Sponsors
 * Parth Brahmbhatt ([brahmbhatt.parth@gmail.com](mailto:brahmbhatt.parth@gmail.com))
- 
\ No newline at end of file
+* Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) 
\ No newline at end of file


[09/21] storm git commit: Storm-616: removing duplicate license and fixing typos.

Posted by sr...@apache.org.
Storm-616: removing duplicate license and fixing typos.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/396bc619
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/396bc619
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/396bc619

Branch: refs/heads/master
Commit: 396bc61992e1c41407b9ed92c02260d467a5ea8e
Parents: ee37870
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 11:21:26 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 11:21:26 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                      |  2 +-
 .../java/org/apache/storm/jdbc/bolt/JdbcBolt.java  | 17 -----------------
 2 files changed, 1 insertion(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/396bc619/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index bb43687..79a0d61 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -4,7 +4,7 @@ to either insert storm tuples in a database table or to execute select queries a
 in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
 
 ## Inserting into a database.
-The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
+The bolt and trident state included in this package for inserting data into database tables are tied to a single table.
 The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
 
 ```java

http://git-wip-us.apache.org/repos/asf/storm/blob/396bc619/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index d4ddfcb..4c63a09 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -16,23 +16,6 @@
  * limitations under the License.
  */
 package org.apache.storm.jdbc.bolt;
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import org.apache.storm.jdbc.common.Column;


[04/21] storm git commit: STORM-616:Adding storm-jdbc as external module in pom. Adding links to hikariCP configuration in README.

Posted by sr...@apache.org.
STORM-616:Adding storm-jdbc as external module in pom. Adding links to hikariCP configuration in README.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/079deda4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/079deda4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/079deda4

Branch: refs/heads/master
Commit: 079deda496d16bd896611da706402c9df7e2319f
Parents: d260759
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jan 6 12:47:40 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Jan 6 12:47:40 2015 -0500

----------------------------------------------------------------------
 external/storm-jdbc/README.md | 3 ++-
 external/storm-jdbc/pom.xml   | 5 +++++
 pom.xml                       | 1 +
 3 files changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/079deda4/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 36db3ef..a0273f2 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -25,7 +25,8 @@ The following code creates a `SimpleJdbcMapper` instance that:
 
 1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
 2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
-automatically figure out the column names of the table that you intend to write to.
+automatically figure out the column names of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to lear more about hikari configuration properties.
 
 ```java
 Map hikariConfigMap = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/storm/blob/079deda4/external/storm-jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/pom.xml b/external/storm-jdbc/pom.xml
index 9130908..894dd54 100644
--- a/external/storm-jdbc/pom.xml
+++ b/external/storm-jdbc/pom.xml
@@ -52,6 +52,11 @@
             <version>3.3</version>
         </dependency>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>17.0</version>
+        </dependency>
+        <dependency>
             <groupId>com.zaxxer</groupId>
             <artifactId>HikariCP-java6</artifactId>
             <version>${hikari.version}</version>

http://git-wip-us.apache.org/repos/asf/storm/blob/079deda4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5645b9a..8604dab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,7 @@
         <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hbase</module>
+        <module>external/storm-jdbc</module>
     </modules>
 
     <scm>


[08/21] storm git commit: STORM-616: adding jdbcLookupBolt.

Posted by sr...@apache.org.
STORM-616: adding jdbcLookupBolt.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ee37870f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ee37870f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ee37870f

Branch: refs/heads/master
Commit: ee37870f4f8ba02ce9081d93164418aba0835aa3
Parents: 8bfa602
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 08:29:53 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 08:29:53 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  | 80 ++++++++++++++++++++
 1 file changed, 80 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ee37870f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
new file mode 100644
index 0000000..2e4e4e6
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Basic bolt for querying from any database.
+ */
+public class JdbcLookupBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
+
+    private String selectQuery;
+
+    private JdbcLookupMapper jdbcLookupMapper;
+
+    public JdbcLookupBolt(String configKey) {
+        super(configKey);
+    }
+
+    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+        this.jdbcLookupMapper = jdbcLookupMapper;
+        return this;
+    }
+
+    public JdbcLookupBolt withSelectSql(String selectQuery) {
+        this.selectQuery = selectQuery;
+        return this;
+    }
+
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
+            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
+
+            if (result != null && result.size() != 0) {
+                for (List<Column> row : result) {
+                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
+                    for (Values value : values) {
+                        // Anchor emitted tuples to the input so downstream
+                        // failures are replayed.
+                        collector.emit(tuple, value);
+                    }
+                }
+            }
+            this.collector.ack(tuple);
+        } catch (Exception e) {
+            LOG.error("Failed to execute a select query {} on tuple {}", this.selectQuery, tuple, e);
+            this.collector.fail(tuple);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+        jdbcLookupMapper.declareOutputFields(outputFieldsDeclarer);
+    }
+}
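
A hedged sketch of how this bolt might be wired into a topology (component names are illustrative; `UserSpout` is the test spout from this patch series and `SimpleJdbcLookupMapper` is described in the README):

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("user-spout", new UserSpout());

// Enrich each user_id emitted by the spout with its user_name from the DB.
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(
                new Fields("user_id", "user_name"),
                Lists.<Column>newArrayList(new Column("user_id", java.sql.Types.INTEGER))))
        .withSelectSql("select user_name from user_details where user_id = ?");

builder.setBolt("user-name-lookup", userNameLookupBolt).shuffleGrouping("user-spout");
```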


[07/21] storm git commit: STORM-616: adding jdbc Lookup bolt.

Posted by sr...@apache.org.
STORM-616: adding jdbc Lookup bolt.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bfa6028
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bfa6028
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bfa6028

Branch: refs/heads/master
Commit: 8bfa602876f84985864f136ee578bd2d9edb9ba7
Parents: cd96dd0
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 7 17:57:35 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 7 18:11:49 2015 -0500

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 148 +++++++++++++++----
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  10 +-
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    |  18 ++-
 .../org/apache/storm/jdbc/common/Column.java    |   8 +-
 .../apache/storm/jdbc/common/JDBCClient.java    |  54 ++++---
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |  26 ++++
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  46 ++++++
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  14 +-
 .../storm/jdbc/trident/state/JdbcQuery.java     |  40 +++++
 .../storm/jdbc/trident/state/JdbcState.java     |  33 +++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |   2 +-
 .../jdbc/topology/AbstractUserTopology.java     | 102 +++++++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  64 +++-----
 .../UserPersistanceTridentTopology.java         |  61 +++-----
 14 files changed, 466 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index a0273f2..bb43687 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,11 @@
-#Storm HBase
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
 
-Storm/Trident integration for JDBC.
-
-## Usage
-The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper`
-interface:
+## Inserting into a database.
+The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
 
 ```java
 public interface JdbcMapper  extends Serializable {
@@ -16,7 +17,7 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
 
 ### SimpleJdbcMapper
 `storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
-tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in 
+tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
 the database table that you intend to write to.
 
 To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
@@ -25,8 +26,8 @@ The following code creates a `SimpleJdbcMapper` instance that:
 
 1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
 2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
-automatically figure out the column names of the table that you intend to write to. 
-Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to lear more about hikari configuration properties.
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
 
 ```java
 Map hikariConfigMap = Maps.newHashMap();
@@ -35,49 +36,138 @@ hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
 hikariConfigMap.put("dataSource.user","root");
 hikariConfigMap.put("dataSource.password","password");
 String tableName = "user_details";
-JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
+```
+The mapper initialized in the example above assumes a storm tuple has a value for all the columns. 
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values 
+and you want to insert values only for columns with no default values, you can enforce that behavior by initializing the 
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table 
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+in which the create_time column has a default value, then to ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
+
 ### JdbcBolt
-To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition
-you must specify a configuration key that hold the hikari configuration map.
+To use the `JdbcBolt`, construct it with a configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and the table name in which 
+the rows will be inserted.
 
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-
-JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
-        .withConfigKey("jdbc.conf");
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+                                    .withTableName("user_details")
+                                    .withJdbcMapper(simpleJdbcMapper);
  ```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and hikari configuration. See the example
-below:
+state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the
+hikari configuration map. See the example below:
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withMapper(jdbcMapper)
-        .withTableName("user");
+        .withTableName("user_details");
 
 JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
 ```
- 
-## Example: Persistent User details
-A runnable example can be found in the `src/test/java/topology` directory.
 
-### Setup
-* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
-* Start the database and login to the database.
-* Create table user using the following query:
+## Lookup from Database
+We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
+executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
 
+```java
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+    List<Column> getColumns(ITuple tuple);
+    public List<Values> toTuple(ITuple input, List<Column> columns);
 ```
-> use test;
-> create table user (id integer, user_name varchar(100), create_date date);
+
+The `declareOutputFields` method is used to indicate what fields will be emitted as part of the output tuple produced by 
+processing a storm tuple. 
+The `getColumns` method specifies the place holder columns in a select query along with their SQL types and the values to use.
+For example, for the user_details table mentioned above, if you were executing the query `select user_name from user_details where
+user_id = ? and create_time > ?`, the `getColumns` method would take a storm input tuple and return a List containing two items.
+The first instance of `Column` type's `getVal()` method will be used as the value of `user_id` to look up, and the
+second instance of `Column` type's `getVal()` method will be used as the value of `create_time`. Note: the order in the
+returned list determines the place holders' values. In other words the first item in the list maps to the first `?` in the select
+query, the second item to the second `?` in the query, and so on. 
+The `toTuple` method takes the input tuple and a list of columns representing a DB row returned by the select query,
+and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
+of `Values`. This allows a single DB row to be mapped to multiple output storm tuples.
+
+###SimpleJdbcLookupMapper
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
+
+To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will be output by your bolt and the list of
+columns that are used in your select query as place holders. The following example shows initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in the select query.
+`SimpleJdbcLookupMapper` assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
+`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use its value as the place holder's value in the
+select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
+and if a field is not found in the input tuple then it looks at the select query's output row for a column with the same name as the field. 
+So in the example below, if the input tuple had fields `user_id, create_date` and the select query was 
+`select user_name from user_details where user_id = ?`, then for each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
+will return the value of `tuple.getValueByField("user_id")`, which will be used as the value of `?` in the select query. 
+For each output row from the DB, `SimpleJdbcLookupMapper.toTuple()` will take `user_id, create_date` from the input tuple as 
+is, add only `user_name` from the resulting row, and return these 3 fields as a single output tuple.
+
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
 ```
 
+### JdbcLookupBolt
+To use the `JdbcLookupBolt`, construct it with a configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+
+```java
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
+        .withSelectSql("select user_name from user_details where user_id = ?")
+```
+
+### JdbcTridentState for lookup
+We also support a trident query state that can be used with trident topologies. 
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConfigKey("jdbc.conf")
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = ?");
+```
+
+## Example:
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
+* The test topologies execute the following queries, so your intended DB must support these queries for the test topologies
+to work. 
+```SQL
+create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
+```
 ### Execution
 Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
-storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name]
+storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
 
 Mysql Example:
 ```
@@ -86,7 +176,7 @@ org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jd
 jdbc:mysql://localhost/test root password user UserPersistenceTopology
 ```
 
-You can execute a select query against the user table which shoule show newly inserted rows:
+You can execute a select query against the user table which should show newly inserted rows:
 
 ```
 select * from user;
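
Where `SimpleJdbcLookupMapper` does not fit, the lookup-mapper contract described in the README above (`declareOutputFields`/`getColumns`/`toTuple`) can be implemented directly. A sketch under the same user_details example; the class name and field handling are illustrative, not part of the patch:

```java
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;

import java.sql.Types;
import java.util.List;

public class UserNameLookupMapper implements JdbcLookupMapper {
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Fields of the enriched output tuple.
        declarer.declare(new Fields("user_id", "user_name"));
    }

    @Override
    public List<Column> getColumns(ITuple tuple) {
        // Bind the input tuple's user_id to the single '?' place holder of
        // "select user_name from user_details where user_id = ?".
        return Lists.<Column>newArrayList(
                new Column<Integer>("user_id", tuple.getIntegerByField("user_id"), Types.INTEGER));
    }

    @Override
    public List<Values> toTuple(ITuple input, List<Column> columns) {
        // Emit one tuple per DB row: the input user_id plus the looked-up user_name.
        return Lists.newArrayList(
                new Values(input.getValueByField("user_id"), columns.get(0).getVal()));
    }
}
```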

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 8dacc2d..1e717eb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,15 +34,11 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
     protected OutputCollector collector;
 
     protected transient JDBCClient jdbcClient;
-    protected String tableName;
-    protected JdbcMapper mapper;
     protected String configKey;
 
-    public AbstractJdbcBolt(String tableName, JdbcMapper mapper) {
-        Validate.notEmpty(tableName, "Table name can not be blank or null");
-        Validate.notNull(mapper, "mapper can not be null");
-        this.tableName = tableName;
-        this.mapper = mapper;
+    public AbstractJdbcBolt(String configKey) {
+        Validate.notEmpty(configKey, "configKey can not be null");
+        this.configKey = configKey;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index e5df1ae..d4ddfcb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -51,21 +51,27 @@ import java.util.List;
 public class JdbcBolt extends AbstractJdbcBolt {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
 
-    boolean writeToWAL = true;
+    private String tableName;
+    private JdbcMapper jdbcMapper;
 
-    public JdbcBolt(String tableName, JdbcMapper mapper) {
-        super(tableName, mapper);
+    public JdbcBolt(String configKey) {
+        super(configKey);
     }
 
-    public JdbcBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
+    public JdbcBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+        this.jdbcMapper = jdbcMapper;
         return this;
     }
 
     @Override
     public void execute(Tuple tuple) {
         try {
-            List<Column> columns = mapper.getColumns(tuple);
+            List<Column> columns = jdbcMapper.getColumns(tuple);
             List<List<Column>> columnLists = new ArrayList<List<Column>>();
             columnLists.add(columns);
             this.jdbcClient.insert(this.tableName, columnLists);

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index 0346bf7..4c5b37d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -18,13 +18,14 @@
 package org.apache.storm.jdbc.common;
 
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 
-public class Column<T> {
+public class Column<T> implements Serializable {
 
     private String columnName;
     private T val;
@@ -36,6 +37,11 @@ public class Column<T> {
         this.sqlType = sqlType;
     }
 
+    public Column(String columnName, int sqlType) {
+        this.columnName = columnName;
+        this.sqlType = sqlType;
+    }
+
     public String getColumnName() {
         return columnName;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index 5b63d2d..410c884 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -81,7 +81,6 @@ public class JDBCClient {
 
     public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
         Connection connection = null;
-        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
         try {
             connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
@@ -95,29 +94,28 @@ public class JDBCClient {
                 for(int i=1 ; i <= columnCount; i++) {
                     String columnLabel = metaData.getColumnLabel(i);
                     int columnType = metaData.getColumnType(i);
-                    Object val = null;
                     Class columnJavaType = Util.getJavaType(columnType);
-                    if (columnJavaType == String.class) {
+                    if (columnJavaType.equals(String.class)) {
                         row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
-                    } else if (columnJavaType == Integer.class) {
+                    } else if (columnJavaType.equals(Integer.class)) {
                         row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
-                    } else if (columnJavaType == Double.class) {
+                    } else if (columnJavaType.equals(Double.class)) {
                         row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
-                    } else if (columnJavaType == Float.class) {
+                    } else if (columnJavaType.equals(Float.class)) {
                         row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
-                    } else if (columnJavaType == Short.class) {
+                    } else if (columnJavaType.equals(Short.class)) {
                         row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
-                    } else if (columnJavaType == Boolean.class) {
+                    } else if (columnJavaType.equals(Boolean.class)) {
                         row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
-                    } else if (columnJavaType == byte[].class) {
+                    } else if (columnJavaType.equals(byte[].class)) {
                         row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
-                    } else if (columnJavaType == Long.class) {
+                    } else if (columnJavaType.equals(Long.class)) {
                         row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
-                    } else if (columnJavaType == Date.class) {
+                    } else if (columnJavaType.equals(Date.class)) {
                         row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
-                    } else if (columnJavaType == Time.class) {
+                    } else if (columnJavaType.equals(Time.class)) {
                         row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
-                    } else if (columnJavaType == Timestamp.class) {
+                    } else if (columnJavaType.equals(Timestamp.class)) {
                         row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
                     } else {
                         throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
@@ -133,17 +131,17 @@ public class JDBCClient {
         }
     }
 
-    public Map<String, Integer> getColumnSchema(String tableName) {
+    public List<Column> getColumnSchema(String tableName) {
         Connection connection = null;
-        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+        List<Column> columns = new ArrayList<Column>();
         try {
             connection = this.dataSource.getConnection();
             DatabaseMetaData metaData = connection.getMetaData();
             ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
             while (resultSet.next()) {
-                columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"));
+                columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")));
             }
-            return columnSchemaMap;
+            return columns;
         } catch (SQLException e) {
             throw new RuntimeException("Failed to get schema for table " + tableName, e);
         } finally {
@@ -170,27 +168,27 @@ public class JDBCClient {
             Class columnJavaType = Util.getJavaType(column.getSqlType());
             if (column.getVal() == null) {
                 preparedStatement.setNull(index, column.getSqlType());
-            } else if (columnJavaType == String.class) {
+            } else if (columnJavaType.equals(String.class)) {
                 preparedStatement.setString(index, (String) column.getVal());
-            } else if (columnJavaType == Integer.class) {
+            } else if (columnJavaType.equals(Integer.class)) {
                 preparedStatement.setInt(index, (Integer) column.getVal());
-            } else if (columnJavaType == Double.class) {
+            } else if (columnJavaType.equals(Double.class)) {
                 preparedStatement.setDouble(index, (Double) column.getVal());
-            } else if (columnJavaType == Float.class) {
+            } else if (columnJavaType.equals(Float.class)) {
                 preparedStatement.setFloat(index, (Float) column.getVal());
-            } else if (columnJavaType == Short.class) {
+            } else if (columnJavaType.equals(Short.class)) {
                 preparedStatement.setShort(index, (Short) column.getVal());
-            } else if (columnJavaType == Boolean.class) {
+            } else if (columnJavaType.equals(Boolean.class)) {
                 preparedStatement.setBoolean(index, (Boolean) column.getVal());
-            } else if (columnJavaType == byte[].class) {
+            } else if (columnJavaType.equals(byte[].class)) {
                 preparedStatement.setBytes(index, (byte[]) column.getVal());
-            } else if (columnJavaType == Long.class) {
+            } else if (columnJavaType.equals(Long.class)) {
                 preparedStatement.setLong(index, (Long) column.getVal());
-            } else if (columnJavaType == Date.class) {
+            } else if (columnJavaType.equals(Date.class)) {
                 preparedStatement.setDate(index, (Date) column.getVal());
-            } else if (columnJavaType == Time.class) {
+            } else if (columnJavaType.equals(Time.class)) {
                 preparedStatement.setTime(index, (Time) column.getVal());
-            } else if (columnJavaType == Timestamp.class) {
+            } else if (columnJavaType.equals(Timestamp.class)) {
                 preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
             } else {
                 throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());
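
To see the reworked client API end to end, here is a small self-contained sketch. It assumes an in-memory HSQLDB datasource and a user_details table, much like this module's own tests do; neither is part of this diff:

```java
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.JDBCClient;

import java.sql.Types;
import java.util.List;
import java.util.Map;

public class JdbcClientSketch {
    public static void main(String[] args) {
        // Hikari-style configuration; the HSQLDB datasource is illustrative.
        Map<String, Object> conf = Maps.newHashMap();
        conf.put("dataSourceClassName", "org.hsqldb.jdbc.JDBCDataSource");
        conf.put("dataSource.url", "jdbc:hsqldb:mem:test");
        conf.put("dataSource.user", "SA");
        conf.put("dataSource.password", "");

        JDBCClient client = new JDBCClient(conf);
        client.executeSql("create table user_details (id integer, user_name varchar(100))");

        // getColumnSchema now returns List<Column> rather than Map<String, Integer>,
        // so column order and sql types travel together.
        List<Column> schema = client.getColumnSchema("user_details");

        // select binds each query param by its declared sql type and maps every
        // result column back into a typed Column.
        List<List<Column>> rows = client.select(
                "select user_name from user_details where id = ?",
                Lists.newArrayList(new Column<Integer>("id", 1, Types.INTEGER)));

        System.out.println(schema.size() + " columns, " + rows.size() + " matching rows");
    }
}
```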

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
new file mode 100644
index 0000000..77852f4
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.List;
+
+public interface JdbcLookupMapper extends JdbcMapper {
+
+    /**
+     * Converts a DB row into a list of storm values that can be emitted. This is done to allow a single
+     * storm input tuple and a single DB row to result in multiple output values.
+     * @param input the input tuple.
+     * @param columns list of columns that represents a row
+     * @return a List of storm values that can be emitted. Each item in the list is emitted as an output tuple.
+     */
+    public List<Values> toTuple(ITuple input, List<Column> columns);
+
+    /**
+     * Declares the fields that this mapper will output.
+     * @param declarer the declarer used to register the output fields
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}
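
The contract is easiest to see with a hypothetical mapper that fans one row out into several tuples. Everything below (the class name, the "tags" column, the output field names) is illustrative and not part of this patch:

```java
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;

import java.sql.Types;
import java.util.ArrayList;
import java.util.List;

// Hypothetical mapper: one input tuple plus one DB row can fan out into several
// output tuples, which is why toTuple returns a List<Values>.
public class TagsLookupMapper implements JdbcLookupMapper {

    // Bind the lookup query's single parameter from the input tuple (JdbcMapper contract).
    @Override
    public List<Column> getColumns(ITuple tuple) {
        return Lists.newArrayList(
                new Column<Integer>("user_id", tuple.getIntegerByField("user_id"), Types.INTEGER));
    }

    // Emit one output tuple per comma-separated tag found in the row's "tags" column.
    @Override
    public List<Values> toTuple(ITuple input, List<Column> columns) {
        List<Values> out = new ArrayList<Values>();
        for (Column column : columns) {
            if ("tags".equals(column.getColumnName()) && column.getVal() != null) {
                for (String tag : ((String) column.getVal()).split(",")) {
                    out.add(new Values(input.getValueByField("user_id"), tag));
                }
            }
        }
        return out;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("user_id", "tag"));
    }
}
```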

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
new file mode 100644
index 0000000..e2a7e8c
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
@@ -0,0 +1,46 @@
+package org.apache.storm.jdbc.mapper;
+
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLookupMapper {
+
+    private Fields outputFields;
+
+    public SimpleJdbcLookupMapper(Fields outputFields, List<Column> queryColumns) {
+        super(queryColumns);
+        this.outputFields = outputFields;
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, List<Column> columns) {
+        Values values = new Values();
+
+        for(String field : outputFields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                for(Column column : columns) {
+                    if(column.getColumnName().equals(field)) {
+                        values.add(column.getVal());
+                    }
+                }
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(outputFields);
+    }
+}
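
Usage is a one-liner. A sketch with the field names the README examples use: output fields resolve from the input tuple first and fall back to the looked-up row, so user_id passes through from the tuple while user_name comes from the matching DB column:

```java
// "user_id" is taken from the input tuple; "user_name" from the matching row column.
Fields outputFields = new Fields("user_id", "user_name");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
JdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
```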

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 7011a72..df25695 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -31,19 +31,23 @@ import java.util.Map;
 
 public class SimpleJdbcMapper implements JdbcMapper {
 
-    private Map<String, Integer> columnNameToType;
+    private List<Column> schemaColumns;
 
     public SimpleJdbcMapper(String tableName, Map map) {
         JDBCClient client = new JDBCClient(map);
-        this.columnNameToType = client.getColumnSchema(tableName);
+        this.schemaColumns = client.getColumnSchema(tableName);
+    }
+
+    public SimpleJdbcMapper(List<Column> schemaColumns) {
+        this.schemaColumns = schemaColumns;
     }
 
     @Override
     public List<Column> getColumns(ITuple tuple) {
         List<Column> columns = new ArrayList<Column>();
-        for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) {
-            String columnName = entry.getKey();
-            Integer columnSqlType = entry.getValue();
+        for(Column column : schemaColumns) {
+            String columnName = column.getColumnName();
+            Integer columnSqlType = column.getSqlType();
 
             if(Util.getJavaType(columnSqlType).equals(String.class)) {
                 String value = tuple.getStringByField(columnName);

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
new file mode 100644
index 0000000..ad39f4b
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class JdbcQuery extends BaseQueryFunction<JdbcState, List<Values>> {
+
+    @Override
+    public List<List<Values>> batchRetrieve(JdbcState jdbcState, List<TridentTuple> tridentTuples) {
+        return jdbcState.batchRetrieve(tridentTuples);
+    }
+
+    @Override
+    public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
+        for (Values value : values) {
+            tridentCollector.emit(value);
+        }
+    }
+}
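
For orientation, this is the wiring JdbcQuery expects, in sketch form (the UserPersistanceTridentTopology change later in this mail does exactly this):

```java
// A static JdbcState answers stateQuery calls through JdbcQuery; each row the
// lookup mapper converts is appended to the stream as the new "dept_name" field.
TridentTopology topology = new TridentTopology();
TridentState state = topology.newStaticState(jdbcStateFactory);
Stream stream = topology.newStream("userSpout", new UserSpout());
stream = stream.stateQuery(state, new Fields("user_id"), new JdbcQuery(), new Fields("dept_name"));
```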

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index fec2ee4..6b4e79a 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -18,10 +18,13 @@
 package org.apache.storm.jdbc.trident.state;
 
 import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.Validate;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.JDBCClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.trident.operation.TridentCollector;
@@ -48,8 +51,10 @@ public class JdbcState implements State {
 
     public static class Options implements Serializable {
         private JdbcMapper mapper;
+        private JdbcLookupMapper jdbcLookupMapper;
         private String configKey;
         private String tableName;
+        private String selectQuery;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -65,6 +70,16 @@ public class JdbcState implements State {
             this.mapper = mapper;
             return this;
         }
+
+        public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+            this.jdbcLookupMapper = jdbcLookupMapper;
+            return this;
+        }
+
+        public Options withSelectQuery(String selectQuery) {
+            this.selectQuery = selectQuery;
+            return this;
+        }
     }
 
     protected void prepare() {
@@ -98,4 +113,22 @@ public class JdbcState implements State {
             throw new FailedException(e);
         }
     }
+
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+        try {
+            for (TridentTuple tuple : tridentTuples) {
+                List<Column> columns = options.jdbcLookupMapper.getColumns(tuple);
+                List<List<Column>> rows = jdbcClient.select(options.selectQuery, columns);
+                for(List<Column> row : rows) {
+                    List<Values> values = options.jdbcLookupMapper.toTuple(tuple, row);
+                    batchRetrieveResult.add(values);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch get operation failed. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+        return batchRetrieveResult;
+    }
 }
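
A sketch of configuring the new lookup side (the select query is illustrative; the trident test topology below uses an equivalent one):

```java
// withSelectQuery/withJdbcLookupMapper configure the read path used by
// batchRetrieve; withMapper/withTableName still configure the write path.
JdbcState.Options options = new JdbcState.Options()
        .withConfigKey("jdbc.conf")
        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(
                new Fields("dept_name"),
                Lists.newArrayList(new Column("user_id", Types.INTEGER))))
        .withSelectQuery("select dept_name from department where dept_id = ?");
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
```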

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
index 39fde59..718917a 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -72,7 +72,7 @@ public class UserSpout implements IRichSpout {
     }
 
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("id","user_name","create_date"));
+        declarer.declare(new Fields("user_id","user_name","create_date"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
new file mode 100644
index 0000000..700f83e
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import backtype.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractUserTopology {
+    private static final List<String> setupSqls = Lists.newArrayList(
+            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+            "create table if not exists department (dept_id integer, dept_name varchar(100))",
+            "create table if not exists user_department (user_id integer, dept_id integer)",
+            "insert into department values (1, 'R&D')",
+            "insert into department values (2, 'Finance')",
+            "insert into department values (3, 'HR')",
+            "insert into department values (4, 'Sales')",
+            "insert into user_department values (1, 1)",
+            "insert into user_department values (2, 2)",
+            "insert into user_department values (3, 3)",
+            "insert into user_department values (4, 4)"
+    );
+    protected UserSpout userSpout;
+    protected JdbcMapper jdbcMapper;
+    protected JdbcLookupMapper jdbcLookupMapper;
+
+    protected static final String TABLE_NAME = "user";
+    protected static final String JDBC_CONF = "jdbc.conf";
+    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+            " and user_department.user_id = ?";
+
+    public void execute(String[] args) throws Exception {
+        if (args.length != 4 && args.length != 5) {
+            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
+                    + "<user> <password> [topology name]");
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user", args[2]);//root
+        map.put("dataSource.password", args[3]);//password
+
+        Config config = new Config();
+        config.put(JDBC_CONF, map);
+
+        JDBCClient jdbcClient = new JDBCClient(map);
+        for (String sql : setupSqls) {
+            jdbcClient.executeSql(sql);
+        }
+
+        this.userSpout = new UserSpout();
+        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+
+        if (args.length == 4) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, getTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[5], config, getTopology());
+        }
+    }
+
+    public abstract StormTopology getTopology();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 21e4639..26a00aa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -17,62 +17,36 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
-import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.bolt.JdbcBolt;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
-import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
 
-import java.util.Map;
 
-
-public class UserPersistanceTopology {
+public class UserPersistanceTopology extends AbstractUserTopology {
     private static final String USER_SPOUT = "USER_SPOUT";
-    private static final String USER_BOLT = "USER_BOLT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
 
     public static void main(String[] args) throws Exception {
-        if(args.length < 5) {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
-        Map map = Maps.newHashMap();
-        map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
-        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
-        map.put("dataSource.user",args[2]);//root
-        map.put("dataSource.password",args[3]);//password
-        String tableName = args[4];//database table name
-        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
-        Config config = new Config();
-
-        config.put("jdbc.conf", map);
+        new UserPersistanceTopology().execute(args);
+    }
 
-        UserSpout spout = new UserSpout();
-        JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper)
-                .withConfigKey("jdbc.conf");
+    @Override
+    public StormTopology getTopology() {
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+                .withJdbcLookupMapper(this.jdbcLookupMapper)
+                .withSelectSql(SELECT_QUERY);
+        JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+                .withTableName(TABLE_NAME)
+                .withJdbcMapper(this.jdbcMapper);
 
         // userSpout ==> lookupBolt ==> persistanceBolt
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.setSpout(USER_SPOUT, spout, 1);
-        builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT);
-
-        if (args.length == 5) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.createTopology());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 6) {
-            StormSubmitter.submitTopology(args[6], config, builder.createTopology());
-        } else {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
+        builder.setSpout(USER_SPOUT, this.userSpout, 1);
+        builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
+        builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);
+        return builder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 3b2ee66..2cf3403 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -17,60 +17,45 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
-import com.google.common.collect.Maps;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import com.google.common.collect.Lists;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
 import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcQuery;
 import org.apache.storm.jdbc.trident.state.JdbcState;
 import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
 import org.apache.storm.jdbc.trident.state.JdbcUpdater;
 import storm.trident.Stream;
+import storm.trident.TridentState;
 import storm.trident.TridentTopology;
 
-import java.util.Map;
+import java.sql.Types;
 
-public class UserPersistanceTridentTopology {
+public class UserPersistanceTridentTopology extends AbstractUserTopology {
 
     public static void main(String[] args) throws Exception {
-        Map map = Maps.newHashMap();
-        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
-        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
-        map.put("dataSource.user",args[2]);//root
-        map.put("dataSource.password",args[3]);//password
-        String tableName = args[4];//database table name
-        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
-        Config config = new Config();
-
-        config.put("jdbc.conf", map);
+        new UserPersistanceTridentTopology().execute(args);
+    }
 
+    @Override
+    public StormTopology getTopology() {
         TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("userSpout", new UserSpout());
 
         JdbcState.Options options = new JdbcState.Options()
-                .withConfigKey("jdbc.conf")
-                .withMapper(jdbcMapper)
-                .withTableName("user");
+                .withConfigKey(JDBC_CONF)
+                .withMapper(this.jdbcMapper)
+                .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+                .withTableName(TABLE_NAME)
+                .withSelectQuery(SELECT_QUERY);
 
         JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
-        stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"),  new JdbcUpdater(), new Fields());
-        if (args.length == 5) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, topology.build());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 6) {
-            StormSubmitter.submitTopology(args[6], config, topology.build());
-        } else {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
-    }
 
+        Stream stream = topology.newStream("userSpout", new UserSpout());
+        TridentState state = topology.newStaticState(jdbcStateFactory);
+        stream = stream.stateQuery(state, new Fields("user_id","user_name","create_date"), new JdbcQuery(), new Fields("dept_name"));
+        stream.partitionPersist(jdbcStateFactory, new Fields("user_id","user_name","dept_name","create_date"),  new JdbcUpdater(), new Fields());
+        return topology.build();
+    }
 }


[02/21] storm git commit: Merge remote-tracking branch 'upstream/master' into STORM-616

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into STORM-616


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ab9f778a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ab9f778a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ab9f778a

Branch: refs/heads/master
Commit: ab9f778ae50a1e224ebdcc58e6249009fc1f91cc
Parents: 5b16016 ab76e67
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Jan 5 22:23:52 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Jan 5 22:23:52 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   8 +
 DEVELOPER.md                                    |   5 +-
 README.markdown                                 |  18 +-
 STORM-UI-REST-API.md                            | 186 ++++++++++++-------
 bin/storm                                       |  59 +++---
 conf/defaults.yaml                              |   1 +
 docs/about/integrates.md                        |   2 +-
 docs/about/multi-language.md                    |   6 +-
 docs/about/simple-api.md                        |   2 +-
 .../Acking-framework-implementation.md          |   8 +-
 docs/documentation/Clojure-DSL.md               |   6 +-
 docs/documentation/Common-patterns.md           |   2 +-
 docs/documentation/Configuration.md             |   4 +-
 .../Creating-a-new-Storm-project.md             |   6 +-
 .../DSLs-and-multilang-adapters.md              |   3 +-
 ...Defining-a-non-jvm-language-dsl-for-storm.md |   2 +-
 docs/documentation/Distributed-RPC.md           |   2 +-
 docs/documentation/FAQ.md                       |   4 +-
 docs/documentation/Kestrel-and-Storm.md         |   2 +-
 docs/documentation/Lifecycle-of-a-topology.md   |  70 +++----
 docs/documentation/Maven.md                     |  50 +----
 docs/documentation/Multilang-protocol.md        |  30 ++-
 .../Serialization-(prior-to-0.6.0).md           |   2 +-
 .../documentation/Setting-up-a-Storm-cluster.md |   4 +-
 .../Setting-up-development-environment.md       |   2 +-
 docs/documentation/Structure-of-the-codebase.md |  88 ++++-----
 docs/documentation/Transactional-topologies.md  |  12 +-
 docs/documentation/Trident-API-Overview.md      |   4 +-
 docs/documentation/Trident-spouts.md            |   8 +-
 docs/documentation/Trident-state.md             |  12 +-
 docs/documentation/Tutorial.md                  |   4 +-
 docs/downloads.html                             |   2 +-
 .../storm/hdfs/bolt/format/SequenceFormat.java  |   5 +-
 external/storm-kafka/README.md                  |   2 +-
 .../FieldNameBasedTupleToKafkaMapper.java       |   2 +-
 logback/cluster.xml                             |   2 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   2 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   9 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  18 +-
 .../src/clj/backtype/storm/messaging/loader.clj |   3 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |   1 +
 storm-core/src/jvm/backtype/storm/Config.java   |   6 +
 storm-core/src/ui/public/component.html         |   2 +-
 43 files changed, 373 insertions(+), 293 deletions(-)
----------------------------------------------------------------------



[11/21] storm git commit: STORM-616: Fixed casing mismatch. Added collector.reportError so the error is reported back to the UI. Added storm-jdbc to storm-dist.

Posted by sr...@apache.org.
STORM-616: Fixed casing mismatch. Added collector.reportError so the error is reported back to the UI. Added storm-jdbc to storm-dist.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e0f623f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e0f623f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e0f623f

Branch: refs/heads/master
Commit: 1e0f623f850f219e132ad0807e2d2aca16b64aa4
Parents: f217096
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 14:24:45 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 14:24:45 2015 -0800

----------------------------------------------------------------------
 .../apache/storm/jdbc/bolt/AbstractJdbcBolt.java    |  7 +++----
 .../java/org/apache/storm/jdbc/bolt/JdbcBolt.java   |  1 +
 .../org/apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  3 ++-
 .../org/apache/storm/jdbc/common/JDBCClient.java    |  6 +++---
 .../apache/storm/jdbc/mapper/SimpleJdbcMapper.java  |  4 ++--
 .../apache/storm/jdbc/trident/state/JdbcState.java  |  6 +++---
 .../apache/storm/jdbc/common/JdbcClientTest.java    |  4 ++--
 .../storm/jdbc/topology/AbstractUserTopology.java   |  4 ++--
 storm-dist/binary/src/main/assembly/binary.xml      | 16 ++++++++++++++--
 9 files changed, 32 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 1e717eb..ae5a249 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -21,8 +21,7 @@ import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
 import org.apache.commons.lang.Validate;
-import org.apache.storm.jdbc.common.JDBCClient;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.common.JdbcClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +32,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
     protected OutputCollector collector;
 
-    protected transient JDBCClient jdbcClient;
+    protected transient JdbcClient jdbcClient;
     protected String configKey;
 
     public AbstractJdbcBolt(String configKey) {
@@ -48,6 +47,6 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
         Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
 
-        this.jdbcClient = new JDBCClient(conf);
+        this.jdbcClient = new JdbcClient(conf);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index 4c63a09..fd27285 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -61,6 +61,7 @@ public class JdbcBolt extends AbstractJdbcBolt {
         } catch (Exception e) {
             LOG.warn("Failing tuple.", e);
             this.collector.fail(tuple);
+            this.collector.reportError(e);
             return;
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 2e4e4e6..7e548ff 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -68,8 +68,9 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
             }
             this.collector.ack(tuple);
         } catch (Exception e) {
-            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
+            LOG.warn("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
             this.collector.fail(tuple);
+            this.collector.reportError(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index 410c884..ab3f8a7 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -31,12 +31,12 @@ import java.sql.*;
 import java.sql.Date;
 import java.util.*;
 
-public class JDBCClient {
-    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
+public class JdbcClient {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
 
     private HikariDataSource dataSource;
 
-    public JDBCClient(Map<String, Object> map) {
+    public JdbcClient(Map<String, Object> map) {
         Properties properties = new Properties();
         properties.putAll(map);
         HikariConfig config = new HikariConfig(properties);

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index df25695..81fc207 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -19,7 +19,7 @@ package org.apache.storm.jdbc.mapper;
 
 import backtype.storm.tuple.ITuple;
 import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.common.Util;
 
 import java.sql.Date;
@@ -34,7 +34,7 @@ public class SimpleJdbcMapper implements JdbcMapper {
     private List<Column> schemaColumns;
 
     public SimpleJdbcMapper(String tableName, Map map) {
-        JDBCClient client = new JDBCClient(map);
+        JdbcClient client = new JdbcClient(map);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 6b4e79a..129191a 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -22,7 +22,7 @@ import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.Validate;
 import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.slf4j.Logger;
@@ -41,7 +41,7 @@ public class JdbcState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcState.class);
 
     private Options options;
-    private JDBCClient jdbcClient;
+    private JdbcClient jdbcClient;
     private Map map;
 
     protected JdbcState(Map map, int partitionIndex, int numPartitions, Options options) {
@@ -86,7 +86,7 @@ public class JdbcState implements State {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
-        this.jdbcClient = new JDBCClient(conf);
+        this.jdbcClient = new JdbcClient(conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 432d9f8..3623b77 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -32,7 +32,7 @@ import java.util.Map;
 
 public class JdbcClientTest {
 
-    private JDBCClient client;
+    private JdbcClient client;
 
     private static final String tableName = "user_details";
     @Before
@@ -43,7 +43,7 @@ public class JdbcClientTest {
         map.put("dataSource.user","SA");//root
         map.put("dataSource.password","");//password
 
-        this.client = new JDBCClient(map);
+        this.client = new JdbcClient(map);
         client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 6d2f8e9..dc04ac1 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Fields;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
@@ -74,7 +74,7 @@ public abstract class AbstractUserTopology {
         Config config = new Config();
         config.put(JDBC_CONF, map);
 
-        JDBCClient jdbcClient = new JDBCClient(map);
+        JdbcClient jdbcClient = new JdbcClient(map);
         for (String sql : setupSqls) {
             jdbcClient.executeSql(sql);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/1e0f623f/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 5b687b6..122633f 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -103,8 +103,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
-
-
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-jdbc/target</directory>
+            <outputDirectory>external/storm-jdbc</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-jdbc</directory>
+            <outputDirectory>external/storm-jdbc</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
     </fileSets>
 
     <files>


[21/21] storm git commit: Added STORM-616 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-616 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/64d7ac6b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/64d7ac6b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/64d7ac6b

Branch: refs/heads/master
Commit: 64d7ac6b228df1174455040a5a9f4d52cc53db7c
Parents: 10ea5e1
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 23 14:12:04 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 23 14:12:04 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/64d7ac6b/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7d465dd..19655cb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-616: Storm JDBC Connector.
  * STORM-329: fix cascading Storm failure by improving reconnection strategy and buffering messages
  * STORM-641: Add total number of topologies to api/v1/cluster/summary.
  * STORM-640: Storm UI vulnerable to poodle attack.


[10/21] storm git commit: STORM-616: Fixing the number of args for the test topologies.

Posted by sr...@apache.org.
STORM-616: Fixing the number of args for the test topologies.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f217096c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f217096c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f217096c

Branch: refs/heads/master
Commit: f217096cd36383639a7f7384e6ce34c78cf5d954
Parents: 396bc61
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 11:42:20 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 11:42:20 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                                      | 2 +-
 .../java/org/apache/storm/jdbc/topology/AbstractUserTopology.java  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f217096c/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 79a0d61..4bb5e61 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -173,7 +173,7 @@ Mysql Example:
 ```
 storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
 org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
-jdbc:mysql://localhost/test root password user UserPersistenceTopology
+jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
 You can execute a select query against the user table which should show newly inserted rows:

http://git-wip-us.apache.org/repos/asf/storm/blob/f217096c/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 700f83e..6d2f8e9 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -93,7 +93,7 @@ public abstract class AbstractUserTopology {
             cluster.shutdown();
             System.exit(0);
         } else {
-            StormSubmitter.submitTopology(args[5], config, getTopology());
+            StormSubmitter.submitTopology(args[4], config, getTopology());
         }
     }
 


[12/21] storm git commit: STORM-616: Adding query timeout configuration.

Posted by sr...@apache.org.
STORM-616: Adding query timeout configuration.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d6c5ed3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d6c5ed3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d6c5ed3

Branch: refs/heads/master
Commit: 2d6c5ed338872cd464f3d42c27864f7530d2da9c
Parents: 1e0f623
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 14:59:14 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 14:59:14 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 25 +++++++++++++-------
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  8 ++-----
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    | 10 ++++++--
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  9 +++++--
 .../apache/storm/jdbc/common/JDBCClient.java    |  6 ++++-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  3 ++-
 .../storm/jdbc/trident/state/JdbcState.java     |  8 ++++++-
 .../storm/jdbc/common/JdbcClientTest.java       |  3 ++-
 .../jdbc/topology/AbstractUserTopology.java     |  3 ++-
 .../jdbc/topology/UserPersistanceTopology.java  |  6 +++--
 10 files changed, 56 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 4bb5e61..948ba23 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -54,16 +54,20 @@ List<Column> columnSchema = Lists.newArrayList(
 ```
 
 ### JdbcBolt
-To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
-the rows will be inserted.
+To use the `JdbcBolt`, construct an instance of it and specify a configuration key in your storm config that holds the 
+hikari configuration map. In addition you must specify the JdbcMapper implementation that converts a storm tuple to a DB 
+row, and the table name into which the rows will be inserted. You can optionally specify a query timeout (in seconds) that 
+caps how long an insert query may take. The default is 30 seconds, which is equal to topology.message.timeout.secs; 
+you should set this value to be <= topology.message.timeout.secs.
 
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+JdbcBolt userPersistanceBolt = new JdbcBolt()
+                                    .withConfigKey("jdbc.conf")
                                     .withTableName("user_details")
-                                    .withJdbcMapper(simpleJdbcMapper);
+                                    .withJdbcMapper(simpleJdbcMapper)
+                                    .withQueryTimeoutSecs(30);
  ```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
@@ -74,7 +78,8 @@ hikari configuration map. See the example below:
 JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withMapper(jdbcMapper)
-        .withTableName("user_details");
+        .withTableName("user_details")
+        .withQueryTimeoutSecs(30);
 
 JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
 ```
@@ -125,13 +130,16 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 ```
 
 ### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that holds the 
+hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+You can optionally specify a query timeout (in seconds) that caps how long the select query may take. 
+The default is 30 seconds, which is equal to topology.message.timeout.secs; you should set this value to be <= topology.message.timeout.secs.
 
 ```java
 JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
         .withSelectSql("select user_name from user_details where user_id = ?")
+        .withQueryTimeoutSecs(30);
 ```
 
 ### JdbcTridentState for lookup
@@ -142,6 +150,7 @@ JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = ?")
+        .withQueryTimeoutSecs(30);
 ```
 
 ## Example:

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index ae5a249..4b93d4d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,11 +34,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
     protected transient JdbcClient jdbcClient;
     protected String configKey;
-
-    public AbstractJdbcBolt(String configKey) {
-        Validate.notEmpty(configKey, "configKey can not be null");
-        this.configKey = configKey;
-    }
+    protected int queryTimeoutSecs = 30;
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
@@ -47,6 +43,6 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
         Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
 
-        this.jdbcClient = new JdbcClient(conf);
+        this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
 }
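
For reference, prepare() expects the topology config to carry a hikari properties map under the bolt's configKey. A minimal sketch of that map, with values borrowed from the MySQL example in the README (all of them assumptions for your environment):

```java
Map<String, Object> hikariConfigMap = new HashMap<String, Object>();
hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user", "root");
hikariConfigMap.put("dataSource.password", "password");

Config config = new Config();
config.put("jdbc.conf", hikariConfigMap); // looked up via the bolt's configKey in prepare()
```
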

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index fd27285..f4921f5 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -37,8 +37,9 @@ public class JdbcBolt extends AbstractJdbcBolt {
     private String tableName;
     private JdbcMapper jdbcMapper;
 
-    public JdbcBolt(String configKey) {
-        super(configKey);
+    public JdbcBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
     }
 
     public JdbcBolt withTableName(String tableName) {
@@ -51,6 +52,11 @@ public class JdbcBolt extends AbstractJdbcBolt {
         return this;
     }
 
+    public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
+
     @Override
     public void execute(Tuple tuple) {
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 7e548ff..041fbe8 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,8 +37,9 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt(String configKey) {
-        super(configKey);
+    public JdbcLookupBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
     }
 
     public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
@@ -51,6 +52,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
         return this;
     }
 
+    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
 
     @Override
     public void execute(Tuple tuple) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index ab3f8a7..d11d1b3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -35,12 +35,14 @@ public class JdbcClient {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
 
     private HikariDataSource dataSource;
+    private int queryTimeoutSecs;
 
-    public JdbcClient(Map<String, Object> map) {
+    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
         Properties properties = new Properties();
         properties.putAll(map);
         HikariConfig config = new HikariConfig(properties);
         this.dataSource = new HikariDataSource(config);
+        this.queryTimeoutSecs = queryTimeoutSecs;
     }
 
     public int insert(String tableName, List<List<Column>> columnLists) {
@@ -67,6 +69,7 @@ public class JdbcClient {
             }
 
             PreparedStatement preparedStatement = connection.prepareStatement(query);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
             for(List<Column> columnList : columnLists) {
                 setPreparedStatementParams(preparedStatement, columnList);
             }
@@ -84,6 +87,7 @@ public class JdbcClient {
         try {
             connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
             setPreparedStatementParams(preparedStatement, queryParams);
             ResultSet resultSet = preparedStatement.executeQuery();
             List<List<Column>> rows = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 81fc207..ad7f1c0 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -34,7 +34,8 @@ public class SimpleJdbcMapper implements JdbcMapper {
     private List<Column> schemaColumns;
 
     public SimpleJdbcMapper(String tableName, Map map) {
-        JdbcClient client = new JdbcClient(map);
+        int queryTimeoutSecs = 30;
+        JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 129191a..48fde4e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -55,6 +55,7 @@ public class JdbcState implements State {
         private String configKey;
         private String tableName;
         private String selectQuery;
+        private int queryTimeoutSecs = 30;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -80,13 +81,18 @@ public class JdbcState implements State {
             this.selectQuery = selectQuery;
             return this;
         }
+
+        public Options withQueryTimeoutSecs(int queryTimeoutSecs) {
+            this.queryTimeoutSecs = queryTimeoutSecs;
+            return this;
+        }
     }
 
     protected void prepare() {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
-        this.jdbcClient = new JdbcClient(conf);
+        this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 3623b77..6423e8f 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -43,7 +43,8 @@ public class JdbcClientTest {
         map.put("dataSource.user","SA");//root
         map.put("dataSource.password","");//password
 
-        this.client = new JdbcClient(map);
+        int queryTimeoutSecs = 60;
+        this.client = new JdbcClient(map, queryTimeoutSecs);
         client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index dc04ac1..9cb0bfa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -74,7 +74,8 @@ public abstract class AbstractUserTopology {
         Config config = new Config();
         config.put(JDBC_CONF, map);
 
-        JdbcClient jdbcClient = new JdbcClient(map);
+        int queryTimeoutSecs = 60;
+        JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs);
         for (String sql : setupSqls) {
             jdbcClient.executeSql(sql);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 26a00aa..fbb0b6c 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,10 +34,12 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt()
+                .withConfigKey(JDBC_CONF)
                 .withJdbcLookupMapper(this.jdbcLookupMapper)
                 .withSelectSql(SELECT_QUERY);
-        JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+        JdbcBolt userPersistanceBolt = new JdbcBolt()
+                .withConfigKey(JDBC_CONF)
                 .withTableName(TABLE_NAME)
                 .withJdbcMapper(this.jdbcMapper);
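
The hunk ends before the spout and bolts are wired together; a hypothetical sketch of how getTopology() might continue, using standard TopologyBuilder calls (component names are assumptions):

```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("user_spout", new UserSpout());
builder.setBolt("dept_lookup", departmentLookupBolt).shuffleGrouping("user_spout");
builder.setBolt("user_persistance", userPersistanceBolt).shuffleGrouping("dept_lookup");
return builder.createTopology();
```
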
 


[06/21] storm git commit: Merge remote-tracking branch 'upstream/master' into STORM-616

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into STORM-616


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd96dd0f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd96dd0f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd96dd0f

Branch: refs/heads/master
Commit: cd96dd0f29cee69747124373c9c58ed135ed0b4a
Parents: ca235e6 e71e2a3
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jan 6 17:44:28 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Jan 6 17:44:28 2015 -0500

----------------------------------------------------------------------
 CHANGELOG.md                                    |   9 +
 SECURITY.md                                     |  25 +-
 external/storm-hbase/README.md                  |  49 +++-
 external/storm-hbase/pom.xml                    |  17 +-
 .../storm/hbase/bolt/AbstractHBaseBolt.java     |   9 +-
 .../apache/storm/hbase/security/AutoHBase.java  | 243 ++++++++++++++++
 .../storm/hbase/security/HBaseSecurityUtil.java |  32 ++-
 .../storm/hbase/trident/state/HBaseState.java   |   9 +-
 external/storm-hdfs/README.md                   |  45 +++
 .../storm/hdfs/common/security/AutoHDFS.java    | 281 +++++++++++++++++++
 .../hdfs/common/security/HdfsSecurityUtil.java  |  30 +-
 .../src/jvm/storm/kafka/KafkaUtils.java         |  11 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |   6 +-
 .../kafka/TopicOffsetOutOfRangeException.java   |  25 ++
 .../jvm/storm/kafka/UpdateOffsetException.java  |  25 --
 .../kafka/trident/TridentKafkaEmitter.java      |  49 ++--
 .../src/test/storm/kafka/KafkaUtilsTest.java    |   2 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |  27 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   7 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   4 +
 storm-core/src/jvm/backtype/storm/Config.java   |  28 +-
 .../storm/security/auth/hadoop/AutoHDFS.java    | 262 -----------------
 .../jvm/backtype/storm/utils/ShellProcess.java  |  11 +-
 .../src/native/worker-launcher/configure.ac     |   2 +-
 .../worker-launcher/impl/worker-launcher.c      |   7 +-
 .../test/clj/backtype/storm/logviewer_test.clj  |  47 +++-
 .../backtype/storm/security/auth/auth_test.clj  |   8 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   6 +-
 28 files changed, 863 insertions(+), 413 deletions(-)
----------------------------------------------------------------------



[20/21] storm git commit: Merge branch 'STORM-616' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-616

Posted by sr...@apache.org.
Merge branch 'STORM-616' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-616


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/10ea5e1f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/10ea5e1f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/10ea5e1f

Branch: refs/heads/master
Commit: 10ea5e1fc410d8d28510f324b73a8155740b04bb
Parents: d733484 4fc8356
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Feb 23 13:28:53 2015 -0800
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Feb 23 13:28:53 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/LICENSE                     | 202 ++++++++++++++++
 external/storm-jdbc/README.md                   | 240 +++++++++++++++++++
 external/storm-jdbc/pom.xml                     | 125 ++++++++++
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  57 +++++
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |  71 ++++++
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  76 ++++++
 .../org/apache/storm/jdbc/common/Column.java    | 111 +++++++++
 .../apache/storm/jdbc/common/JdbcClient.java    | 228 ++++++++++++++++++
 .../java/org/apache/storm/jdbc/common/Util.java |  74 ++++++
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |  26 ++
 .../apache/storm/jdbc/mapper/JdbcMapper.java    |  33 +++
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  46 ++++
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  92 +++++++
 .../storm/jdbc/trident/state/JdbcQuery.java     |  40 ++++
 .../storm/jdbc/trident/state/JdbcState.java     | 145 +++++++++++
 .../jdbc/trident/state/JdbcStateFactory.java    |  40 ++++
 .../storm/jdbc/trident/state/JdbcUpdater.java   |  32 +++
 .../storm/jdbc/common/JdbcClientTest.java       |  88 +++++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |  90 +++++++
 .../jdbc/topology/AbstractUserTopology.java     | 106 ++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  48 ++++
 .../UserPersistanceTridentTopology.java         |  61 +++++
 external/storm-jdbc/src/test/sql/test.sql       |   1 +
 pom.xml                                         |   1 +
 storm-dist/binary/src/main/assembly/binary.xml  |  16 +-
 25 files changed, 2047 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[19/21] storm git commit: Merge remote-tracking branch 'upstream/master' into STORM-616

Posted by sr...@apache.org.
Merge remote-tracking branch 'upstream/master' into STORM-616

Conflicts:
	pom.xml


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fc83562
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fc83562
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fc83562

Branch: refs/heads/master
Commit: 4fc835624aaa5a293fe689198a2b4e769739338f
Parents: 04fccb1 bde3054
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Feb 23 09:57:28 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Feb 23 09:57:28 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   15 +
 DEVELOPER.md                                    |   24 +-
 README.markdown                                 |   10 +-
 SECURITY.md                                     |   10 +-
 bin/storm-config.cmd                            |   14 +-
 bin/storm.cmd                                   |   33 +-
 dev-tools/github/__init__.py                    |   11 +
 dev-tools/jira-github-join.py                   |    4 +-
 dev-tools/storm-merge.py                        |   31 +
 docs/documentation/Common-patterns.md           |   14 +-
 docs/documentation/Concepts.md                  |   13 +-
 docs/documentation/Multilang-protocol.md        |    4 +-
 docs/documentation/Trident-API-Overview.md      |    2 +-
 .../jvm/storm/starter/BasicDRPCTopology.java    |    3 +-
 .../storm/starter/SkewedRollingTopWords.java    |  134 +
 .../storm/starter/bolt/RollingCountAggBolt.java |   78 +
 external/storm-hdfs/pom.xml                     |   18 +-
 .../storm/hdfs/bolt/HdfsFileTopology.java       |    6 +-
 .../storm/hdfs/bolt/SequenceFileTopology.java   |    4 +-
 .../storm/hdfs/trident/TridentFileTopology.java |    2 +-
 .../hdfs/trident/TridentSequenceTopology.java   |    6 +-
 .../src/jvm/storm/kafka/PartitionManager.java   |    5 +-
 external/storm-redis/LICENSE                    |  202 +
 external/storm-redis/README.md                  |  137 +
 external/storm-redis/pom.xml                    |   65 +
 .../storm/redis/bolt/AbstractRedisBolt.java     |   67 +
 .../trident/mapper/TridentTupleMapper.java      |   27 +
 .../trident/state/RedisClusterMapState.java     |  294 +
 .../redis/trident/state/RedisClusterState.java  |   80 +
 .../trident/state/RedisClusterStateQuerier.java |   78 +
 .../trident/state/RedisClusterStateUpdater.java |   76 +
 .../redis/trident/state/RedisMapState.java      |  323 +
 .../storm/redis/trident/state/RedisState.java   |   83 +
 .../redis/trident/state/RedisStateQuerier.java  |   70 +
 .../state/RedisStateSetCountQuerier.java        |   74 +
 .../trident/state/RedisStateSetUpdater.java     |   80 +
 .../redis/trident/state/RedisStateUpdater.java  |   75 +
 .../redis/util/config/JedisClusterConfig.java   |   82 +
 .../redis/util/config/JedisPoolConfig.java      |   97 +
 .../util/container/JedisClusterContainer.java   |   47 +
 .../JedisCommandsContainerBuilder.java          |   38 +
 .../JedisCommandsInstanceContainer.java         |   25 +
 .../redis/util/container/JedisContainer.java    |   65 +
 .../storm/redis/topology/LookupWordCount.java   |  127 +
 .../redis/topology/PersistentWordCount.java     |  117 +
 .../storm/redis/topology/WordCounter.java       |   58 +
 .../apache/storm/redis/topology/WordSpout.java  |   88 +
 .../storm/redis/trident/PrintFunction.java      |   40 +
 .../redis/trident/WordCountTridentRedis.java    |   97 +
 .../trident/WordCountTridentRedisCluster.java   |  103 +
 .../WordCountTridentRedisClusterMap.java        |  101 +
 .../redis/trident/WordCountTridentRedisMap.java |   95 +
 .../redis/trident/WordCountTupleMapper.java     |   16 +
 pom.xml                                         |    9 +-
 storm-core/pom.xml                              |    7 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |   64 -
 storm-core/src/clj/backtype/storm/clojure.clj   |    2 +-
 storm-core/src/clj/backtype/storm/config.clj    |    3 +-
 .../src/clj/backtype/storm/daemon/drpc.clj      |    6 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |   26 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   26 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  191 +-
 .../src/clj/backtype/storm/daemon/task.clj      |   16 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   18 +-
 storm-core/src/clj/backtype/storm/tuple.clj     |    4 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |    1 +
 storm-core/src/clj/backtype/storm/util.clj      |    3 +
 .../coordination/BatchSubtopologyBuilder.java   |   11 +
 .../storm/drpc/DRPCInvocationsClient.java       |    6 +
 .../src/jvm/backtype/storm/drpc/DRPCSpout.java  |   14 +-
 .../storm/drpc/LinearDRPCInputDeclarer.java     |    5 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |   13 +-
 .../jvm/backtype/storm/drpc/ReturnResults.java  |    8 +-
 .../storm/generated/AlreadyAliveException.java  |  149 +-
 .../storm/generated/AuthorizationException.java |  149 +-
 .../src/jvm/backtype/storm/generated/Bolt.java  |  194 +-
 .../jvm/backtype/storm/generated/BoltStats.java |  908 +-
 .../storm/generated/ClusterSummary.java         |  339 +-
 .../storm/generated/ComponentCommon.java        |  426 +-
 .../storm/generated/ComponentObject.java        |   86 +-
 .../backtype/storm/generated/Credentials.java   |  220 +-
 .../storm/generated/DRPCExecutionException.java |  149 +-
 .../backtype/storm/generated/DRPCRequest.java   |  185 +-
 .../storm/generated/DistributedRPC.java         |  529 +-
 .../generated/DistributedRPCInvocations.java    | 1199 ++-
 .../jvm/backtype/storm/generated/ErrorInfo.java |  300 +-
 .../backtype/storm/generated/ExecutorInfo.java  |  198 +-
 .../storm/generated/ExecutorSpecificStats.java  |   72 +-
 .../backtype/storm/generated/ExecutorStats.java |  486 +-
 .../storm/generated/ExecutorSummary.java        |  371 +-
 .../storm/generated/GetInfoOptions.java         |  166 +-
 .../storm/generated/GlobalStreamId.java         |  185 +-
 .../jvm/backtype/storm/generated/Grouping.java  |  163 +-
 .../generated/InvalidTopologyException.java     |  149 +-
 .../backtype/storm/generated/JavaObject.java    |  239 +-
 .../backtype/storm/generated/JavaObjectArg.java |  108 +-
 .../backtype/storm/generated/KillOptions.java   |  176 +-
 .../jvm/backtype/storm/generated/Nimbus.java    | 9177 +++++++++++++-----
 .../storm/generated/NotAliveException.java      |  149 +-
 .../backtype/storm/generated/NullStruct.java    |  112 +-
 .../storm/generated/NumErrorsChoice.java        |    3 +-
 .../storm/generated/RebalanceOptions.java       |  348 +-
 .../storm/generated/ShellComponent.java         |  202 +-
 .../jvm/backtype/storm/generated/SpoutSpec.java |  194 +-
 .../backtype/storm/generated/SpoutStats.java    |  614 +-
 .../storm/generated/StateSpoutSpec.java         |  194 +-
 .../backtype/storm/generated/StormTopology.java |  410 +-
 .../backtype/storm/generated/StreamInfo.java    |  249 +-
 .../backtype/storm/generated/SubmitOptions.java |  208 +-
 .../storm/generated/SupervisorSummary.java      |  309 +-
 .../backtype/storm/generated/TopologyInfo.java  |  609 +-
 .../storm/generated/TopologyInitialStatus.java  |    3 +-
 .../storm/generated/TopologySummary.java        |  486 +-
 .../storm/grouping/PartialKeyGrouping.java      |   31 +-
 .../security/auth/SimpleTransportPlugin.java    |    2 +-
 .../backtype/storm/topology/InputDeclarer.java  |    3 +
 .../storm/topology/TopologyBuilder.java         |   11 +
 .../TransactionalTopologyBuilder.java           |   13 +-
 .../trident/drpc/ReturnResultsReducer.java      |    4 +-
 .../trident/spout/RichSpoutBatchExecutor.java   |    1 +
 .../topology/TridentTopologyBuilder.java        |   13 +-
 storm-core/src/py/storm/DistributedRPC-remote   |   35 +-
 storm-core/src/py/storm/DistributedRPC.py       |   38 +-
 .../py/storm/DistributedRPCInvocations-remote   |   43 +-
 .../src/py/storm/DistributedRPCInvocations.py   |   95 +-
 storm-core/src/py/storm/Nimbus-remote           |  111 +-
 storm-core/src/py/storm/Nimbus.py               |  640 +-
 storm-core/src/py/storm/constants.py            |    6 +-
 storm-core/src/py/storm/ttypes.py               |  420 +-
 .../test/clj/backtype/storm/clojure_test.clj    |    9 +-
 .../test/clj/backtype/storm/drpc_test.clj       |    8 +-
 .../test/clj/backtype/storm/grouping_test.clj   |    9 +-
 .../clj/backtype/storm/integration_test.clj     |   10 +-
 .../storm/messaging/netty_integration_test.clj  |    5 +-
 .../storm/messaging/netty_unit_test.clj         |    5 +-
 .../test/clj/backtype/storm/messaging_test.clj  |    6 +-
 .../test/clj/backtype/storm/metrics_test.clj    |    8 +-
 .../test/clj/backtype/storm/multilang_test.clj  |    6 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |   21 +-
 .../scheduler/multitenant_scheduler_test.clj    |    4 +-
 .../test/clj/backtype/storm/scheduler_test.clj  |    4 +-
 .../storm/security/auth/AuthUtils_test.clj      |    6 +-
 .../backtype/storm/security/auth/auth_test.clj  |   13 +-
 .../storm/security/auth/drpc_auth_test.clj      |    7 +-
 .../storm/security/auth/nimbus_auth_test.clj    |   10 +-
 .../clj/backtype/storm/subtopology_test.clj     |    9 +-
 .../test/clj/backtype/storm/supervisor_test.clj |   16 +-
 .../test/clj/backtype/storm/tick_tuple_test.clj |    7 +-
 .../clj/backtype/storm/transactional_test.clj   |   14 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |   26 +-
 150 files changed, 18827 insertions(+), 7056 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4fc83562/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 1125757,baff501..8b4db26
--- a/pom.xml
+++ b/pom.xml
@@@ -162,7 -162,7 +162,8 @@@
          <module>external/storm-kafka</module>
          <module>external/storm-hdfs</module>
          <module>external/storm-hbase</module>
 +        <module>external/storm-jdbc</module>
+         <module>external/storm-redis</module>
      </modules>
  
      <scm>


[16/21] storm git commit: STORM-616: renaming JDBCBolt to JDBCInsertBolt. Added Javadoc to Column.

Posted by sr...@apache.org.
STORM-616: renaming JDBCBolt to JDBCInsertBolt. Added Javadoc to Column.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e157edb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e157edb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e157edb2

Branch: refs/heads/master
Commit: e157edb2697eeb32cf371d6556e450c343747407
Parents: bb927c8
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 3 15:21:35 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Feb 3 15:21:35 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 43 +++++++++--
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  4 +
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    | 81 --------------------
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 79 +++++++++++++++++++
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  | 10 +--
 .../org/apache/storm/jdbc/common/Column.java    | 34 ++++++--
 .../apache/storm/jdbc/common/JdbcClient.java    |  6 +-
 .../jdbc/topology/UserPersistanceTopology.java  |  8 +-
 8 files changed, 157 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 190fb98..cfe449d 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -53,8 +53,8 @@ List<Column> columnSchema = Lists.newArrayList(
     JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
 
-### JdbcBolt
-To use the `JdbcBolt`, you construct an instance of it and specify a configuration key in your storm config that hold the 
+### JdbcInsertBolt
+To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the 
 hikari configuration map. In addition, you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and 
 the name of the table into which the rows will be inserted. You can optionally specify a query timeout in seconds, which caps 
 how long an insert query may take. The default is set to the value of topology.message.timeout.secs. You should set this value 
@@ -63,8 +63,7 @@ to be <= topology.message.timeout.secs.
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcBolt userPersistanceBolt = new JdbcBolt()
-                                    .withConfigKey("jdbc.conf")
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf")
                                     .withTableName("user_details")
                                     .withJdbcMapper(simpleJdbcMapper)
                                     .withQueryTimeoutSecs(30);
@@ -178,11 +177,39 @@ select dept_name from department, user_department where department.dept_id = use
 Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using the storm jar command. The class expects 5 args:
 storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
 
+To make it work with MySQL, you can add the following to the pom.xml:
+
+```
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.31</version>
+</dependency>
+```
+
+You can generate a single jar with dependencies using the Maven assembly plugin. To use the plugin, add the following to your pom.xml and execute 
+`mvn clean compile assembly:single`.
+
+```
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <archive>
+            <manifest>
+                <mainClass>fully.qualified.MainClass</mainClass>
+            </manifest>
+        </archive>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+    </configuration>
+</plugin>
+```
+
 MySQL Example:
 ```
 storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
-org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
-jdbc:mysql://localhost/test root password UserPersistenceTopology
+org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
 You can execute a select query against the user table which should show newly inserted rows:
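
The query itself is elided by the hunk boundary; a plausible check, assuming the user table created by the setup scripts:

```
select * from user;
```
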
@@ -212,5 +239,5 @@ specific language governing permissions and limitations
 under the License.
 
 ## Committer Sponsors
-* Parth Brahmbhatt ([brahmbhatt.parth@gmail.com](mailto:brahmbhatt.parth@gmail.com))
-* Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) 
\ No newline at end of file
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 436ad00..0d30529 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -50,4 +50,8 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
         this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
+
+    public AbstractJdbcBolt(String configKey) {
+        this.configKey = configKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
deleted file mode 100644
index f4921f5..0000000
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.bolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Basic bolt for writing to any Database table.
- * <p/>
- * Note: Each JdbcBolt defined in a topology is tied to a specific table.
- */
-public class JdbcBolt extends AbstractJdbcBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
-
-    private String tableName;
-    private JdbcMapper jdbcMapper;
-
-    public JdbcBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
-        return this;
-    }
-
-    public JdbcBolt withTableName(String tableName) {
-        this.tableName = tableName;
-        return this;
-    }
-
-    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
-        this.jdbcMapper = jdbcMapper;
-        return this;
-    }
-
-    public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
-        this.queryTimeoutSecs = queryTimeoutSecs;
-        return this;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        try {
-            List<Column> columns = jdbcMapper.getColumns(tuple);
-            List<List<Column>> columnLists = new ArrayList<List<Column>>();
-            columnLists.add(columns);
-            this.jdbcClient.insert(this.tableName, columnLists);
-        } catch (Exception e) {
-            LOG.warn("Failing tuple.", e);
-            this.collector.fail(tuple);
-            this.collector.reportError(e);
-            return;
-        }
-
-        this.collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
new file mode 100644
index 0000000..9abd553
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcInsertBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcInsertBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcInsertBolt.class);
+
+    private String tableName;
+    private JdbcMapper jdbcMapper;
+
+    public JdbcInsertBolt(String configKey) {
+        super(configKey);
+    }
+
+    public JdbcInsertBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+        this.jdbcMapper = jdbcMapper;
+        return this;
+    }
+
+    public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = jdbcMapper.getColumns(tuple);
+            List<List<Column>> columnLists = new ArrayList<List<Column>>();
+            columnLists.add(columns);
+            this.jdbcClient.insert(this.tableName, columnLists);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+            return;
+        }
+
+        this.collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 041fbe8..8232c2f 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,9 +37,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
-        return this;
+    public JdbcLookupBolt(String configKey) {
+        super(configKey);
     }
 
     public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
@@ -67,15 +66,14 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
                 for (List<Column> row : result) {
                     List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
                     for (Values value : values) {
-                        collector.emit(value);
+                        collector.emit(tuple, value);
                     }
                 }
             }
             this.collector.ack(tuple);
         } catch (Exception e) {
-            LOG.warn("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
-            this.collector.fail(tuple);
             this.collector.reportError(e);
+            this.collector.fail(tuple);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index 4c5b37d..c462c6e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -19,16 +19,40 @@ package org.apache.storm.jdbc.common;
 
 
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
 
+/**
+ * A database table can be defined as a list of rows and each row can be defined as a list of columns where
+ * each column instance has a name, a value and a type. This class represents an instance of a column in a database
+ * row. For example, if we have the following table named user:
+ * <pre>
+ *  ____________________________
+ * |    UserId  |   UserName    |
+ * |      1     |    Foo        |
+ * |      2     |    Bar        |
+ *  ----------------------------
+ * </pre>
+ *
+ * This class can be used to represent the data in the table as
+ * <pre>
+ * List<List<Column>> rows = new ArrayList<List<Column>>();
+ * List<Column> row1 = Lists.newArrayList(new Column("UserId", 1, Types.INTEGER), new Column("UserName", "Foo", Types.VARCHAR));
+ * List<Column> row2 = Lists.newArrayList(new Column("UserId", 2, Types.INTEGER), new Column("UserName", "Bar", Types.VARCHAR));
+ *
+ * rows.add(row1);
+ * rows.add(row2);
+ *
+ * </pre>
+ * @param <T>
+ */
 public class Column<T> implements Serializable {
 
     private String columnName;
     private T val;
+
+    /**
+     * The SQL type (e.g. varchar, date, int). Ideally we would have an enum, but Java's JDBC API uses integers.
+     * See {@link java.sql.Types}
+     */
     private int sqlType;
 
     public Column(String columnName, T val, int sqlType) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index d11d1b3..4992ed7 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -64,9 +64,9 @@ public class JdbcClient {
             sb.append(placeHolders).append(")");
 
             String query = sb.toString();
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Executing query " + query);
-            }
+
+            LOG.debug("Executing query {}", query);
+
 
             PreparedStatement preparedStatement = connection.prepareStatement(query);
             preparedStatement.setQueryTimeout(queryTimeoutSecs);

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index fbb0b6c..32c012e 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -19,7 +19,7 @@ package org.apache.storm.jdbc.topology;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
-import org.apache.storm.jdbc.bolt.JdbcBolt;
+import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
 import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
 
 
@@ -34,12 +34,10 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt()
-                .withConfigKey(JDBC_CONF)
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
                 .withJdbcLookupMapper(this.jdbcLookupMapper)
                 .withSelectSql(SELECT_QUERY);
-        JdbcBolt userPersistanceBolt = new JdbcBolt()
-                .withConfigKey(JDBC_CONF)
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF)
                 .withTableName(TABLE_NAME)
                 .withJdbcMapper(this.jdbcMapper);
 


[05/21] storm git commit: Revert "STORM-616 : removing unintended changes."

Posted by sr...@apache.org.
Revert "STORM-616 : removing unintended changes."

This reverts commit d260759ac203383e27668a7cb7090926029f7406.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ca235e6c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ca235e6c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ca235e6c

Branch: refs/heads/master
Commit: ca235e6cb18006bbbac56361639309e73c196718
Parents: 079deda
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Jan 6 17:43:58 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Jan 6 17:43:58 2015 -0500

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java  |  7 ++++---
 .../src/jvm/storm/kafka/UpdateOffsetException.java        |  5 ++++-
 .../src/jvm/storm/kafka/trident/TridentKafkaEmitter.java  | 10 +++++++++-
 3 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 918da74..3165189 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -180,10 +180,11 @@ public class KafkaUtils {
         if (fetchResponse.hasError()) {
             KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
             if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
+                String msg = "Got fetch request with offset out of range: [" + offset + "]; " +
                         "retrying with default start offset time from configuration. " +
-                        "configured start offset time: [" + config.startOffsetTime + "]");
-                throw new UpdateOffsetException();
+                        "configured start offset time: [" + config.startOffsetTime + "]";
+                LOG.warn(msg);
+                throw new UpdateOffsetException(msg);
             } else {
                 String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                 LOG.error(message);

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
index 1be7312..5c366ec 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/UpdateOffsetException.java
@@ -17,6 +17,9 @@
  */
 package storm.kafka;
 
-public class UpdateOffsetException extends RuntimeException {
+public class UpdateOffsetException extends FailedFetchException {
 
+    public UpdateOffsetException(String message) {
+        super(message);
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ca235e6c/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 94bf134..34566c5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -33,6 +33,7 @@ import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.FailedFetchException;
 import storm.kafka.KafkaUtils;
 import storm.kafka.Partition;
+import storm.kafka.UpdateOffsetException;
 import storm.trident.operation.TridentCollector;
 import storm.trident.spout.IOpaquePartitionedTridentSpout;
 import storm.trident.spout.IPartitionedTridentSpout;
@@ -129,7 +130,14 @@ public class TridentKafkaEmitter {
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
         long start = System.nanoTime();
-        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        } catch (UpdateOffsetException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config);
+            LOG.warn("OffsetOutOfRange, Updating offset from offset = " + offset + " to offset = " + newOffset);
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, newOffset);
+        }
         long end = System.nanoTime();
         long millis = (end - start) / 1000000;
         _kafkaMeanFetchLatencyMetric.update(millis);