You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:08 UTC

[13/21] storm git commit: Storm-616: defaulting query time out to topology message timeout seconds.

Storm-616: defaulting query time out to topology message timeout seconds.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d00d18e2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d00d18e2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d00d18e2

Branch: refs/heads/master
Commit: d00d18e24ee8287803a9508e46f4028222edff84
Parents: 2d6c5ed
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 14 15:16:11 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 14 15:16:11 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                                 | 6 +++---
 .../java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java     | 7 ++++++-
 .../java/org/apache/storm/jdbc/trident/state/JdbcState.java   | 7 ++++++-
 3 files changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 948ba23..1139450 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -57,8 +57,8 @@ List<Column> columnSchema = Lists.newArrayList(
 To use the `JdbcBolt`, you construct an instance of it and specify a configuration key in your storm config that hold the 
 hikari configuration map. In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and 
 the table name in which the rows will be inserted. You can optionally specify a query timeout seconds param that specifies 
-max seconds an insert query can take. The default is set to 30 seconds which is equal to topology.message.timeout.secs. 
-You should set this value to be <= topology.message.timeout.secs.
+max seconds an insert query can take. The default is set to value of topology.message.timeout.secs.You should set this value 
+to be <= topology.message.timeout.secs.
 
  ```java
 Config config = new Config();
@@ -133,7 +133,7 @@ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColum
 To use the `JdbcLookupBolt`, construct an instance of it and specify a configuration key in your storm config that hold the 
 hikari configuration map. In addition you must specify the `JdbcLookupMapper` and the select query to execute.
 You can optionally specify a query timeout seconds param that specifies max seconds the select query can take. 
-The default is set to 30 seconds which is equal to topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
+The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
 JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")

http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 4b93d4d..436ad00 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.jdbc.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.base.BaseRichBolt;
@@ -34,7 +35,7 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
     protected transient JdbcClient jdbcClient;
     protected String configKey;
-    protected int queryTimeoutSecs = 30;
+    protected Integer queryTimeoutSecs;
 
     @Override
     public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
@@ -43,6 +44,10 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
         Map<String, Object> conf = (Map<String, Object>)map.get(this.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + this.configKey + "'");
 
+        if(queryTimeoutSecs == null) {
+            queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+        }
+
         this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/d00d18e2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 48fde4e..01da5cd 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.jdbc.trident.state;
 
+import backtype.storm.Config;
 import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
@@ -55,7 +56,7 @@ public class JdbcState implements State {
         private String configKey;
         private String tableName;
         private String selectQuery;
-        private int queryTimeoutSecs = 30;
+        private Integer queryTimeoutSecs;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -92,6 +93,10 @@ public class JdbcState implements State {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
+        if(options.queryTimeoutSecs == null) {
+            options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
+        }
+
         this.jdbcClient = new JdbcClient(conf, options.queryTimeoutSecs);
     }