You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:07 UTC
[12/21] storm git commit: storm-616: adding query timeout
configuration.
storm-616: adding query timeout configuration.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d6c5ed3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d6c5ed3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d6c5ed3
Branch: refs/heads/master
Commit: 2d6c5ed338872cd464f3d42c27864f7530d2da9c
Parents: 1e0f623
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 14:59:14 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 14:59:14 2015 -0800
----------------------------------------------------------------------
external/storm-jdbc/README.md | 25 +++++++++++++-------
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 8 ++-----
.../org/apache/storm/jdbc/bolt/JdbcBolt.java | 10 ++++++--
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 9 +++++--
.../apache/storm/jdbc/common/JDBCClient.java | 6 ++++-
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 3 ++-
.../storm/jdbc/trident/state/JdbcState.java | 8 ++++++-
.../storm/jdbc/common/JdbcClientTest.java | 3 ++-
.../jdbc/topology/AbstractUserTopology.java | 3 ++-
.../jdbc/topology/UserPersistanceTopology.java | 6 +++--
10 files changed, 56 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 4bb5e61..948ba23 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -54,16 +54,20 @@ List<Column> columnSchema = Lists.newArrayList(
```
### JdbcBolt
-To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which
-the rows will be inserted.
+To use the `JdbcBolt`, construct an instance of it and specify the configuration key in your storm config that holds the
+hikari configuration map. In addition, you must specify the JdbcMapper implementation to convert a storm tuple to a DB row and
+the table name into which the rows will be inserted. You can optionally specify a query timeout, in seconds, which is the
+maximum time an insert query can take. The default is 30 seconds, which matches the default value of topology.message.timeout.secs.
+You should set this value to be <= topology.message.timeout.secs.
```java
Config config = new Config();
config.put("jdbc.conf", hikariConfigMap);
-JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+JdbcBolt userPersistanceBolt = new JdbcBolt()
+ .withConfigKey("jdbc.conf")
.withTableName("user_details")
- .withJdbcMapper(simpleJdbcMapper);
+ .withJdbcMapper(simpleJdbcMapper)
+ .withQueryTimeoutSecs(30);
```
### JdbcTridentState
We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
@@ -74,7 +78,8 @@ hikari configuration map. See the example below:
JdbcState.Options options = new JdbcState.Options()
.withConfigKey("jdbc.conf")
.withMapper(jdbcMapper)
- .withTableName("user_details");
+ .withTableName("user_details")
+ .withQueryTimeoutSecs(30);
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
```
@@ -125,13 +130,16 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
```
### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it and specify the configuration key in your storm config that holds the
+hikari configuration map. In addition, you must specify the `JdbcLookupMapper` and the select query to execute.
+You can optionally specify a query timeout, in seconds, which is the maximum time the select query can take.
+The default is 30 seconds, which matches the default value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
```java
JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
.withSelectSql("select user_name from user_details where user_id = ?")
+ .withQueryTimeoutSecs(30);
```
### JdbcTridentState for lookup
@@ -142,6 +150,7 @@ JdbcState.Options options = new JdbcState.Options()
.withConfigKey("jdbc.conf")
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withSelectQuery("select user_name from user_details where user_id = ?");
+ .withQueryTimeoutSecs(30);
```
## Example:
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index ae5a249..4b93d4d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,11 +34,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
protected transient JdbcClient jdbcClient;
protected String configKey;
-
- public AbstractJdbcBolt(String configKey) {
- Validate.notEmpty(configKey, "configKey can not be null");
- this.configKey = configKey;
- }
+ protected int queryTimeoutSecs = 30;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
@@ -47,6 +43,6 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
- this.jdbcClient = new JdbcClient(conf);
+ this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index fd27285..f4921f5 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -37,8 +37,9 @@ public class JdbcBolt extends AbstractJdbcBolt {
private String tableName;
private JdbcMapper jdbcMapper;
- public JdbcBolt(String configKey) {
- super(configKey);
+ public JdbcBolt withConfigKey(String configKey) {
+ this.configKey = configKey;
+ return this;
}
public JdbcBolt withTableName(String tableName) {
@@ -51,6 +52,11 @@ public class JdbcBolt extends AbstractJdbcBolt {
return this;
}
+ public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+ this.queryTimeoutSecs = queryTimeoutSecs;
+ return this;
+ }
+
@Override
public void execute(Tuple tuple) {
try {
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 7e548ff..041fbe8 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,8 +37,9 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
private JdbcLookupMapper jdbcLookupMapper;
- public JdbcLookupBolt(String configKey) {
- super(configKey);
+ public JdbcLookupBolt withConfigKey(String configKey) {
+ this.configKey = configKey;
+ return this;
}
public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
@@ -51,6 +52,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
return this;
}
+ public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+ this.queryTimeoutSecs = queryTimeoutSecs;
+ return this;
+ }
@Override
public void execute(Tuple tuple) {
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index ab3f8a7..d11d1b3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -35,12 +35,14 @@ public class JdbcClient {
private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
private HikariDataSource dataSource;
+ private int queryTimeoutSecs;
- public JdbcClient(Map<String, Object> map) {
+ public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
Properties properties = new Properties();
properties.putAll(map);
HikariConfig config = new HikariConfig(properties);
this.dataSource = new HikariDataSource(config);
+ this.queryTimeoutSecs = queryTimeoutSecs;
}
public int insert(String tableName, List<List<Column>> columnLists) {
@@ -67,6 +69,7 @@ public class JdbcClient {
}
PreparedStatement preparedStatement = connection.prepareStatement(query);
+ preparedStatement.setQueryTimeout(queryTimeoutSecs);
for(List<Column> columnList : columnLists) {
setPreparedStatementParams(preparedStatement, columnList);
}
@@ -84,6 +87,7 @@ public class JdbcClient {
try {
connection = this.dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+ preparedStatement.setQueryTimeout(queryTimeoutSecs);
setPreparedStatementParams(preparedStatement, queryParams);
ResultSet resultSet = preparedStatement.executeQuery();
List<List<Column>> rows = Lists.newArrayList();
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 81fc207..ad7f1c0 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -34,7 +34,8 @@ public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;
public SimpleJdbcMapper(String tableName, Map map) {
- JdbcClient client = new JdbcClient(map);
+ int queryTimeoutSecs = 30;
+ JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
this.schemaColumns = client.getColumnSchema(tableName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 129191a..48fde4e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -55,6 +55,7 @@ public class JdbcState implements State {
private String configKey;
private String tableName;
private String selectQuery;
+ private int queryTimeoutSecs = 30;
public Options withConfigKey(String configKey) {
this.configKey = configKey;
@@ -80,13 +81,18 @@ public class JdbcState implements State {
this.selectQuery = selectQuery;
return this;
}
+
+ public Options withQueryTimeoutSecs(int queryTimeoutSecs) {
+ this.queryTimeoutSecs = queryTimeoutSecs;
+ return this;
+ }
}
protected void prepare() {
Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
- this.jdbcClient = new JdbcClient(conf);
+ this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 3623b77..6423e8f 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -43,7 +43,8 @@ public class JdbcClientTest {
map.put("dataSource.user","SA");//root
map.put("dataSource.password","");//password
- this.client = new JdbcClient(map);
+ int queryTimeoutSecs = 60;
+ this.client = new JdbcClient(map, queryTimeoutSecs);
client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index dc04ac1..9cb0bfa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -74,7 +74,8 @@ public abstract class AbstractUserTopology {
Config config = new Config();
config.put(JDBC_CONF, map);
- JdbcClient jdbcClient = new JdbcClient(map);
+ int queryTimeoutSecs = 60;
+ JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs);
for (String sql : setupSqls) {
jdbcClient.executeSql(sql);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 26a00aa..fbb0b6c 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,10 +34,12 @@ public class UserPersistanceTopology extends AbstractUserTopology {
@Override
public StormTopology getTopology() {
- JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+ JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt()
+ .withConfigKey(JDBC_CONF)
.withJdbcLookupMapper(this.jdbcLookupMapper)
.withSelectSql(SELECT_QUERY);
- JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+ JdbcBolt userPersistanceBolt = new JdbcBolt()
+ .withConfigKey(JDBC_CONF)
.withTableName(TABLE_NAME)
.withJdbcMapper(this.jdbcMapper);