You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/03/16 17:39:36 UTC

[1/4] storm git commit: STORM-699: storm-jdbc should support custom insert queries.

Repository: storm
Updated Branches:
  refs/heads/master 537b77081 -> 81ce9a1d6


STORM-699: storm-jdbc should support custom insert queries.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/148c9205
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/148c9205
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/148c9205

Branch: refs/heads/master
Commit: 148c920500f5c1d748cacfb3739c4a035eddb0e3
Parents: 5eff2e7
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Fri Mar 6 18:46:10 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Mar 11 10:11:43 2015 -0700

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 84 +++++++++++++-------
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 36 +++++++--
 .../org/apache/storm/jdbc/common/Column.java    |  3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    | 52 +++++++-----
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  2 +-
 .../storm/jdbc/trident/state/JdbcState.java     | 18 ++++-
 .../storm/jdbc/common/JdbcClientTest.java       | 39 +++++----
 .../jdbc/topology/UserPersistanceTopology.java  |  2 +-
 8 files changed, 159 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 6fb1d41..ef7845a 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -13,14 +13,39 @@ public interface JdbcMapper  extends Serializable {
 }
 ```
 
-The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
+The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database. 
+**The order of the returned list is important. The place holders in the supplied queries are resolved in the same order as returned list.**
+For example if the user supplied insert query is `insert into user(user_id, user_name, create_date) values (?,?, now())` the 1st item 
+of the returned list of `getColumns` method will map to the 1st place holder and the 2nd to the 2nd and so on. We do not parse
+the supplied queries to try and resolve place holder by column names. 
+
+### JdbcInsertBolt
+To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the 
+hikari configuration map and a `JdbcMapper` implementation that coverts storm tuple to DB row. In addition, you must either supply 
+a table name  using `withTableName` method or an insert query using `withInsertQuery`. 
+If you specify a insert query you should ensure that your `JdbcMapper` implementation will return a list of columns in the same order as in your insert query.
+You can optionally specify a query timeout seconds param that specifies max seconds an insert query can take. 
+The default is set to value of topology.message.timeout.secs and a value of -1 will indicate not to set any query timeout.
+You should set the query timeout value to be <= topology.message.timeout.secs.
+
+ ```java
+Config config = new Config();
+config.put("jdbc.conf", hikariConfigMap);
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+                                    .withTableName("user")
+                                    .withQueryTimeoutSecs(30);
+                                    Or
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf",simpleJdbcMapper)
+                                    .withInsertQuery("insert into user values (?,?)")
+                                    .withQueryTimeoutSecs(30);                                    
+ ```
 
 ### SimpleJdbcMapper
 `storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
 tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
 the database table that you intend to write to.
 
-To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
+To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map. 
 
 The following code creates a `SimpleJdbcMapper` instance that:
 
@@ -38,34 +63,30 @@ hikariConfigMap.put("dataSource.password","password");
 String tableName = "user_details";
 JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
 ```
-The mapper initialized in the example above assumes a storm tuple has value for all the columns. 
-If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values 
-and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
-`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table 
-`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
-In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
-you can initialize the `jdbcMapper` as below:
+The mapper initialized in the example above assumes a storm tuple has value for all the columns of the table you intend to insert data into and its `getColumn`
+method will return the columns in the order in which Jdbc connection instance's `connection.getMetaData().getColumns();` method returns them.
 
+**If you specified your own insert query to `JdbcInsertBolt` you must initialize `SimpleJdbcMapper` with explicit columnschema such that the schema has columns in the same order as your insert queries.**
+For example if your insert query is `Insert into user (user_id, user_name) values (?,?)` then your `SimpleJdbcMapper` should be initialized with the following statements:
 ```java
 List<Column> columnSchema = Lists.newArrayList(
     new Column("user_id", java.sql.Types.INTEGER),
     new Column("user_name", java.sql.Types.VARCHAR));
-    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
 
-### JdbcInsertBolt
-To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that hold the 
-hikari configuration map. In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and 
-the table name in which the rows will be inserted. You can optionally specify a query timeout seconds param that specifies 
-max seconds an insert query can take. The default is set to value of topology.message.timeout.secs.You should set this value 
-to be <= topology.message.timeout.secs.
+If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
+`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table `create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
 
- ```java
-Config config = new Config();
-config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf","user_details",simpleJdbcMapper)
-                                    .withQueryTimeoutSecs(30);
- ```
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR),
+    new Column("dept_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
+```
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
 state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
@@ -77,9 +98,9 @@ JdbcState.Options options = new JdbcState.Options()
         .withMapper(jdbcMapper)
         .withTableName("user_details")
         .withQueryTimeoutSecs(30);
-
 JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
 ```
+similar to `JdbcInsertBolt` you can specify a custom insert query using `withInsertQuery` instead of specifying a table name.
 
 ## Lookup from Database
 We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
@@ -88,21 +109,24 @@ executing select queries against a database using JDBC is the `org.apache.storm.
 ```java
     void declareOutputFields(OutputFieldsDeclarer declarer);
     List<Column> getColumns(ITuple tuple);
-    public List<Values> toTuple(ITuple input, List<Column> columns);
+    List<Values> toTuple(ITuple input, List<Column> columns);
 ```
 
 The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm 
 tuple. 
+
 The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
 For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
 user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
 The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
-second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
-returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
-query, the second item to second `?` in query and so on. 
+second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.
+**Note: the order in the returned list determines the place holder's value. In other words the first item in the list maps 
+to first `?` in select query, the second item to second `?` in query and so on.** 
+
 The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
-and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
-of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
+and returns a list of values to be emitted. 
+**Please note that it returns a list of `Values` and not just a single instance of `Values`.** 
+This allows a for a single DB row to be mapped to multiple output storm tuples.
 
 ###SimpleJdbcLookupMapper
 `storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
@@ -113,7 +137,7 @@ that declares `user_id,user_name,create_date` as output fields and `user_id` as
 SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
 `SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
 select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
-and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name. 
+and if it is not found in input tuple then it looks at select query's output row for a column with same name as field name. 
 So in the example below if the input tuple had fields `user_id, create_date` and the select query was 
 `select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
 will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query. 

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index f7be7ad..131da27 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -16,8 +16,11 @@
  * limitations under the License.
  */
 package org.apache.storm.jdbc.bolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Tuple;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
 import org.slf4j.Logger;
@@ -25,6 +28,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Basic bolt for writing to any Database table.
@@ -35,33 +39,53 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcInsertBolt.class);
 
     private String tableName;
+    private String insertQuery;
     private JdbcMapper jdbcMapper;
 
-    public JdbcInsertBolt(String configKey, String tableName, JdbcMapper jdbcMapper) {
+    public JdbcInsertBolt(String configKey, JdbcMapper jdbcMapper) {
         super(configKey);
-        this.tableName = tableName;
         this.jdbcMapper = jdbcMapper;
     }
 
+    public JdbcInsertBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcInsertBolt withInsertQuery(String insertQuery) {
+        this.insertQuery = insertQuery;
+        return this;
+    }
+
     public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
         this.queryTimeoutSecs = queryTimeoutSecs;
         return this;
     }
 
     @Override
+    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
+        super.prepare(map, topologyContext, collector);
+        if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
+            throw new IllegalArgumentException("You must supply either a tableName or an insert Query.");
+        }
+    }
+
+    @Override
     public void execute(Tuple tuple) {
         try {
             List<Column> columns = jdbcMapper.getColumns(tuple);
             List<List<Column>> columnLists = new ArrayList<List<Column>>();
             columnLists.add(columns);
-            this.jdbcClient.insert(this.tableName, columnLists);
+            if(!StringUtils.isBlank(tableName)) {
+                this.jdbcClient.insert(this.tableName, columnLists);
+            } else {
+                this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
+            }
+            this.collector.ack(tuple);
         } catch (Exception e) {
             this.collector.reportError(e);
             this.collector.fail(tuple);
-            return;
         }
-
-        this.collector.ack(tuple);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index c462c6e..c531fff 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -42,6 +42,7 @@ import java.io.Serializable;
  * rows.add(row2)
  *
  * </pre>
+ *
  * @param <T>
  */
 public class Column<T> implements Serializable {
@@ -50,7 +51,7 @@ public class Column<T> implements Serializable {
     private T val;
 
     /**
-     * The sql type(e.g. varchar, date, int) Idealy we would have an enum but java's jdbc API uses integer.
+     * The sql type(e.g. varchar, date, int) Ideally we would have an enum but java's jdbc API uses integer.
      * See {@link java.sql.Types}
      */
     private int sqlType;

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 4ad108c..63797f4 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -46,6 +46,11 @@ public class JdbcClient {
     }
 
     public void insert(String tableName, List<List<Column>> columnLists) {
+        String query = constructInsertQuery(tableName, columnLists);
+        executeInsertQuery(query, columnLists);
+    }
+
+    public void executeInsertQuery(String query, List<List<Column>> columnLists) {
         Connection connection = null;
         try {
             connection = this.dataSource.getConnection();
@@ -54,26 +59,13 @@ public class JdbcClient {
                 connection.setAutoCommit(false);
             }
 
-            StringBuilder sb = new StringBuilder();
-            sb.append("Insert into ").append(tableName).append(" (");
-            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
-                @Override
-                public String apply(Column input) {
-                    return input.getColumnName();
-                }
-            });
-            String columns = Joiner.on(",").join(columnNames);
-            sb.append(columns).append(") values ( ");
-
-            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
-            sb.append(placeHolders).append(")");
-
-            String query = sb.toString();
-
             LOG.debug("Executing query {}", query);
 
             PreparedStatement preparedStatement = connection.prepareStatement(query);
-            preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            if(queryTimeoutSecs > 0) {
+                preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            }
+
             for(List<Column> columnList : columnLists) {
                 setPreparedStatementParams(preparedStatement, columnList);
                 preparedStatement.addBatch();
@@ -87,22 +79,42 @@ public class JdbcClient {
                 try {
                     connection.commit();
                 } catch (SQLException e) {
-                    throw new RuntimeException("Failed to commit inserts in table " + tableName, e);
+                    throw new RuntimeException("Failed to commit insert query " + query, e);
                 }
             }
         } catch (SQLException e) {
-            throw new RuntimeException("Failed to insert in table " + tableName, e);
+            throw new RuntimeException("Failed to execute insert query " + query, e);
         } finally {
             closeConnection(connection);
         }
     }
 
+    private String constructInsertQuery(String tableName, List<List<Column>> columnLists) {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Insert into ").append(tableName).append(" (");
+        Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
+            @Override
+            public String apply(Column input) {
+                return input.getColumnName();
+            }
+        });
+        String columns = Joiner.on(",").join(columnNames);
+        sb.append(columns).append(") values ( ");
+
+        String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
+        sb.append(placeHolders).append(")");
+
+        return sb.toString();
+    }
+
     public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
         Connection connection = null;
         try {
             connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
-            preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            if(queryTimeoutSecs > 0) {
+                preparedStatement.setQueryTimeout(queryTimeoutSecs);
+            }
             setPreparedStatementParams(preparedStatement, queryParams);
             ResultSet resultSet = preparedStatement.executeQuery();
             List<List<Column>> rows = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
index e2a7e8c..dca1f77 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
@@ -28,7 +28,7 @@ public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLook
                 values.add(input.getValueByField(field));
             } else {
                 for(Column column : columns) {
-                    if(column.getColumnName().equals(field)) {
+                    if(column.getColumnName().equalsIgnoreCase(field)) {
                         values.add(column.getVal());
                     }
                 }

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index 01da5cd..0f301f4 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -22,6 +22,7 @@ import backtype.storm.topology.FailedException;
 import backtype.storm.tuple.Values;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.Validate;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.JdbcClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
@@ -55,6 +56,7 @@ public class JdbcState implements State {
         private JdbcLookupMapper jdbcLookupMapper;
         private String configKey;
         private String tableName;
+        private String insertQuery;
         private String selectQuery;
         private Integer queryTimeoutSecs;
 
@@ -68,6 +70,11 @@ public class JdbcState implements State {
             return this;
         }
 
+        public Options withInsertQuery(String insertQuery) {
+            this.insertQuery = insertQuery;
+            return this;
+        }
+
         public Options withMapper(JdbcMapper mapper) {
             this.mapper = mapper;
             return this;
@@ -93,6 +100,11 @@ public class JdbcState implements State {
         Map<String, Object> conf = (Map<String, Object>) map.get(options.configKey);
         Validate.notEmpty(conf, "Hikari configuration not found using key '" + options.configKey + "'");
 
+        if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) {
+            throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." +
+                    "If you are attempting to user a query state you must supply a select query.");
+        }
+
         if(options.queryTimeoutSecs == null) {
             options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
         }
@@ -118,7 +130,11 @@ public class JdbcState implements State {
         }
 
         try {
-            jdbcClient.insert(options.tableName, columnsLists);
+            if(!StringUtils.isBlank(options.tableName)) {
+                jdbcClient.insert(options.tableName, columnsLists);
+            } else {
+                jdbcClient.executeInsertQuery(options.insertQuery, columnsLists);
+            }
         } catch (Exception e) {
             LOG.warn("Batch write failed but some requests might have succeeded. Triggering replay.", e);
             throw new FailedException(e);

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 787b887..5b3be88 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -51,36 +51,41 @@ public class JdbcClientTest {
 
     @Test
     public void testInsertAndSelect() {
-        int id1 = 1;
-        String name1 = "bob";
-        Timestamp createDate1 = new Timestamp(System.currentTimeMillis());
-
-        List<Column> row1 = Lists.newArrayList(
-                new Column("ID",id1, Types.INTEGER),
-                new Column("USER_NAME",name1, Types.VARCHAR),
-                new Column("CREATED_TIMESTAMP", createDate1 , Types.TIMESTAMP));
-
-        int id2 = 2;
-        String name2 = "alice";
-        Timestamp createDate2 = new Timestamp(System.currentTimeMillis());
-        List<Column> row2 = Lists.newArrayList(
-                new Column("ID",id2, Types.INTEGER),
-                new Column("USER_NAME",name2, Types.VARCHAR),
-                new Column("CREATED_TIMESTAMP", createDate2 , Types.TIMESTAMP));
+
+        List<Column> row1 = createRow(1, "bob");
+        List<Column> row2 = createRow(2, "alice");
 
         List<List<Column>> rows = Lists.newArrayList(row1, row2);
         client.insert(tableName, rows);
 
-        List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id1, Types.INTEGER)));
+        List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", 1, Types.INTEGER)));
         List<List<Column>> expectedRows = Lists.newArrayList();
         expectedRows.add(row1);
+        Assert.assertEquals(expectedRows, selectedRows);
+
+        List<Column> row3 = createRow(3, "frank");
+        List<List<Column>> moreRows  = new ArrayList<List<Column>>();
+        moreRows.add(row3);
+        client.executeInsertQuery("insert into user_details values(?,?,?)", moreRows);
 
+        selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", 3, Types.INTEGER)));
+        expectedRows = Lists.newArrayList();
+        expectedRows.add(row3);
         Assert.assertEquals(expectedRows, selectedRows);
 
+
         selectedRows = client.select("select * from user_details order by id", Lists.<Column>newArrayList());
+        rows.add(row3);
         Assert.assertEquals(rows, selectedRows);
     }
 
+    private List<Column> createRow(int id, String name) {
+        return Lists.newArrayList(
+                new Column("ID", id, Types.INTEGER),
+                new Column("USER_NAME", name, Types.VARCHAR),
+                new Column("CREATED_TIMESTAMP",  new Timestamp(System.currentTimeMillis()) , Types.TIMESTAMP));
+    }
+
     @After
     public void cleanup() {
         client.executeSql("drop table " + tableName);

http://git-wip-us.apache.org/repos/asf/storm/blob/148c9205/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 0b96f4d..7c529c8 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -35,7 +35,7 @@ public class UserPersistanceTopology extends AbstractUserTopology {
     @Override
     public StormTopology getTopology() {
         JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
-        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, TABLE_NAME, this.jdbcMapper);
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, this.jdbcMapper).withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
 
         // userSpout ==> jdbcBolt
         TopologyBuilder builder = new TopologyBuilder();


[2/4] storm git commit: Merge branch 'STORM-699' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-699-V2

Posted by sr...@apache.org.
Merge branch 'STORM-699' of https://github.com/Parth-Brahmbhatt/incubator-storm into STORM-699-V2


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d37e747b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d37e747b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d37e747b

Branch: refs/heads/master
Commit: d37e747bc1edb161efeeeacb0d7c941ba9f032cc
Parents: 87a6627 148c920
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Mar 16 08:51:58 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Mar 16 08:51:58 2015 -0700

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 84 +++++++++++++-------
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 36 +++++++--
 .../org/apache/storm/jdbc/common/Column.java    |  3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    | 52 +++++++-----
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  2 +-
 .../storm/jdbc/trident/state/JdbcState.java     | 18 ++++-
 .../storm/jdbc/common/JdbcClientTest.java       | 39 +++++----
 .../jdbc/topology/UserPersistanceTopology.java  |  2 +-
 8 files changed, 159 insertions(+), 77 deletions(-)
----------------------------------------------------------------------



[3/4] storm git commit: Added STORM-699 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-699 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b49db05d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b49db05d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b49db05d

Branch: refs/heads/master
Commit: b49db05d35f6fd6988cd054fc594d96535fe7295
Parents: d37e747
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Mar 16 09:32:50 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Mar 16 09:32:50 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b49db05d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1befac1..bc2229a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-699: storm-jdbc should support custom insert queries. 
  * STORM-625: Don't leak netty clients when worker moves or reuse netty client.	
  * STORM-682: supervisor should handle worker state corruption gracefully.
  * STORM-446: Allow superusers to impersonate other users in secure mode.


[4/4] storm git commit: Merge branch 'STORM-699-V2'

Posted by sr...@apache.org.
Merge branch 'STORM-699-V2'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/81ce9a1d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/81ce9a1d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/81ce9a1d

Branch: refs/heads/master
Commit: 81ce9a1d6778827591e7e38bb546767b4306bcee
Parents: 537b770 b49db05
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Mon Mar 16 09:33:19 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Mon Mar 16 09:33:19 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md                                    |  1 +
 external/storm-jdbc/README.md                   | 84 +++++++++++++-------
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 36 +++++++--
 .../org/apache/storm/jdbc/common/Column.java    |  3 +-
 .../apache/storm/jdbc/common/JdbcClient.java    | 52 +++++++-----
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  2 +-
 .../storm/jdbc/trident/state/JdbcState.java     | 18 ++++-
 .../storm/jdbc/common/JdbcClientTest.java       | 39 +++++----
 .../jdbc/topology/UserPersistanceTopology.java  |  2 +-
 9 files changed, 160 insertions(+), 77 deletions(-)
----------------------------------------------------------------------