Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/01 19:38:23 UTC
[48/50] storm git commit: Merge branch 'STORM-821' of
github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch
Merge branch 'STORM-821' of github.com:Parth-Brahmbhatt/incubator-storm into 0.10.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/05d1f8b2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/05d1f8b2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/05d1f8b2
Branch: refs/heads/0.10.x-branch
Commit: 05d1f8b2451a0f958ff079699862c219ddb26833
Parents: 13c33f3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 15:41:12 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 15:47:08 2015 -0400
----------------------------------------------------------------------
external/storm-jdbc/README.md | 72 +++++++++++++++-----
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 17 +++--
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 5 +-
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 5 +-
.../org/apache/storm/jdbc/common/Column.java | 7 +-
.../storm/jdbc/common/ConnectionProvider.java | 26 +++++++
.../jdbc/common/HikariCPConnectionProvider.java | 46 +++++++++++++
.../apache/storm/jdbc/common/JdbcClient.java | 19 ++----
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 6 +-
.../storm/jdbc/trident/state/JdbcState.java | 13 ++--
.../storm/jdbc/common/JdbcClientTest.java | 5 +-
.../jdbc/topology/AbstractUserTopology.java | 17 +++--
.../jdbc/topology/UserPersistanceTopology.java | 18 ++++-
.../UserPersistanceTridentTopology.java | 2 +-
14 files changed, 196 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index ef7845a..2d1301a 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,37 @@
#Storm JDBC
Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples
-in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
+in a storm topology.
## Inserting into a database.
The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+
+### ConnectionProvider
+`org.apache.storm.jdbc.common.ConnectionProvider` is the interface that each connection pooling mechanism should implement:
+
+```java
+public interface ConnectionProvider extends Serializable {
+ /**
+ * method must be idempotent.
+ */
+ void prepare();
+
+ /**
+ *
+ * @return a DB connection over which the queries can be executed.
+ */
+ Connection getConnection();
+
+ /**
+ * called once when the system is shutting down, should be idempotent.
+ */
+ void cleanup();
+}
+```
+
+Out of the box we support `org.apache.storm.jdbc.common.HikariCPConnectionProvider` which is an implementation that uses HikariCP.
+
+### JdbcMapper
The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
```java
@@ -17,11 +44,12 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
**The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.**
For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item
of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse
-the supplied queries to try and resolve place holder by column names.
+the supplied queries to try and resolve place holder by column names. Not making any assumptions about the query syntax allows this connector
+to be used by some non-standard SQL frameworks like Phoenix, which only supports `upsert into`.
### JdbcInsertBolt
-To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the
-hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply
+To use the `JdbcInsertBolt`, you construct an instance of it by specifying a `ConnectionProvider` implementation
+and a `JdbcMapper` implementation that converts storm tuple to DB row. In addition, you must either supply
a table name using `withTableName` method or an insert query using `withInsertQuery`.
If you specify an insert query, you should ensure that your `JdbcMapper` implementation returns a list of columns in the same order as in your insert query.
You can optionally specify a query timeout seconds param that specifies max seconds an insert query can take.
@@ -29,13 +57,21 @@ The default is set to value of topology.message.timeout.secs and a value of -1 w
You should set the query timeout value to be <= topology.message.timeout.secs.
```java
-Config config = new Config();
-config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+Map<String, Object> hikariConfigMap = Maps.newHashMap();
+hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
+hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
+hikariConfigMap.put("dataSource.user","root");
+hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
+
+String tableName = "user_details";
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
+
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("user")
.withQueryTimeoutSecs(30);
// Or
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withInsertQuery("insert into user values (?,?)")
.withQueryTimeoutSecs(30);
```
@@ -45,7 +81,7 @@ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMa
tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in
the database table that you intend to write to.
-To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a connectionProvider instance.
The following code creates a `SimpleJdbcMapper` instance that:
@@ -60,8 +96,9 @@ hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDa
hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
+ConnectionProvider connectionProvider = new HikariCPConnectionProvider(hikariConfigMap);
String tableName = "user_details";
-JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, connectionProvider);
```
The mapper initialized in the example above assumes a storm tuple has a value for every column of the table you intend to insert data into, and its `getColumns`
method will return the columns in the order in which the JDBC connection's `connection.getMetaData().getColumns()` method returns them.
@@ -89,12 +126,12 @@ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
```
### JdbcTridentState
We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
-hikari configuration map. See the example below:
+state you need to initialize it with the table name or an insert query, the JdbcMapper instance and connection provider instance.
+See the example below:
```java
JdbcState.Options options = new JdbcState.Options()
- .withConfigKey("jdbc.conf")
+ .withConnectionProvider(connectionProvider)
.withMapper(jdbcMapper)
.withTableName("user_details")
.withQueryTimeoutSecs(30);
@@ -151,15 +188,14 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
```
### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the
-hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it using a `ConnectionProvider` instance, `JdbcLookupMapper` instance and the select query to execute.
You can optionally specify a query timeout seconds param that specifies max seconds the select query can take.
The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
```java
String selectSql = "select user_name from user_details where user_id = ?";
SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt(connectionProvider, selectSql, lookupMapper)
.withQueryTimeoutSecs(30);
```
@@ -168,7 +204,7 @@ We also support a trident query state that can be used with trident topologies.
```java
JdbcState.Options options = new JdbcState.Options()
- .withConfigKey("jdbc.conf")
+ .withConnectionProvider(connectionProvider)
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withSelectQuery("select user_name from user_details where user_id = ?")
.withQueryTimeoutSecs(30);
@@ -210,7 +246,7 @@ To make it work with Mysql, you can add the following to the pom.xml
```
You can generate a single jar with dependencies using mvn assembly plugin. To use the plugin add the following to your pom.xml and execute
-mvn clean compile assembly:single.
+`mvn clean compile assembly:single`
```
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 0d30529..15a2345 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -21,7 +21,7 @@ import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.commons.lang.Validate;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,22 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
protected transient JdbcClient jdbcClient;
protected String configKey;
protected Integer queryTimeoutSecs;
+ protected ConnectionProvider connectionProvider;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
- Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
- Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
+ connectionProvider.prepare();
if(queryTimeoutSecs == null) {
queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
}
- this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
+ this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
}
- public AbstractJdbcBolt(String configKey) {
- this.configKey = configKey;
+ public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
+ }
+
+ @Override
+ public void cleanup() {
+ connectionProvider.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 131da27..2f29000 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +43,8 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
private String insertQuery;
private JdbcMapper jdbcMapper;
- public JdbcInsertBolt(String configKey, JdbcMapper jdbcMapper) {
- super(configKey);
+ public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
+ super(connectionProvider);
this.jdbcMapper = jdbcMapper;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index e1b1553..25122e2 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -21,6 +21,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
private JdbcLookupMapper jdbcLookupMapper;
- public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
- super(configKey);
+ public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
+ super(connectionProvider);
this.selectQuery = selectQuery;
this.jdbcLookupMapper = jdbcLookupMapper;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index c531fff..73bc0fd 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -84,19 +84,18 @@ public class Column<T> implements Serializable {
if (this == o) return true;
if (!(o instanceof Column)) return false;
- Column column = (Column) o;
+ Column<?> column = (Column<?>) o;
if (sqlType != column.sqlType) return false;
if (!columnName.equals(column.columnName)) return false;
- if (!val.equals(column.val)) return false;
+ return val != null ? val.equals(column.val) : column.val == null;
- return true;
}
@Override
public int hashCode() {
int result = columnName.hashCode();
- result = 31 * result + val.hashCode();
+ result = 31 * result + (val != null ? val.hashCode() : 0);
result = 31 * result + sqlType;
return result;
}
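The null-safe comparison that this hunk introduces in `Column.equals` and `Column.hashCode` can also be written with `java.util.Objects`. The block below is a minimal sketch of that alternative, not the code in this commit: `NullableColumn` is a hypothetical stand-in for `Column`, and `Objects.hash` yields different hash values than the hand-written `31 * result` form, so the two should not be mixed.

```java
import java.util.Objects;

// Hypothetical stand-in for org.apache.storm.jdbc.common.Column, for illustration only.
final class NullableColumn<T> {
    private final String columnName;
    private final T val;       // may be null, e.g. when the tuple carries no value
    private final int sqlType;

    NullableColumn(String columnName, T val, int sqlType) {
        this.columnName = Objects.requireNonNull(columnName, "columnName");
        this.val = val;
        this.sqlType = sqlType;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof NullableColumn)) return false;
        NullableColumn<?> other = (NullableColumn<?>) o;
        // Objects.equals treats two nulls as equal and never dereferences a null value.
        return sqlType == other.sqlType
                && columnName.equals(other.columnName)
                && Objects.equals(val, other.val);
    }

    @Override
    public int hashCode() {
        // Objects.hash tolerates null elements, so a null val does not throw here.
        return Objects.hash(columnName, val, sqlType);
    }
}
```

With either form, two columns that share a name and SQL type and both hold `null` compare equal and produce the same hash code, which is what allows them to be used in hash-based collections.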
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
new file mode 100644
index 0000000..b838e48
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.common;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.util.Map;
+
+/**
+ * Provides a database connection.
+ */
+public interface ConnectionProvider extends Serializable {
+ /**
+ * method must be idempotent.
+ */
+ void prepare();
+
+ /**
+ *
+ * @return a DB connection over which the queries can be executed.
+ */
+ Connection getConnection();
+
+ /**
+ * called once when the system is shutting down, should be idempotent.
+ */
+ void cleanup();
+}
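Because the interface has only three methods, other connection strategies can be plugged in beside the HikariCP one. The following is a rough sketch of a non-pooling provider, assuming a JDBC driver for the given URL is on the classpath. `DriverManagerConnectionProvider` and `isPrepared()` are hypothetical names that are not part of this commit, and the interface is copied locally only so that the sketch compiles on its own.

```java
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

// Local copy of the interface added in this commit, so the sketch is self-contained.
interface ConnectionProvider extends Serializable {
    void prepare();
    Connection getConnection();
    void cleanup();
}

// Hypothetical provider that opens a new physical connection on every call (no pooling).
class DriverManagerConnectionProvider implements ConnectionProvider {
    private static final long serialVersionUID = 1L;

    private final String jdbcUrl;
    private final String user;
    private final String password;
    private transient boolean prepared;

    DriverManagerConnectionProvider(String jdbcUrl, String user, String password) {
        this.jdbcUrl = jdbcUrl;
        this.user = user;
        this.password = password;
    }

    @Override
    public synchronized void prepare() {
        // Nothing to build for DriverManager; repeated calls simply leave the flag set.
        prepared = true;
    }

    @Override
    public Connection getConnection() {
        if (!isPrepared()) {
            throw new IllegalStateException("prepare() must be called before getConnection()");
        }
        try {
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            // Same wrapping convention as HikariCPConnectionProvider in this commit.
            throw new RuntimeException(e);
        }
    }

    @Override
    public synchronized void cleanup() {
        // Safe to call more than once; there is no shared resource to release.
        prepared = false;
    }

    synchronized boolean isPrepared() {
        return prepared;
    }
}
```

A provider like this trades throughput for simplicity, since every query pays the cost of opening a connection; it is meant only to show the shape of the contract (idempotent `prepare()` and `cleanup()`, unchecked exception on failure).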
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
new file mode 100644
index 0000000..b523fcc
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
@@ -0,0 +1,46 @@
+package org.apache.storm.jdbc.common;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+public class HikariCPConnectionProvider implements ConnectionProvider {
+
+ private Map<String, Object> configMap;
+ private transient HikariDataSource dataSource;
+
+ public HikariCPConnectionProvider(Map<String, Object> hikariCPConfigMap) {
+ this.configMap = hikariCPConfigMap;
+ }
+
+ @Override
+ public synchronized void prepare() {
+ if(dataSource == null) {
+ Properties properties = new Properties();
+ properties.putAll(configMap);
+ HikariConfig config = new HikariConfig(properties);
+ this.dataSource = new HikariDataSource(config);
+ this.dataSource.setAutoCommit(false);
+ }
+ }
+
+ @Override
+ public Connection getConnection() {
+ try {
+ return this.dataSource.getConnection();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ if(dataSource != null) {
+ dataSource.shutdown();
+ }
+ }
+}
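The `transient` data source and the null check in `prepare()` matter because Storm serializes bolts with Java serialization when a topology is submitted: the configuration map travels with the object, while the pool has to be rebuilt on the worker. The sketch below illustrates that behaviour with a stand-in class, assuming only the JDK. `LazyResourceHolder`, `hasResource()`, `config()` and `roundTrip()` are hypothetical names, and a plain `Object` takes the place of `HikariDataSource`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a provider: serializable config plus a transient resource.
class LazyResourceHolder implements Serializable {
    private static final long serialVersionUID = 1L;

    private final HashMap<String, Object> configMap; // values must themselves be serializable
    private transient Object resource;               // stands in for the connection pool

    LazyResourceHolder(Map<String, Object> configMap) {
        this.configMap = new HashMap<>(configMap);
    }

    // Idempotent: builds the resource only when it is missing.
    synchronized void prepare() {
        if (resource == null) {
            resource = new Object(); // a real provider would build its pool from configMap here
        }
    }

    synchronized boolean hasResource() {
        return resource != null;
    }

    Map<String, Object> config() {
        return configMap;
    }

    // Serializes and deserializes the holder in memory, as a topology submission would.
    static LazyResourceHolder roundTrip(LazyResourceHolder original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyResourceHolder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After the round trip the copy still carries the configuration but its transient field is `null`, so the first `prepare()` call on the receiving side recreates the resource and later calls are no-ops.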
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 63797f4..228babe 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -21,8 +21,6 @@ import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
-import com.zaxxer.hikari.HikariConfig;
-import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,14 +32,11 @@ import java.util.*;
public class JdbcClient {
private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
- private HikariDataSource dataSource;
+ private ConnectionProvider connectionProvider;
private int queryTimeoutSecs;
- public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
- Properties properties = new Properties();
- properties.putAll(hikariConfigMap);
- HikariConfig config = new HikariConfig(properties);
- this.dataSource = new HikariDataSource(config);
+ public JdbcClient(ConnectionProvider connectionProvider, int queryTimeoutSecs) {
+ this.connectionProvider = connectionProvider;
this.queryTimeoutSecs = queryTimeoutSecs;
}
@@ -53,7 +48,7 @@ public class JdbcClient {
public void executeInsertQuery(String query, List<List<Column>> columnLists) {
Connection connection = null;
try {
- connection = this.dataSource.getConnection();
+ connection = connectionProvider.getConnection();
boolean autoCommit = connection.getAutoCommit();
if(autoCommit) {
connection.setAutoCommit(false);
@@ -110,7 +105,7 @@ public class JdbcClient {
public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
Connection connection = null;
try {
- connection = this.dataSource.getConnection();
+ connection = connectionProvider.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
if(queryTimeoutSecs > 0) {
preparedStatement.setQueryTimeout(queryTimeoutSecs);
@@ -166,7 +161,7 @@ public class JdbcClient {
Connection connection = null;
List<Column> columns = new ArrayList<Column>();
try {
- connection = this.dataSource.getConnection();
+ connection = connectionProvider.getConnection();
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
while (resultSet.next()) {
@@ -183,7 +178,7 @@ public class JdbcClient {
public void executeSql(String sql) {
Connection connection = null;
try {
- connection = this.dataSource.getConnection();
+ connection = connectionProvider.getConnection();
Statement statement = connection.createStatement();
statement.execute(sql);
} catch (SQLException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 841d5d6..c4005e3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -19,6 +19,7 @@ package org.apache.storm.jdbc.mapper;
import backtype.storm.tuple.ITuple;
import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.common.Util;
@@ -33,9 +34,10 @@ public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;
- public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) {
+ public SimpleJdbcMapper(String tableName, ConnectionProvider connectionProvider) {
int queryTimeoutSecs = 30;
- JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs);
+ connectionProvider.prepare();
+ JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs);
this.schemaColumns = client.getColumnSchema(tableName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 0f301f4..8afc466 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -21,9 +21,9 @@ import backtype.storm.Config;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
@@ -54,14 +54,14 @@ public class JdbcState implements State {
public static class Options implements Serializable {
private JdbcMapper mapper;
private JdbcLookupMapper jdbcLookupMapper;
- private String configKey;
+ private ConnectionProvider connectionProvider;
private String tableName;
private String insertQuery;
private String selectQuery;
private Integer queryTimeoutSecs;
- public Options withConfigKey(String configKey) {
- this.configKey = configKey;
+ public Options withConnectionPrvoider(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
return this;
}
@@ -97,8 +97,7 @@ public class JdbcState implements State {
}
protected void prepare() {
- Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
- Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
+ options.connectionProvider.prepare();
if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) {
throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." +
@@ -109,7 +108,7 @@ public class JdbcState implements State {
options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
}
- this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
+ this.jdbcClient = new JdbcClient(options.connectionProvider, options.queryTimeoutSecs);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 5b3be88..551cd72 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -25,7 +25,6 @@ import org.junit.Before;
import org.junit.Test;
import java.sql.Timestamp;
-import java.util.Date;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@@ -43,9 +42,11 @@ public class JdbcClientTest {
map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
map.put("dataSource.user","SA");//root
map.put("dataSource.password","");//password
+ ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+ connectionProvider.prepare();
int queryTimeoutSecs = 60;
- this.client = new JdbcClient(map, queryTimeoutSecs);
+ this.client = new JdbcClient(connectionProvider, queryTimeoutSecs);
client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index e94aca2..9df5a86 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -24,6 +24,8 @@ import backtype.storm.tuple.Fields;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.ConnectionProvider;
+import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
@@ -56,6 +58,7 @@ public abstract class AbstractUserTopology {
protected UserSpout userSpout;
protected JdbcMapper jdbcMapper;
protected JdbcLookupMapper jdbcLookupMapper;
+ protected ConnectionProvider connectionProvider;
protected static final String TABLE_NAME = "user";
protected static final String JDBC_CONF = "jdbc.conf";
@@ -72,23 +75,29 @@ public abstract class AbstractUserTopology {
map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
map.put("dataSource.user", args[2]);//root
- map.put("dataSource.password", args[3]);//password
+
+ if(args.length == 4) {
+ map.put("dataSource.password", args[3]);//password
+ }
Config config = new Config();
config.put(JDBC_CONF, map);
+ ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+ connectionProvider.prepare();
int queryTimeoutSecs = 60;
- JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs);
+ JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
for (String sql : setupSqls) {
jdbcClient.executeSql(sql);
}
this.userSpout = new UserSpout();
- this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+ this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, connectionProvider);
+ connectionProvider.cleanup();
Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
-
+ this.connectionProvider = new HikariCPConnectionProvider(map);
if (args.length == 4) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, getTopology());
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 7c529c8..585994e 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -19,8 +19,15 @@ package org.apache.storm.jdbc.topology;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
+import com.google.common.collect.Lists;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+
+import java.sql.Types;
+import java.util.List;
public class UserPersistanceTopology extends AbstractUserTopology {
@@ -34,8 +41,15 @@ public class UserPersistanceTopology extends AbstractUserTopology {
@Override
public StormTopology getTopology() {
- JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
- JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, this.jdbcMapper).withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
+ JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);
+
+ //must specify column schema when providing custom query.
+ List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE),
+ new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR));
+ JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
+
+ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
+ .withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
// userSpout ==> jdbcBolt
TopologyBuilder builder = new TopologyBuilder();
http://git-wip-us.apache.org/repos/asf/storm/blob/05d1f8b2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 2cf3403..7cf0ce6 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -44,7 +44,7 @@ public class UserPersistanceTridentTopology extends AbstractUserTopology {
TridentTopology topology = new TridentTopology();
JdbcState.Options options = new JdbcState.Options()
- .withConfigKey(JDBC_CONF)
+ .withConnectionPrvoider(connectionProvider)
.withMapper(this.jdbcMapper)
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withTableName(TABLE_NAME)