You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:13 UTC
[18/21] storm git commit: STORM-616: Making all the required params
part of constructor args. changing executeUpdate to executeBatch and added
test case.
STORM-616: Making all the required params part of constructor args. changing executeUpdate to executeBatch and added test case.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04fccb1b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04fccb1b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04fccb1b
Branch: refs/heads/master
Commit: 04fccb1b152bdd3454adcfda8c4d71502ad2c6db
Parents: 017360b
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 10 15:17:39 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Feb 18 12:21:21 2015 -0800
----------------------------------------------------------------------
external/storm-jdbc/README.md | 13 ++---
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 10 +---
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 12 +----
.../apache/storm/jdbc/common/JdbcClient.java | 25 +++++++--
.../storm/jdbc/mapper/JdbcLookupMapper.java | 2 +-
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 4 +-
.../storm/jdbc/common/JdbcClientTest.java | 55 ++++++++++----------
.../jdbc/topology/AbstractUserTopology.java | 3 ++
.../jdbc/topology/UserPersistanceTopology.java | 8 +--
9 files changed, 64 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index cfe449d..6fb1d41 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -63,9 +63,7 @@ to be <= topology.message.timeout.secs.
```java
Config config = new Config();
config.put("jdbc.conf", hikariConfigMap);
-JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf")
- .withTableName("user_details")
- .withJdbcMapper(simpleJdbcMapper)
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf","user_details",simpleJdbcMapper)
.withQueryTimeoutSecs(30);
```
### JdbcTridentState
@@ -135,9 +133,9 @@ You can optionally specify a query timeout seconds param that specifies max seco
The default is set to value of topology.message.timeout.secs. You should set this value to be <= topology.message.timeout.secs.
```java
-JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
- .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
- .withSelectSql("select user_name from user_details where user_id = ?")
+String selectSql = "select user_name from user_details where user_id = ?";
+SimpleJdbcLookupMapper lookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns)
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf", selectSql, lookupMapper)
.withQueryTimeoutSecs(30);
```
@@ -208,8 +206,7 @@ mvn clean compile assembly:single.
Mysql Example:
```
-storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar
-org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
+storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar org.apache.storm.jdbc.topology.UserPersistanceTopology com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
```
You can execute a select query against the user table which should show newly inserted rows:
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 9abd553..f7be7ad 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -37,18 +37,10 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
private String tableName;
private JdbcMapper jdbcMapper;
- public JdbcInsertBolt(String configKey) {
+ public JdbcInsertBolt(String configKey, String tableName, JdbcMapper jdbcMapper) {
super(configKey);
- }
-
- public JdbcInsertBolt withTableName(String tableName) {
this.tableName = tableName;
- return this;
- }
-
- public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
this.jdbcMapper = jdbcMapper;
- return this;
}
public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 8232c2f..e1b1553 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,18 +37,10 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
private JdbcLookupMapper jdbcLookupMapper;
- public JdbcLookupBolt(String configKey) {
+ public JdbcLookupBolt(String configKey, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
super(configKey);
- }
-
- public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
- this.jdbcLookupMapper = jdbcLookupMapper;
- return this;
- }
-
- public JdbcLookupBolt withSelectSql(String selectQuery) {
this.selectQuery = selectQuery;
- return this;
+ this.jdbcLookupMapper = jdbcLookupMapper;
}
public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index 4992ed7..4ad108c 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -37,18 +37,23 @@ public class JdbcClient {
private HikariDataSource dataSource;
private int queryTimeoutSecs;
- public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
+ public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
Properties properties = new Properties();
- properties.putAll(map);
+ properties.putAll(hikariConfigMap);
HikariConfig config = new HikariConfig(properties);
this.dataSource = new HikariDataSource(config);
this.queryTimeoutSecs = queryTimeoutSecs;
}
- public int insert(String tableName, List<List<Column>> columnLists) {
+ public void insert(String tableName, List<List<Column>> columnLists) {
Connection connection = null;
try {
connection = this.dataSource.getConnection();
+ boolean autoCommit = connection.getAutoCommit();
+ if(autoCommit) {
+ connection.setAutoCommit(false);
+ }
+
StringBuilder sb = new StringBuilder();
sb.append("Insert into ").append(tableName).append(" (");
Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
@@ -67,14 +72,24 @@ public class JdbcClient {
LOG.debug("Executing query {}", query);
-
PreparedStatement preparedStatement = connection.prepareStatement(query);
preparedStatement.setQueryTimeout(queryTimeoutSecs);
for(List<Column> columnList : columnLists) {
setPreparedStatementParams(preparedStatement, columnList);
+ preparedStatement.addBatch();
}
- return preparedStatement.executeUpdate();
+ int[] results = preparedStatement.executeBatch();
+ if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
+ connection.rollback();
+ throw new RuntimeException("failed at least one sql statement in the batch, operation rolled back.");
+ } else {
+ try {
+ connection.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to commit inserts in table " + tableName, e);
+ }
+ }
} catch (SQLException e) {
throw new RuntimeException("Failed to insert in table " + tableName, e);
} finally {
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
index 77852f4..f8c79a3 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -10,7 +10,7 @@ import java.util.List;
public interface JdbcLookupMapper extends JdbcMapper {
/**
- * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single
+ * Converts a DB row to a list of storm values that can be emitted. This is done to allow a single
* storm input tuple and a single DB row to result in multiple output values.
* @param input the input tuple.
* @param columns list of columns that represents a row
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index ad7f1c0..841d5d6 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -33,9 +33,9 @@ public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;
- public SimpleJdbcMapper(String tableName, Map map) {
+ public SimpleJdbcMapper(String tableName, Map hikariConfigurationMap) {
int queryTimeoutSecs = 30;
- JdbcClient client = new JdbcClient(map, queryTimeoutSecs);
+ JdbcClient client = new JdbcClient(hikariConfigurationMap, queryTimeoutSecs);
this.schemaColumns = client.getColumnSchema(tableName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index 6423e8f..787b887 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -24,7 +24,8 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.Date;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@@ -45,39 +46,39 @@ public class JdbcClientTest {
int queryTimeoutSecs = 60;
this.client = new JdbcClient(map, queryTimeoutSecs);
- client.executeSql("create table user_details (id integer, user_name varchar(100), create_date date)");
+ client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
}
@Test
public void testInsertAndSelect() {
- int id = 1;
- String name = "bob";
- Date createDate = new Date(System.currentTimeMillis());
+ int id1 = 1;
+ String name1 = "bob";
+ Timestamp createDate1 = new Timestamp(System.currentTimeMillis());
- List<Column> columns = Lists.newArrayList(
- new Column("id",id, Types.INTEGER),
- new Column("user_name",name, Types.VARCHAR),
- new Column("create_date", createDate , Types.DATE)
- );
+ List<Column> row1 = Lists.newArrayList(
+ new Column("ID",id1, Types.INTEGER),
+ new Column("USER_NAME",name1, Types.VARCHAR),
+ new Column("CREATED_TIMESTAMP", createDate1 , Types.TIMESTAMP));
- List<List<Column>> columnList = new ArrayList<List<Column>>();
- columnList.add(columns);
- client.insert(tableName, columnList);
+ int id2 = 2;
+ String name2 = "alice";
+ Timestamp createDate2 = new Timestamp(System.currentTimeMillis());
+ List<Column> row2 = Lists.newArrayList(
+ new Column("ID",id2, Types.INTEGER),
+ new Column("USER_NAME",name2, Types.VARCHAR),
+ new Column("CREATED_TIMESTAMP", createDate2 , Types.TIMESTAMP));
- List<List<Column>> rows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id, Types.INTEGER)));
- for(List<Column> row : rows) {
- for(Column column : row) {
- if(column.getColumnName().equalsIgnoreCase("id")) {
- Assert.assertEquals(id, column.getVal());
- } else if(column.getColumnName().equalsIgnoreCase("user_name")) {
- Assert.assertEquals(name, column.getVal());
- } else if(column.getColumnName().equalsIgnoreCase("create_date")) {
- Assert.assertEquals(createDate.toString(), column.getVal().toString());
- } else {
- throw new AssertionError("Unknown column" + column);
- }
- }
- }
+ List<List<Column>> rows = Lists.newArrayList(row1, row2);
+ client.insert(tableName, rows);
+
+ List<List<Column>> selectedRows = client.select("select * from user_details where id = ?", Lists.newArrayList(new Column("id", id1, Types.INTEGER)));
+ List<List<Column>> expectedRows = Lists.newArrayList();
+ expectedRows.add(row1);
+
+ Assert.assertEquals(expectedRows, selectedRows);
+
+ selectedRows = client.select("select * from user_details order by id", Lists.<Column>newArrayList());
+ Assert.assertEquals(rows, selectedRows);
}
@After
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 9cb0bfa..e94aca2 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -38,6 +38,9 @@ import java.util.Map;
public abstract class AbstractUserTopology {
private static final List<String> setupSqls = Lists.newArrayList(
+ "drop table if exists user",
+ "drop table if exists department",
+ "drop table if exists user_department",
"create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
"create table if not exists department (dept_id integer, dept_name varchar(100))",
"create table if not exists user_department (user_id integer, dept_id integer)",
http://git-wip-us.apache.org/repos/asf/storm/blob/04fccb1b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 32c012e..0b96f4d 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -34,12 +34,8 @@ public class UserPersistanceTopology extends AbstractUserTopology {
@Override
public StormTopology getTopology() {
- JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
- .withJdbcLookupMapper(this.jdbcLookupMapper)
- .withSelectSql(SELECT_QUERY);
- JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF)
- .withTableName(TABLE_NAME)
- .withJdbcMapper(this.jdbcMapper);
+ JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF, SELECT_QUERY, this.jdbcLookupMapper);
+ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF, TABLE_NAME, this.jdbcMapper);
// userSpout ==> jdbcBolt
TopologyBuilder builder = new TopologyBuilder();