Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/01 19:38:23 UTC

[48/50] storm git commit: Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch

Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05d1f8b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05d1f8b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05d1f8b2

Branch: refs/heads/0.10.x-branch
Commit: 05d1f8b2451a0f958ff079699862c219ddb26833
Parents: 13c33f3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 15:41:12 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 15:47:08 2015 -0400

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 72 +++++++++++++++-----
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       | 17 +++--
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  |  5 +-
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  5 +-
 .../org/apache/storm/jdbc/common/Column.java    |  7 +-
 .../storm/jdbc/common/ConnectionProvider.java   | 26 +++++++
 .../jdbc/common/HikariCPConnectionProvider.java | 46 +++++++++++++
 .../apache/storm/jdbc/common/JdbcClient.java    | 19 ++----
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  6 +-
 .../storm/jdbc/trident/state/JdbcState.java     | 13 ++--
 .../storm/jdbc/common/JdbcClientTest.java       |  5 +-
 .../jdbc/topology/AbstractUserTopology.java     | 17 +++--
 .../jdbc/topology/UserPersistanceTopology.java  | 18 ++++-
 .../UserPersistanceTridentTopology.java         |  2 +-
 14 files changed, 196 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index ef7845a..2d1301a 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,37 @@
 # Storm JDBC
 Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
 to either insert storm tuples into a database table or to execute select queries against a database and enrich tuples 
-in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+in a storm topology.
 
 ## Inserting into a database.
 The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+
+### ConnectionProvider
+An interface that should be implemented by different connection pooling mechanisms: `org.apache.storm.jdbc.common.ConnectionProvider`
+
+```java
+public interface ConnectionProvider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     *
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}
+```
+
+Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP.
+
+### JdbcMapper
 The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
 
 ```java
@@ -17,11 +44,12 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
 **The order of the returned list is important. The placeholders in the supplied queries are resolved in the same order as the returned list.**
 For example, if the user-supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())`, the 1st item 
 of the list returned by the `getColumns` method will map to the 1st placeholder, the 2nd to the 2nd, and so on. We do not parse
-the supplied queries to try and resolve place holder by column names. 
+the supplied queries to try and resolve placeholders by column names. Not making any assumptions about the query syntax allows this connector
+to be used by some non-standard SQL frameworks like Phoenix, which only supports upsert into.
 
 ### JdbcInsertBolt
-To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the 
-hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply 
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts a storm tuple to a DB row. In addition, you must either supply
 a table name using the `withTableName` method or an insert query using `withInsertQuery`. 
 If you specify an insert query you should ensure that your `JdbcMapper` implementation will return a list of columns in the same order as in your insert query.
 You can optionally specify a query timeout seconds param that specifies max seconds an insert query can take. 
@@ -29,13 +57,21 @@ The default is set to value of topology.message.timeout.secs and a value of -1 w
 You should set the query timeout value to be <= topology.message.timeout.secs.
 
  ```java
-Config config = new Config();
-config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+Map hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                     .withTableName("user")
                                     .withQueryTimeoutSecs(30);
                                     Or
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
                                     .withInsertQuery("insert into user values (?,?)")
                                     .withQueryTimeoutSecs(30);                                    
  ```
@@ -45,7 +81,7 @@ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMa
 tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the columns in 
 the database table that you intend to write to.
 
-To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map. 
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a `ConnectionProvider` instance.
 
 The following code creates a `SimpleJdbcMapper` instance that:
 
@@ -60,8 +96,9 @@ hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDa
 hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
 hikariConfigMap.put("dataSource.user","root");
 hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
 String tableName = "user_details";
-JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
 ```
 The mapper initialized in the example above assumes a storm tuple has a value for all the columns of the table you intend to insert data into, and its `getColumns`
 method will return the columns in the order in which the Jdbc connection instance's `connection.getMetaData().getColumns();` method returns them.
@@ -89,12 +126,12 @@ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
-hikari configuration map. See the example below:
+state you need to initialize it with the table name or an insert query, the JdbcMapper instance, and a connection provider instance.
+See the example below:
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
-        .withConfigKey("jdbc.conf")
+        .withConnectionProvider(connectionProvider)
         .withMapper(jdbcMapper)
         .withTableName("user_details")
         .withQueryTimeoutSecs(30);
@@ -151,15 +188,14 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 ```
 
 ### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the 
-hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, a `JdbcLookupMapper` instance, and the select query to execute.
 You can optionally specify a query timeout seconds param that specifies max seconds the select query can take. 
 The default is set to the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
 String selectSql = "select user_name from user_details where user_id = ?";
 SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
         .withQueryTimeoutSecs(30);
 ```
 
@@ -168,7 +204,7 @@ We also support a trident query state that can be used with trident topologies.
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
-        .withConfigKey("jdbc.conf")
+        .withConnectionProvider(connectionProvider)
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
         .withSelectQuery("select user_name from user_details where user_id = ?")
         .withQueryTimeoutSecs(30);
@@ -210,7 +246,7 @@ To make it work with Mysql, you can add the following to the pom.xml
 ```
 
 You can generate a single jar with dependencies using the mvn assembly plugin. To use the plugin, add the following to your pom.xml and execute 
-mvn clean compile assembly:single.
+`mvn clean compile assembly:single`
 
 ```
 <plugin>

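Taken together, the README changes above boil down to the following wiring sketch for the insert path. This is an illustrative composite of the snippets above, not text from the patch; the spout name and `UserSpout` class are stand-ins for any spout emitting tuples whose fields match the table's columns.

```java
Map<String, Object> hikariConfigMap = Maps.newHashMap();   // com.google.common.collect.Maps
hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user", "root");
hikariConfigMap.put("dataSource.password", "password");

// the provider is serialized with the bolt; prepare() runs again inside the worker
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);

JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper("user_details", connectionProvider);
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
        .withTableName("user_details")
        .withQueryTimeoutSecs(30);

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("user_spout", new UserSpout());                  // hypothetical spout
builder.setBolt("user_persistance_bolt", userPersistanceBolt)
        .shuffleGrouping("user_spout");
```
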
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 0d30529..15a2345 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -21,7 +21,7 @@ import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,22 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
     protected transient JdbcClient jdbcClient;
     protected String configKey;
     protected Integer queryTimeoutSecs;
+    protected ConnectionProvider connectionProvider;
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
         this.collector = collector;
 
-        Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
-        Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
+        connectionProvider.prepare();
 
         if(queryTimeoutSecs == null) {
             queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
         }
 
-        this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
+        this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
     }
 
-    public AbstractJdbcBolt(String configKey) {
-        this.configKey = configKey;
+    public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
+        this.connectionProvider = connectionProvider;
+    }
+
+    @Override
+    public void cleanup() {
+        connectionProvider.cleanup();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 131da27..2f29000 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,8 +43,8 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
     private String insertQuery;
     private JdbcMapper jdbcMapper;
 
-    public JdbcInsertBolt(String configKey, JdbcMapper jdbcMapper) {
-        super(configKey);
+    public JdbcInsertBolt(ConnectionProvider connectionProvider,  JdbcMapper jdbcMapper) {
+        super(connectionProvider);
         this.jdbcMapper = jdbcMapper;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index e1b1553..25122e2 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -21,6 +21,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,8 +38,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
-        super(configKey);
+    public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
+        super(connectionProvider);
         this.selectQuery = selectQuery;
         this.jdbcLookupMapper = jdbcLookupMapper;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index c531fff..73bc0fd 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -84,19 +84,18 @@ public class Column<T> implements Serializable {
         if (this == o) return true;
         if (!(o instanceof Column)) return false;
 
-        Column column = (Column) o;
+        Column<?> column = (Column<?>) o;
 
         if (sqlType != column.sqlType) return false;
         if (!columnName.equals(column.columnName)) return false;
-        if (!val.equals(column.val)) return false;
+        return val != null ? val.equals(column.val) : column.val == null;
 
-        return true;
     }
 
     @Override
     public int hashCode() {
         int result = columnName.hashCode();
-        result = 31 * result + val.hashCode();
+        result = 31 * result + (val != null ? val.hashCode() : 0);
         result = 31 * result + sqlType;
         return result;
     }

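A quick note on what the null-safe `equals`/`hashCode` above changes in practice. The sketch below is illustrative only and assumes the two-argument `Column(name, sqlType)` constructor leaves the value null, as in the query-parameter declarations used elsewhere in this patch; previously such value-less columns triggered a `NullPointerException` on comparison.

```java
// illustrative only; not part of this commit
Column a = new Column("user_id", Types.INTEGER);   // assumed: value stays null
Column b = new Column("user_id", Types.INTEGER);

// with the null-safe equals()/hashCode(), value-less columns can be compared
// and used as map/set keys without throwing
boolean sameColumn = a.equals(b) && a.hashCode() == b.hashCode();   // true
```
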
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
new file mode 100644
index 0000000..b838e48
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.common;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.util.Map;
+
+/**
+ * Provides a database connection.
+ */
+public interface ConnectionProvider extends Serializable {
+    /**
+     * method must be idempotent.
+     */
+    void prepare();
+
+    /**
+     *
+     * @return a DB connection over which the queries can be executed.
+     */
+    Connection getConnection();
+
+    /**
+     * called once when the system is shutting down, should be idempotent.
+     */
+    void cleanup();
+}

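The interface above is deliberately small: a custom pooling mechanism only has to provide `prepare()`, `getConnection()`, and `cleanup()`, and remain serializable so it can ship with the bolt. As a rough illustration (not part of this commit), a non-pooled provider backed by `java.sql.DriverManager` could look like the following; the class name and constructor arguments are hypothetical.

```java
package org.apache.storm.jdbc.common;  // hypothetical placement, for illustration only

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DriverManagerConnectionProvider implements ConnectionProvider {
    private final String jdbcUrl;
    private final String user;
    private final String password;

    public DriverManagerConnectionProvider(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public void prepare() {
        // nothing to pre-allocate; kept empty and therefore trivially idempotent
    }

    @Override
    public Connection getConnection() {
        try {
            // opens a new physical connection per call; fine for tests, not for production
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void cleanup() {
        // no pooled resources to release
    }
}
```
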
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
new file mode 100644
index 0000000..b523fcc
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
@@ -0,0 +1,46 @@
+package org.apache.storm.jdbc.common;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+public class HikariCPConnectionProvider implements ConnectionProvider {
+
+    private Map<String, Object> configMap;
+    private transient HikariDataSource dataSource;
+
+    public HikariCPConnectionProvider(Map<String, Object> hikariCPConfigMap) {
+        this.configMap = hikariCPConfigMap;
+    }
+
+    @Override
+    public synchronized void prepare() {
+        if(dataSource == null) {
+            Properties properties = new Properties();
+            properties.putAll(configMap);
+            HikariConfig config = new HikariConfig(properties);
+            this.dataSource = new HikariDataSource(config);
+            this.dataSource.setAutoCommit(false);
+        }
+    }
+
+    @Override
+    public Connection getConnection() {
+        try {
+            return this.dataSource.getConnection();
+        } catch (SQLException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void cleanup() {
+        if(dataSource != null) {
+            dataSource.shutdown();
+        }
+    }
+}

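For reference, a minimal sketch of driving this provider directly, using the Hikari property keys already shown in the README. The try/finally handling is illustrative rather than prescribed by the patch, and the fragment assumes an enclosing method that declares `throws SQLException`.

```java
Map<String, Object> hikariConfigMap = Maps.newHashMap();   // com.google.common.collect.Maps
hikariConfigMap.put("dataSourceClassName", "com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user", "root");
hikariConfigMap.put("dataSource.password", "password");

ConnectionProvider provider = new HikariCPConnectionProvider(hikariConfigMap);
provider.prepare();                          // idempotent: the pool is built on the first call only

Connection connection = provider.getConnection();
try {
    // run statements here; note prepare() configures the pool with auto-commit disabled
} finally {
    connection.close();                      // returns the connection to the Hikari pool
    provider.cleanup();                      // shuts the pool down
}
```
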
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 63797f4..228babe 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -21,8 +21,6 @@ import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Lists;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,14 +32,11 @@ import java.util.*;
 public class JdbcClient {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
 
-    private HikariDataSource dataSource;
+    private ConnectionProvider connectionProvider;
     private int queryTimeoutSecs;
 
-    public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
-        Properties properties = new Properties();
-        properties.putAll(hikariConfigMap);
-        HikariConfig config = new HikariConfig(properties);
-        this.dataSource = new HikariDataSource(config);
+    public JdbcClient(ConnectionProvider connectionProvider, int queryTimeoutSecs) {
+        this.connectionProvider = connectionProvider;
         this.queryTimeoutSecs = queryTimeoutSecs;
     }
 
@@ -53,7 +48,7 @@ public class JdbcClient {
     public void executeInsertQuery(String query, List<List<Column>> columnLists) {
         Connection connection = null;
         try {
-            connection = this.dataSource.getConnection();
+            connection = connectionProvider.getConnection();
             boolean autoCommit = connection.getAutoCommit();
             if(autoCommit) {
                 connection.setAutoCommit(false);
@@ -110,7 +105,7 @@ public class JdbcClient {
     public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
         Connection connection = null;
         try {
-            connection = this.dataSource.getConnection();
+            connection = connectionProvider.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
             if(queryTimeoutSecs > 0) {
                 preparedStatement.setQueryTimeout(queryTimeoutSecs);
@@ -166,7 +161,7 @@ public class JdbcClient {
         Connection connection = null;
         List<Column> columns = new ArrayList<Column>();
         try {
-            connection = this.dataSource.getConnection();
+            connection = connectionProvider.getConnection();
             DatabaseMetaData metaData = connection.getMetaData();
             ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
             while (resultSet.next()) {
@@ -183,7 +178,7 @@ public class JdbcClient {
     public void executeSql(String sql) {
         Connection connection = null;
         try {
-            connection = this.dataSource.getConnection();
+            connection = connectionProvider.getConnection();
             Statement statement = connection.createStatement();
             statement.execute(sql);
         } catch (SQLException e) {

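A short sketch of how a caller drives the refactored client, matching the constructor and the `executeSql`/`executeInsertQuery`/`select` signatures above. It assumes `Column` also offers a `(name, value, sqlType)` constructor for value-carrying columns; the table, values, and `hikariConfigMap` are illustrative.

```java
ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
connectionProvider.prepare();
JdbcClient client = new JdbcClient(connectionProvider, 30 /* queryTimeoutSecs */);

// plain DDL without bind parameters
client.executeSql("create table user_details (id integer, user_name varchar(100))");

// one inner list per row, one Column per '?' placeholder
// (assumes a Column(name, value, sqlType) constructor)
List<Column> row = Lists.newArrayList(
        new Column("id", 1, Types.INTEGER),
        new Column("user_name", "peter", Types.VARCHAR));
client.executeInsertQuery("insert into user_details (id, user_name) values (?,?)",
        Lists.<List<Column>>newArrayList(row));

// parameterized lookup returning one inner list per matching row
List<List<Column>> result = client.select(
        "select user_name from user_details where id = ?",
        Lists.newArrayList(new Column("id", 1, Types.INTEGER)));
```
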
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 841d5d6..c4005e3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -19,6 +19,7 @@ package org.apache.storm.jdbc.mapper;
 
 import backtype.storm.tuple.ITuple;
 import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.common.Util;
 
@@ -33,9 +34,10 @@ public class SimpleJdbcMapper implements JdbcMapper {
 
     private List<Column> schemaColumns;
 
-    public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) {
+    public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) {
         int queryTimeoutSecs = 30;
-        JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs);
+        connectionProvider.prepare();
+        JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 0f301f4..8afc466 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -21,9 +21,9 @@ import backtype.storm.Config;
 import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
-import org.apache.commons.lang.Validate;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
@@ -54,14 +54,14 @@ public class JdbcState implements State {
     public static class Options implements Serializable {
         private JdbcMapper mapper;
         private JdbcLookupMapper jdbcLookupMapper;
-        private String configKey;
+        private ConnectionProvider connectionProvider;
         private String tableName;
         private String insertQuery;
         private String selectQuery;
         private Integer queryTimeoutSecs;
 
-        public Options withConfigKey(String configKey) {
-            this.configKey = configKey;
+        public Options withConnectionProvider(ConnectionProvider connectionProvider) {
+            this.connectionProvider = connectionProvider;
             return this;
         }
 
@@ -97,8 +97,7 @@ public class JdbcState implements State {
     }
 
     protected void prepare() {
-        Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
-        Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
+        options.connectionProvider.prepare();
 
         if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) {
             throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." +
@@ -109,7 +108,7 @@ public class JdbcState implements State {
             options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
         }
 
-        this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
+        this.jdbcClient = new JdbcClient(options.connectionProvider, options.queryTimeoutSecs);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 5b3be88..551cd72 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -25,7 +25,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.sql.Timestamp;
-import java.util.Date;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
@@ -43,9 +42,11 @@ public class JdbcClientTest {
         map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
         map.put("dataSource.user","SA");//root
         map.put("dataSource.password","");//password
+        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+        connectionProvider.prepare();
 
         int queryTimeoutSecs = 60;
-        this.client = new JdbcClient(map, queryTimeoutSecs);
+        this.client = new JdbcClient(connectionProvider, queryTimeoutSecs);
         client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index e94aca2..9df5a86 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -24,6 +24,8 @@ import backtype.storm.tuple.Fields;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
+import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
 import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
@@ -56,6 +58,7 @@ public abstract class AbstractUserTopology {
     protected UserSpout userSpout;
     protected JdbcMapper jdbcMapper;
     protected JdbcLookupMapper jdbcLookupMapper;
+    protected ConnectionProvider connectionProvider;
 
     protected static final String TABLE_NAME = "user";
     protected static final String JDBC_CONF = "jdbc.conf";
@@ -72,23 +75,29 @@ public abstract class AbstractUserTopology {
         map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
         map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
         map.put("dataSource.user", args[2]);//root
-        map.put("dataSource.password", args[3]);//password
+
+        if(args.length == 4) {
+            map.put("dataSource.password", args[3]);//password
+        }
 
         Config config = new Config();
         config.put(JDBC_CONF, map);
 
+        ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+        connectionProvider.prepare();
         int queryTimeoutSecs = 60;
-        JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs);
+        JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
         for (String sql : setupSqls) {
             jdbcClient.executeSql(sql);
         }
 
         this.userSpout = new UserSpout();
-        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, connectionProvider);
+        connectionProvider.cleanup();
         Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
         List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
         this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
-
+        this.connectionProvider = new HikariCPConnectionProvider(map);
         if (args.length == 4) {
             LocalCluster cluster = new LocalCluster();
             cluster.submitTopology("test", config, getTopology());

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 7c529c8..585994e 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -19,8 +19,15 @@ package org.apache.storm.jdbc.topology;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
+import com.google.common.collect.Lists;
 import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
 import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+
+import java.sql.Types;
+import java.util.List;
 
 
 public class UserPersistanceTopology extends AbstractUserTopology {
@@ -34,8 +41,15 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
-        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, this.jdbcMapper).withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);
+
+        //must specify column schema when providing custom query.
+        List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE),
+                new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR));
+        JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
+
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
+                .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
 
         // userSpout ==> jdbcBolt
         TopologyBuilder builder = new TopologyBuilder();

http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 2cf3403..7cf0ce6 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -44,7 +44,7 @@ public class UserPersistanceTridentTopology extends AbstractUserTopology {
         TridentTopology topology = new TridentTopology();
 
         JdbcState.Options options = new JdbcState.Options()
-                .withConfigKey(JDBC_CONF)
+                .withConnectionProvider(connectionProvider)
                 .withMapper(this.jdbcMapper)
                 .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
                 .withTableName(TABLE_NAME)