You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:07 UTC

[12/21] storm git commit: storm-616: adding query timeout configuration.

storm-616: adding query timeout configuration.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2d6c5ed3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2d6c5ed3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2d6c5ed3

Branch: refs/heads/master
Commit: 2d6c5ed338872cd464f3d42c27864f7530d2da9c
Parents: 1e0f623
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 14:59:14 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 14:59:14 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 25 +++++++++++++-------
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  8 ++-----
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    | 10 ++++++--
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  |  9 +++++--
 .../apache/storm/jdbc/common/JDBCClient.java    |  6 ++++-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  3 ++-
 .../storm/jdbc/trident/state/JdbcState.java     |  8 ++++++-
 .../storm/jdbc/common/JdbcClientTest.java       |  3 ++-
 .../jdbc/topology/AbstractUserTopology.java     |  3 ++-
 .../jdbc/topology/UserPersistanceTopology.java  |  6 +++--
 10 files changed, 56 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 4bb5e61..948ba23 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -54,16 +54,20 @@ List<Column> columnSchema = Lists.newArrayList(
 ```
 
 ### JdbcBolt
-To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
-the rows will be inserted.
+To use the `JdbcBolt`, construct an instance of it and specify the configuration key in your storm config that holds the
+hikari configuration map. In addition, you must specify the `JdbcMapper` implementation that converts a storm tuple to a DB row and
+the name of the table into which the rows will be inserted. You can optionally specify a query timeout, in seconds, that sets
+the maximum time an insert query can take. The default is 30 seconds, which equals the default value of `topology.message.timeout.secs`.
+You should set this value to be <= `topology.message.timeout.secs`.
 
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+JdbcBolt userPersistanceBolt = new JdbcBolt()
+                                    .withConfigKey("jdbc.conf")
                                     .withTableName("user_details")
-                                    .withJdbcMapper(simpleJdbcMapper);
+                                    .withJdbcMapper(simpleJdbcMapper)
+                                    .withQueryTimeoutSecs(30);
  ```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
@@ -74,7 +78,8 @@ hikari configuration map. See the example below:
 JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withMapper(jdbcMapper)
-        .withTableName("user_details");
+        .withTableName("user_details")
+        .withQueryTimeoutSecs(30);
 
 JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
 ```
@@ -125,13 +130,16 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 ```
 
 ### JdbcLookupBolt
-To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
-In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+To use the `JdbcLookupBolt`, construct an instance of it and specify the configuration key in your storm config that holds the
+hikari configuration map. In addition, you must specify the `JdbcLookupMapper` and the select query to execute.
+You can optionally specify a query timeout, in seconds, that sets the maximum time the select query can take.
+The default is 30 seconds, which equals the default value of `topology.message.timeout.secs`. You should set this value to be <= `topology.message.timeout.secs`.
 
 ```java
 JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
         .withSelectSql("select user_name from user_details where user_id = ?")
+        .withQueryTimeoutSecs(30);
 ```
 
 ### JdbcTridentState for lookup
@@ -142,6 +150,7 @@ JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
         .withSelectQuery("select user_name from user_details where user_id = ?");
+        .withQueryTimeoutSecs(30);
 ```
 
 ## Example:

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index ae5a249..4b93d4d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,11 +34,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
     protected transient JdbcClient jdbcClient;
     protected String configKey;
-
-    public AbstractJdbcBolt(String configKey) {
-        Validate.notEmpty(configKey, "configKey can not be null");
-        this.configKey = configKey;
-    }
+    protected int queryTimeoutSecs = 30;
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
@@ -47,6 +43,6 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
         Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
 
-        this.jdbcClient = new JdbcClient(conf);
+        this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index fd27285..f4921f5 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -37,8 +37,9 @@ public class JdbcBolt extends AbstractJdbcBolt {
     private String tableName;
     private JdbcMapper jdbcMapper;
 
-    public JdbcBolt(String configKey) {
-        super(configKey);
+    public JdbcBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
     }
 
     public JdbcBolt withTableName(String tableName) {
@@ -51,6 +52,11 @@ public class JdbcBolt extends AbstractJdbcBolt {
         return this;
     }
 
+    public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
+
     @Override
     public void execute(Tuple tuple) {
         try {

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 7e548ff..041fbe8 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,8 +37,9 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt(String configKey) {
-        super(configKey);
+    public JdbcLookupBolt withConfigKey(String configKey) {
+        this.configKey = configKey;
+        return this;
     }
 
     public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
@@ -51,6 +52,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
         return this;
     }
 
+    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
 
     @Override
     public void execute(Tuple tuple) {

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index ab3f8a7..d11d1b3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -35,12 +35,14 @@ public class JdbcClient {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
 
     private HikariDataSource dataSource;
+    private int queryTimeoutSecs;
 
-    public JdbcClient(Map<String, Object> map) {
+    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
         Properties properties = new Properties();
         properties.putAll(map);
         HikariConfig config = new HikariConfig(properties);
         this.dataSource = new HikariDataSource(config);
+        this.queryTimeoutSecs = queryTimeoutSecs;
     }
 
     public int insert(String tableName, List<List<Column>> columnLists) {
@@ -67,6 +69,7 @@ public class JdbcClient {
             }
 
             PreparedStatement preparedStatement = connection.prepareStatement(query);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
             for(List<Column> columnList : columnLists) {
                 setPreparedStatementParams(preparedStatement, columnList);
             }
@@ -84,6 +87,7 @@ public class JdbcClient {
         try {
             connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
+            preparedStatement.setQueryTimeout(queryTimeoutSecs);
             setPreparedStatementParams(preparedStatement, queryParams);
             ResultSet resultSet = preparedStatement.executeQuery();
             List<List<Column>> rows = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 81fc207..ad7f1c0 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -34,7 +34,8 @@ public class SimpleJdbcMapper implements JdbcMapper {
     private List<Column> schemaColumns;
 
     public SimpleJdbcMapper(String tableName, Map map) {
-        JdbcClient client = new JdbcClient(map);
+        int queryTimeoutSecs = 30;
+        JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 129191a..48fde4e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -55,6 +55,7 @@ public class JdbcState implements State {
         private String configKey;
         private String tableName;
         private String selectQuery;
+        private int queryTimeoutSecs = 30;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -80,13 +81,18 @@ public class JdbcState implements State {
             this.selectQuery = selectQuery;
             return this;
         }
+
+        public Options withQueryTimeoutSecs(int queryTimeoutSecs) {
+            this.queryTimeoutSecs = queryTimeoutSecs;
+            return this;
+        }
     }
 
     protected void prepare() {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
-        this.jdbcClient = new JdbcClient(conf);
+        this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 3623b77..6423e8f 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -43,7 +43,8 @@ public class JdbcClientTest {
         map.put("dataSource.user","SA");//root
         map.put("dataSource.password","");//password
 
-        this.client = new JdbcClient(map);
+        int queryTimeoutSecs = 60;
+        this.client = new JdbcClient(map, queryTimeoutSecs);
         client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index dc04ac1..9cb0bfa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -74,7 +74,8 @@ public abstract class AbstractUserTopology {
         Config config = new Config();
         config.put(JDBC_CONF, map);
 
-        JdbcClient jdbcClient = new JdbcClient(map);
+        int queryTimeoutSecs = 60;
+        JdbcClient jdbcClient = new JdbcClient(map, queryTimeoutSecs);
         for (String sql : setupSqls) {
             jdbcClient.executeSql(sql);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/2d6c5ed3/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 26a00aa..fbb0b6c 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,10 +34,12 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt()
+                .withConfigKey(JDBC_CONF)
                 .withJdbcLookupMapper(this.jdbcLookupMapper)
                 .withSelectSql(SELECT_QUERY);
-        JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+        JdbcBolt userPersistanceBolt = new JdbcBolt()
+                .withConfigKey(JDBC_CONF)
                 .withTableName(TABLE_NAME)
                 .withJdbcMapper(this.jdbcMapper);