You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:13 UTC

[18/21] storm git commit: STORM-616: Making all the required params part of constructor args, changing executeUpdate to executeBatch, and adding a test case.

STORM-616: Making all the required params part of constructor args, changing executeUpdate to executeBatch, and adding a test case.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04fccb1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04fccb1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04fccb1b

Branch: refs/heads/master
Commit: 04fccb1b152bdd3454adcfda8c4d71502ad2c6db
Parents: 017360b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 10 15:17:39 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Feb 18 12:21:21 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 13 ++---
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 10 +---
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  | 12 +----
 .../apache/storm/jdbc/common/JdbcClient.java    | 25 +++++++--
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |  2 +-
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  4 +-
 .../storm/jdbc/common/JdbcClientTest.java       | 55 ++++++++++----------
 .../jdbc/topology/AbstractUserTopology.java     |  3 ++
 .../jdbc/topology/UserPersistanceTopology.java  |  8 +--
 9 files changed, 64 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index cfe449d..6fb1d41 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -63,9 +63,7 @@ to be <= topology.message.timeout.secs.
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf")
-                                    .withTableName("user_details")
-                                    .withJdbcMapper(simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf","user_details",simpleJdbcMapper)
                                     .withQueryTimeoutSecs(30);
  ```
 ### JdbcTridentState
@@ -135,9 +133,9 @@ You can optionally specify a query timeout seconds param that specifies max seco
 The default is set to the value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
 
 ```java
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
-        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
-        .withSelectSql("select user_name from user_details where user_id = ?")
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
         .withQueryTimeoutSecs(30);
 ```
 
@@ -208,8 +206,7 @@ mvn clean compile assembly:single.
 
 Mysql Example:
 ```
-storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
-org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
 You can execute a select query against the user table which should show newly inserted rows:

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 9abd553..f7be7ad 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -37,18 +37,10 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
     private String tableName;
     private JdbcMapper jdbcMapper;
 
-    public JdbcInsertBolt(String configKey) {
+    public JdbcInsertBolt(String configKey, String tableName, JdbcMapper jdbcMapper) {
         super(configKey);
-    }
-
-    public JdbcInsertBolt withTableName(String tableName) {
         this.tableName = tableName;
-        return this;
-    }
-
-    public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
         this.jdbcMapper = jdbcMapper;
-        return this;
     }
 
     public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 8232c2f..e1b1553 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,18 +37,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt(String configKey) {
+    public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
         super(configKey);
-    }
-
-    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
-        this.jdbcLookupMapper = jdbcLookupMapper;
-        return this;
-    }
-
-    public JdbcLookupBolt withSelectSql(String selectQuery) {
         this.selectQuery = selectQuery;
-        return this;
+        this.jdbcLookupMapper = jdbcLookupMapper;
     }
 
     public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 4992ed7..4ad108c 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -37,18 +37,23 @@ public class JdbcClient {
     private HikariDataSource dataSource;
     private int queryTimeoutSecs;
 
-    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+    public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
         Properties properties = new Properties();
-        properties.putAll(map);
+        properties.putAll(hikariConfigMap);
         HikariConfig config = new HikariConfig(properties);
         this.dataSource = new HikariDataSource(config);
         this.queryTimeoutSecs = queryTimeoutSecs;
     }
 
-    public int insert(String tableName, List<List<Column>> columnLists) {
+    public void insert(String tableName, List<List<Column>> columnLists) {
         Connection connection = null;
         try {
             connection = this.dataSource.getConnection();
+            boolean autoCommit = connection.getAutoCommit();
+            if(autoCommit) {
+                connection.setAutoCommit(false);
+            }
+
             StringBuilder sb = new StringBuilder();
             sb.append("Insert into ").append(tableName).append(" (");
             Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
@@ -67,14 +72,24 @@ public class JdbcClient {
 
             LOG.debug("Executing query {}", query);
 
-
             PreparedStatement preparedStatement = connection.prepareStatement(query);
             preparedStatement.setQueryTimeout(queryTimeoutSecs);
             for(List<Column> columnList : columnLists) {
                 setPreparedStatementParams(preparedStatement, columnList);
+                preparedStatement.addBatch();
             }
 
-            return preparedStatement.executeUpdate();
+            int[] results = preparedStatement.executeBatch();
+            if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
+                connection.rollback();
+                throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back.");
+            } else {
+                try {
+                    connection.commit();
+                } catch (SQLException e) {
+                    throw new RuntimeException("Failed to commit inserts in table " + tableName, e);
+                }
+            }
         } catch (SQLException e) {
             throw new RuntimeException("Failed to insert in table " + tableName, e);
         } finally {

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
index 77852f4..f8c79a3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -10,7 +10,7 @@ import java.util.List;
 public interface JdbcLookupMapper extends JdbcMapper {
 
     /**
-     * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single
+     * Converts a DB row to a list of storm values that can be emitted. This is done to allow a single
      * storm input tuple and a single DB row to result in multiple output values.
      * @param input the input tuple.
      * @param columns list of columns that represents a row

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index ad7f1c0..841d5d6 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -33,9 +33,9 @@ public class SimpleJdbcMapper implements JdbcMapper {
 
     private List<Column> schemaColumns;
 
-    public SimpleJdbcMapper(String tableName, Map map) {
+    public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) {
         int queryTimeoutSecs = 30;
-        JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
+        JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs);
         this.schemaColumns = client.getColumnSchema(tableName);
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 6423e8f..787b887 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -24,7 +24,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Date;
 import java.sql.Types;
 import java.util.ArrayList;
 import java.util.List;
@@ -45,39 +46,39 @@ public class JdbcClientTest {
 
         int queryTimeoutSecs = 60;
         this.client = new JdbcClient(map, queryTimeoutSecs);
-        client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
+        client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
     }
 
     @Test
     public void testInsertAndSelect() {
-        int id = 1;
-        String name = "bob";
-        Date createDate = new Date(System.currentTimeMillis());
+        int id1 = 1;
+        String name1 = "bob";
+        Timestamp createDate1 = new Timestamp(System.currentTimeMillis());
 
-        List<Column> columns = Lists.newArrayList(
-                new Column("id",id, Types.INTEGER),
-                new Column("user_name",name, Types.VARCHAR),
-                new Column("create_date", createDate , Types.DATE)
-                );
+        List<Column> row1 = Lists.newArrayList(
+                new Column("ID",id1, Types.INTEGER),
+                new Column("USER_NAME",name1, Types.VARCHAR),
+                new Column("CREATED_TIMESTAMP", createDate1 , Types.TIMESTAMP));
 
-        List<List<Column>> columnList = new ArrayList<List<Column>>();
-        columnList.add(columns);
-        client.insert(tableName, columnList);
+        int id2 = 2;
+        String name2 = "alice";
+        Timestamp createDate2 = new Timestamp(System.currentTimeMillis());
+        List<Column> row2 = Lists.newArrayList(
+                new Column("ID",id2, Types.INTEGER),
+                new Column("USER_NAME",name2, Types.VARCHAR),
+                new Column("CREATED_TIMESTAMP", createDate2 , Types.TIMESTAMP));
 
-        List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER)));
-        for(List<Column> row : rows) {
-            for(Column column : row) {
-                if(column.getColumnName().equalsIgnoreCase("id")) {
-                    Assert.assertEquals(id, column.getVal());
-                } else if(column.getColumnName().equalsIgnoreCase("user_name")) {
-                    Assert.assertEquals(name, column.getVal());
-                } else if(column.getColumnName().equalsIgnoreCase("create_date")) {
-                    Assert.assertEquals(createDate.toString(), column.getVal().toString());
-                } else {
-                    throw new AssertionError("Unknown column" + column);
-                }
-            }
-        }
+        List<List<Column>> rows = Lists.newArrayList(row1, row2);
+        client.insert(tableName, rows);
+
+        List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id1, Types.INTEGER)));
+        List<List<Column>> expectedRows = Lists.newArrayList();
+        expectedRows.add(row1);
+
+        Assert.assertEquals(expectedRows, selectedRows);
+
+        selectedRows = client.select("select * from user_details order by id", Lists.<Column>newArrayList());
+        Assert.assertEquals(rows, selectedRows);
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 9cb0bfa..e94aca2 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -38,6 +38,9 @@ import java.util.Map;
 
 public abstract class AbstractUserTopology {
     private static final List<String> setupSqls = Lists.newArrayList(
+            "drop table if exists user",
+            "drop table if exists department",
+            "drop table if exists user_department",
             "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
             "create table if not exists department (dept_id integer, dept_name varchar(100))",
             "create table if not exists user_department (user_id integer, dept_id integer)",

http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 32c012e..0b96f4d 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,12 +34,8 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
-                .withJdbcLookupMapper(this.jdbcLookupMapper)
-                .withSelectSql(SELECT_QUERY);
-        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF)
-                .withTableName(TABLE_NAME)
-                .withJdbcMapper(this.jdbcMapper);
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, TABLE_NAME, this.jdbcMapper);
 
         // userSpout ==> jdbcBolt
         TopologyBuilder builder = new TopologyBuilder();