Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:02 UTC

[07/21] storm git commit: STORM-616: adding jdbc Lookup bolt.

STORM-616: adding jdbc Lookup bolt.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bfa6028
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bfa6028
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bfa6028

Branch: refs/heads/master
Commit: 8bfa602876f84985864f136ee578bd2d9edb9ba7
Parents: cd96dd0
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 7 17:57:35 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 7 18:11:49 2015 -0500

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 148 +++++++++++++++----
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  10 +-
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    |  18 ++-
 .../org/apache/storm/jdbc/common/Column.java    |   8 +-
 .../apache/storm/jdbc/common/JDBCClient.java    |  54 ++++---
 .../storm/jdbc/mapper/JdbcLookupMapper.java     |  26 ++++
 .../jdbc/mapper/SimpleJdbcLookupMapper.java     |  46 ++++++
 .../storm/jdbc/mapper/SimpleJdbcMapper.java     |  14 +-
 .../storm/jdbc/trident/state/JdbcQuery.java     |  40 +++++
 .../storm/jdbc/trident/state/JdbcState.java     |  33 +++++
 .../org/apache/storm/jdbc/spout/UserSpout.java  |   2 +-
 .../jdbc/topology/AbstractUserTopology.java     | 102 +++++++++++++
 .../jdbc/topology/UserPersistanceTopology.java  |  64 +++-----
 .../UserPersistanceTridentTopology.java         |  61 +++-----
 14 files changed, 466 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index a0273f2..bb43687 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,11 @@
-#Storm HBase
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allow a storm topology
+to either insert storm tuples into a database table or to execute select queries against a database and enrich tuples
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
 
-Storm/Trident integration for JDBC.
-
-## Usage
-The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper`
-interface:
+## Inserting into a database
+The bolt and trident state included in this package for inserting data into a database table are tied to a single table.
+The main API for inserting data into a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
 
 ```java
 public interface JdbcMapper  extends Serializable {
@@ -16,7 +17,7 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
 
 ### SimpleJdbcMapper
 `storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
-tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in 
+tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with the same names as the column names in 
 the database table that you intend to write to.
 
 To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
@@ -25,8 +26,8 @@ The following code creates a `SimpleJdbcMapper` instance that:
 
 1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
 2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
-automatically figure out the column names of the table that you intend to write to. 
-Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to lear more about hikari configuration properties.
+automatically figure out the column names and corresponding data types of the table that you intend to write to. 
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
 
 ```java
 Map hikariConfigMap = Maps.newHashMap();
@@ -35,49 +36,138 @@ hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
 hikariConfigMap.put("dataSource.user","root");
 hikariConfigMap.put("dataSource.password","password");
 String tableName = "user_details";
-JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, hikariConfigMap);
+```
+The mapper initialized in the example above assumes the storm tuple has a value for all the columns. 
+If your storm tuple only has fields for a subset of columns, i.e. if some of the columns in your table have default values 
+and you only want to insert values for the columns with no default values, you can enforce this behavior by initializing the 
+`SimpleJdbcMapper` with an explicit column schema. For example, if you have a user_details table 
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+in which the create_time column has a default value, then to ensure only the columns with no default values are inserted 
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+    new Column("user_id", java.sql.Types.INTEGER),
+    new Column("user_name", java.sql.Types.VARCHAR));
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
+
 ### JdbcBolt
-To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition
-you must specify a configuration key that hold the hikari configuration map.
+To use the `JdbcBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the `JdbcMapper` implementation that converts a storm tuple to a DB row, and the name of the table 
+the rows will be inserted into.
 
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-
-JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
-        .withConfigKey("jdbc.conf");
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+                                    .withTableName("user_details")
+                                    .withJdbcMapper(simpleJdbcMapper);
  ```
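+
+As a minimal sketch (mirroring the test topology in this package; the spout and the component ids are assumptions), the bolt is then wired into a topology like any other bolt:
+
+```java
+TopologyBuilder builder = new TopologyBuilder();
+builder.setSpout("USER_SPOUT", new UserSpout(), 1);
+builder.setBolt("PERSISTANCE_BOLT", userPersistanceBolt, 1).shuffleGrouping("USER_SPOUT");
+```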
 ### JdbcTridentState
 We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and hikari configuration. See the example
-below:
+state you need to initialize it with the table name, the JdbcMapper instance and the name of the storm config key that holds the
+hikari configuration map. See the example below:
 
 ```java
 JdbcState.Options options = new JdbcState.Options()
         .withConfigKey("jdbc.conf")
         .withMapper(jdbcMapper)
-        .withTableName("user");
+        .withTableName("user_details");
 
 JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
 ```
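+
+The state factory can then be plugged into a trident stream with `partitionPersist` and the `JdbcUpdater` state updater. A sketch based on the trident test topology in this package (the spout and field names are assumptions):
+
+```java
+TridentTopology topology = new TridentTopology();
+Stream stream = topology.newStream("userSpout", new UserSpout());
+stream.partitionPersist(jdbcStateFactory, new Fields("user_id", "user_name", "dept_name", "create_date"), new JdbcUpdater(), new Fields());
+```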
- 
-## Example: Persistent User details
-A runnable example can be found in the `src/test/java/topology` directory.
 
-### Setup
-* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
-* Start the database and login to the database.
-* Create table user using the following query:
+## Lookup from Database
+We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
+executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
 
+```java
+public interface JdbcLookupMapper extends JdbcMapper {
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+    List<Column> getColumns(ITuple tuple);
+    List<Values> toTuple(ITuple input, List<Column> columns);
+}
 ```
-> use test;
-> create table user (id integer, user_name varchar(100), create_date date);
+
+The `declareOutputFields` method is used to indicate what fields will be emitted as part of the output tuple when processing a storm 
+tuple. 
+The `getColumns` method specifies the placeholder columns in a select query along with their SQL types and the values to use.
+For example, for the user_details table mentioned above, if you were executing the query `select user_name from user_details where
+user_id = ? and create_time > ?`, the `getColumns` method would take a storm input tuple and return a List containing two items.
+The first `Column` instance's `getValue()` method will be used as the value of `user_id` to look up, and the
+second `Column` instance's `getValue()` method will be used as the value of `create_time`. Note: the order of the
+returned list determines the placeholders' values. In other words the first item in the list maps to the first `?` in the select
+query, the second item to the second `?`, and so on. 
+The `toTuple` method takes in the input tuple and a list of columns representing a DB row returned by the select query,
+and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
+of `Values`. This allows a single DB row to be mapped to multiple output storm tuples.
+
+### SimpleJdbcLookupMapper
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
+
+To use `SimpleJdbcLookupMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
+columns that are used as placeholders in your select query. The following example shows initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` as the placeholder column in the select query.
+`SimpleJdbcLookupMapper` assumes the field name in your tuple is equal to the placeholder column name, i.e. in our example 
+`SimpleJdbcLookupMapper` will look for a field `user_id` in the input tuple and use its value as the placeholder's value in the
+select query. For constructing output tuples, it looks for the fields specified in `outputFields` in the input tuple first, 
+and if a field is not found in the input tuple it looks at the select query's output row for a column with the same name as the field. 
+So in the example below, if the input tuple had fields `user_id, create_date` and the select query was 
+`select user_name from user_details where user_id = ?`, then for each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
+will return the value of `tuple.getValueByField("user_id")`, which will be used as the value of the `?` in the select query. 
+For each output row from the DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as 
+is, adding only `user_name` from the resulting row, and return these 3 fields as a single output tuple.
+
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+JdbcLookupMapper jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
 ```
 
+### JdbcLookupBolt
+To use the `JdbcLookupBolt`, construct it with the configuration key in your storm config that holds the hikari configuration map.
+In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+
+```java
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
+        .withSelectSql("select user_name from user_details where user_id = ?");
+```
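+
+The lookup bolt is typically placed upstream of a persistence bolt so tuples are enriched before being written, as in the test topology in this package. A sketch, assuming the `builder` and `userPersistanceBolt` from the earlier examples (bolt ids are illustrative):
+
+```java
+builder.setBolt("LOOKUP_BOLT", userNameLookupBolt, 1).shuffleGrouping("USER_SPOUT");
+builder.setBolt("PERSISTANCE_BOLT", userPersistanceBolt, 1).shuffleGrouping("LOOKUP_BOLT");
+```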
+
+### JdbcTridentState for lookup
+We also support a trident query state that can be used with trident topologies. 
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+        .withConfigKey("jdbc.conf")
+        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+        .withSelectQuery("select user_name from user_details where user_id = ?");
+```
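+
+The resulting state is registered as a static state and queried with `JdbcQuery`. A minimal sketch, assuming the `topology` and `stream` from the trident example above:
+
+```java
+JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
+TridentState state = topology.newStaticState(jdbcStateFactory);
+stream = stream.stateQuery(state, new Fields("user_id"), new JdbcQuery(), new Fields("user_name"));
+```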
+
+## Example
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included the JDBC implementation dependency for your chosen database as part of your build configuration.
+* The test topologies execute the following queries, so your intended DB must support these queries for the test topologies
+to work. 
+```SQL
+create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
+```
 ### Execution
Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using the storm jar command. The class expects 4 args and an optional topology name:
-storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name]
+storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
 
 Mysql Example:
 ```
@@ -86,7 +176,7 @@ org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jd
jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
-You can execute a select query against the user table which shoule show newly inserted rows:
+You can execute a select query against the user table which should show newly inserted rows:
 
 ```
 select * from user;

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 8dacc2d..1e717eb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,15 +34,11 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
     protected OutputCollector collector;
 
     protected transient JDBCClient jdbcClient;
-    protected String tableName;
-    protected JdbcMapper mapper;
     protected String configKey;
 
-    public AbstractJdbcBolt(String tableName, JdbcMapper mapper) {
-        Validate.notEmpty(tableName, "Table name can not be blank or null");
-        Validate.notNull(mapper, "mapper can not be null");
-        this.tableName = tableName;
-        this.mapper = mapper;
+    public AbstractJdbcBolt(String configKey) {
+        Validate.notEmpty(configKey, "configKey can not be blank or null");
+        this.configKey = configKey;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index e5df1ae..d4ddfcb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -51,21 +51,27 @@ import java.util.List;
 public class JdbcBolt extends AbstractJdbcBolt {
     private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
 
-    boolean writeToWAL = true;
+    private String tableName;
+    private JdbcMapper jdbcMapper;
 
-    public JdbcBolt(String tableName, JdbcMapper mapper) {
-        super(tableName, mapper);
+    public JdbcBolt(String configKey) {
+        super(configKey);
     }
 
-    public JdbcBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
+    public JdbcBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+        this.jdbcMapper = jdbcMapper;
         return this;
     }
 
     @Override
     public void execute(Tuple tuple) {
         try {
-            List<Column> columns = mapper.getColumns(tuple);
+            List<Column> columns = jdbcMapper.getColumns(tuple);
             List<List<Column>> columnLists = new ArrayList<List<Column>>();
             columnLists.add(columns);
             this.jdbcClient.insert(this.tableName, columnLists);

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index 0346bf7..4c5b37d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -18,13 +18,14 @@
 package org.apache.storm.jdbc.common;
 
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.sql.Date;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 
-public class Column<T> {
+public class Column<T> implements Serializable {
 
     private String columnName;
     private T val;
@@ -36,6 +37,11 @@ public class Column<T> {
         this.sqlType = sqlType;
     }
 
+    public Column(String columnName, int sqlType) {
+        this.columnName = columnName;
+        this.sqlType = sqlType;
+    }
+
     public String getColumnName() {
         return columnName;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index 5b63d2d..410c884 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -81,7 +81,6 @@ public class JDBCClient {
 
     public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
         Connection connection = null;
-        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
         try {
             connection = this.dataSource.getConnection();
             PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
@@ -95,29 +94,28 @@ public class JDBCClient {
                 for(int i=1 ; i <= columnCount; i++) {
                     String columnLabel = metaData.getColumnLabel(i);
                     int columnType = metaData.getColumnType(i);
-                    Object val = null;
                     Class columnJavaType = Util.getJavaType(columnType);
-                    if (columnJavaType == String.class) {
+                    if (columnJavaType.equals(String.class)) {
                         row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
-                    } else if (columnJavaType == Integer.class) {
+                    } else if (columnJavaType.equals(Integer.class)) {
                         row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
-                    } else if (columnJavaType == Double.class) {
+                    } else if (columnJavaType.equals(Double.class)) {
                         row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
-                    } else if (columnJavaType == Float.class) {
+                    } else if (columnJavaType.equals(Float.class)) {
                         row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
-                    } else if (columnJavaType == Short.class) {
+                    } else if (columnJavaType.equals(Short.class)) {
                         row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
-                    } else if (columnJavaType == Boolean.class) {
+                    } else if (columnJavaType.equals(Boolean.class)) {
                         row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
-                    } else if (columnJavaType == byte[].class) {
+                    } else if (columnJavaType.equals(byte[].class)) {
                         row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
-                    } else if (columnJavaType == Long.class) {
+                    } else if (columnJavaType.equals(Long.class)) {
                         row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
-                    } else if (columnJavaType == Date.class) {
+                    } else if (columnJavaType.equals(Date.class)) {
                         row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
-                    } else if (columnJavaType == Time.class) {
+                    } else if (columnJavaType.equals(Time.class)) {
                         row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
-                    } else if (columnJavaType == Timestamp.class) {
+                    } else if (columnJavaType.equals(Timestamp.class)) {
                         row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
                     } else {
                         throw new RuntimeException("type =  " + columnType + " for column " + columnLabel + " not supported.");
@@ -133,17 +131,17 @@ public class JDBCClient {
         }
     }
 
-    public Map<String, Integer> getColumnSchema(String tableName) {
+    public List<Column> getColumnSchema(String tableName) {
         Connection connection = null;
-        Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+        List<Column> columns = new ArrayList<Column>();
         try {
             connection = this.dataSource.getConnection();
             DatabaseMetaData metaData = connection.getMetaData();
             ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
             while (resultSet.next()) {
-                columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"));
+                columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")));
             }
-            return columnSchemaMap;
+            return columns;
         } catch (SQLException e) {
             throw new RuntimeException("Failed to get schema for table " + tableName, e);
         } finally {
@@ -170,27 +168,27 @@ public class JDBCClient {
             Class columnJavaType = Util.getJavaType(column.getSqlType());
             if (column.getVal() == null) {
                 preparedStatement.setNull(index, column.getSqlType());
-            } else if (columnJavaType == String.class) {
+            } else if (columnJavaType.equals(String.class)) {
                 preparedStatement.setString(index, (String) column.getVal());
-            } else if (columnJavaType == Integer.class) {
+            } else if (columnJavaType.equals(Integer.class)) {
                 preparedStatement.setInt(index, (Integer) column.getVal());
-            } else if (columnJavaType == Double.class) {
+            } else if (columnJavaType.equals(Double.class)) {
                 preparedStatement.setDouble(index, (Double) column.getVal());
-            } else if (columnJavaType == Float.class) {
+            } else if (columnJavaType.equals(Float.class)) {
                 preparedStatement.setFloat(index, (Float) column.getVal());
-            } else if (columnJavaType == Short.class) {
+            } else if (columnJavaType.equals(Short.class)) {
                 preparedStatement.setShort(index, (Short) column.getVal());
-            } else if (columnJavaType == Boolean.class) {
+            } else if (columnJavaType.equals(Boolean.class)) {
                 preparedStatement.setBoolean(index, (Boolean) column.getVal());
-            } else if (columnJavaType == byte[].class) {
+            } else if (columnJavaType.equals(byte[].class)) {
                 preparedStatement.setBytes(index, (byte[]) column.getVal());
-            } else if (columnJavaType == Long.class) {
+            } else if (columnJavaType.equals(Long.class)) {
                 preparedStatement.setLong(index, (Long) column.getVal());
-            } else if (columnJavaType == Date.class) {
+            } else if (columnJavaType.equals(Date.class)) {
                 preparedStatement.setDate(index, (Date) column.getVal());
-            } else if (columnJavaType == Time.class) {
+            } else if (columnJavaType.equals(Time.class)) {
                 preparedStatement.setTime(index, (Time) column.getVal());
-            } else if (columnJavaType == Timestamp.class) {
+            } else if (columnJavaType.equals(Timestamp.class)) {
                 preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
             } else {
                 throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
new file mode 100644
index 0000000..77852f4
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.List;
+
+public interface JdbcLookupMapper extends JdbcMapper {
+
+    /**
+     * Converts a DB row to a list of storm values that can be emitted. This is done to allow a single
+     * storm input tuple and a single DB row to result in multiple output values.
+     * @param input the input tuple.
+     * @param columns list of columns that represents a row
+     * @return a List of storm values that can be emitted. Each item in the list is emitted as an output tuple.
+     */
+    public List<Values> toTuple(ITuple input, List<Column> columns);
+
+    /**
+     * Declares the fields that this mapper will output.
+     * @param declarer
+     */
+    void declareOutputFields(OutputFieldsDeclarer declarer);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
new file mode 100644
index 0000000..e2a7e8c
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
@@ -0,0 +1,46 @@
+package org.apache.storm.jdbc.mapper;
+
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLookupMapper {
+
+    private Fields outputFields;
+
+    public SimpleJdbcLookupMapper(Fields outputFields, List<Column> queryColumns) {
+        super(queryColumns);
+        this.outputFields = outputFields;
+    }
+
+    @Override
+    public List<Values> toTuple(ITuple input, List<Column> columns) {
+        Values values = new Values();
+
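+        // Prefer the field's value from the input tuple; otherwise fall back to the column with the same name in the DB row.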
+        for(String field : outputFields) {
+            if(input.contains(field)) {
+                values.add(input.getValueByField(field));
+            } else {
+                for(Column column : columns) {
+                    if(column.getColumnName().equals(field)) {
+                        values.add(column.getVal());
+                    }
+                }
+            }
+        }
+        List<Values> result = new ArrayList<Values>();
+        result.add(values);
+        return result;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(outputFields);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 7011a72..df25695 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -31,19 +31,23 @@ import java.util.Map;
 
 public class SimpleJdbcMapper implements JdbcMapper {
 
-    private Map<String, Integer> columnNameToType;
+    private List<Column> schemaColumns;
 
     public SimpleJdbcMapper(String tableName, Map map) {
         JDBCClient client = new JDBCClient(map);
-        this.columnNameToType = client.getColumnSchema(tableName);
+        this.schemaColumns = client.getColumnSchema(tableName);
+    }
+
+    public SimpleJdbcMapper(List<Column> schemaColumns) {
+        this.schemaColumns = schemaColumns;
     }
 
     @Override
     public List<Column> getColumns(ITuple tuple) {
         List<Column> columns = new ArrayList<Column>();
-        for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) {
-            String columnName = entry.getKey();
-            Integer columnSqlType = entry.getValue();
+        for(Column column : schemaColumns) {
+            String columnName = column.getColumnName();
+            Integer columnSqlType = column.getSqlType();
 
             if(Util.getJavaType(columnSqlType).equals(String.class)) {
                 String value = tuple.getStringByField(columnName);

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
new file mode 100644
index 0000000..ad39f4b
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
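+/** Trident query function that delegates batched lookups to {@link JdbcState} and emits each returned {@code Values} as an output tuple. */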
+public class JdbcQuery extends BaseQueryFunction<JdbcState, List<Values>> {
+
+    @Override
+    public List<List<Values>> batchRetrieve(JdbcState jdbcState, List<TridentTuple> tridentTuples) {
+        return jdbcState.batchRetrieve(tridentTuples);
+    }
+
+    @Override
+    public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
+        for (Values value : values) {
+            tridentCollector.emit(value);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index fec2ee4..6b4e79a 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -18,10 +18,13 @@
 package org.apache.storm.jdbc.trident.state;
 
 import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang.Validate;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.JDBCClient;
 import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.trident.operation.TridentCollector;
@@ -48,8 +51,10 @@ public class JdbcState implements State {
 
     public static class Options implements Serializable {
         private JdbcMapper mapper;
+        private JdbcLookupMapper jdbcLookupMapper;
         private String configKey;
         private String tableName;
+        private String selectQuery;
 
         public Options withConfigKey(String configKey) {
             this.configKey = configKey;
@@ -65,6 +70,16 @@ public class JdbcState implements State {
             this.mapper = mapper;
             return this;
         }
+
+        public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+            this.jdbcLookupMapper = jdbcLookupMapper;
+            return this;
+        }
+
+        public Options withSelectQuery(String selectQuery) {
+            this.selectQuery = selectQuery;
+            return this;
+        }
     }
 
     protected void prepare() {
@@ -98,4 +113,22 @@ public class JdbcState implements State {
             throw new FailedException(e);
         }
     }
+
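+    /** Executes the configured select query for each input tuple and returns one list of mapped output values per tuple. */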
+    public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+        List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+        try {
+            for (TridentTuple tuple : tridentTuples) {
+                List<Column> columns = options.jdbcLookupMapper.getColumns(tuple);
+                List<List<Column>> rows = jdbcClient.select(options.selectQuery, columns);
+                List<Values> values = Lists.newArrayList();
+                for(List<Column> row : rows) {
+                    values.addAll(options.jdbcLookupMapper.toTuple(tuple, row));
+                }
+                batchRetrieveResult.add(values);
+            }
+        } catch (Exception e) {
+            LOG.warn("Batch get operation failed. Triggering replay.", e);
+            throw new FailedException(e);
+        }
+        return batchRetrieveResult;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
index 39fde59..718917a 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -72,7 +72,7 @@ public class UserSpout implements IRichSpout {
     }
 
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("id","user_name","create_date"));
+        declarer.declare(new Fields("user_id","user_name","create_date"));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
new file mode 100644
index 0000000..700f83e
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import backtype.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractUserTopology {
+    private static final List<String> setupSqls = Lists.newArrayList(
+            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+            "create table if not exists department (dept_id integer, dept_name varchar(100))",
+            "create table if not exists user_department (user_id integer, dept_id integer)",
+            "insert into department values (1, 'R&D')",
+            "insert into department values (2, 'Finance')",
+            "insert into department values (3, 'HR')",
+            "insert into department values (4, 'Sales')",
+            "insert into user_department values (1, 1)",
+            "insert into user_department values (2, 2)",
+            "insert into user_department values (3, 3)",
+            "insert into user_department values (4, 4)"
+    );
+    protected UserSpout userSpout;
+    protected JdbcMapper jdbcMapper;
+    protected JdbcLookupMapper jdbcLookupMapper;
+
+    protected static final String TABLE_NAME = "user";
+    protected static final String JDBC_CONF = "jdbc.conf";
+    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+            " and user_department.user_id = ?";
+
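+    /** Runs the setup SQL, wires up the spout and mappers shared by the concrete topologies, and submits the topology locally or to a cluster. */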
+    public void execute(String[] args) throws Exception {
+        if (args.length != 4 && args.length != 5) {
+            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
+                    + "<user> <password> [topology name]");
+            System.exit(-1);
+        }
+        Map map = Maps.newHashMap();
+        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+        map.put("dataSource.user", args[2]);//root
+        map.put("dataSource.password", args[3]);//password
+
+        Config config = new Config();
+        config.put(JDBC_CONF, map);
+
+        JDBCClient jdbcClient = new JDBCClient(map);
+        for (String sql : setupSqls) {
+            jdbcClient.executeSql(sql);
+        }
+
+        this.userSpout = new UserSpout();
+        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+
+        if (args.length == 4) {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, getTopology());
+            Thread.sleep(30000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+            System.exit(0);
+        } else {
+            StormSubmitter.submitTopology(args[4], config, getTopology());
+        }
+    }
+
+    public abstract StormTopology getTopology();
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 21e4639..26a00aa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -17,62 +17,36 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
-import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.bolt.JdbcBolt;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
-import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
 
-import java.util.Map;
 
-
-public class UserPersistanceTopology {
+public class UserPersistanceTopology extends AbstractUserTopology {
     private static final String USER_SPOUT = "USER_SPOUT";
-    private static final String USER_BOLT = "USER_BOLT";
+    private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+    private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
 
     public static void main(String[] args) throws Exception {
-        if(args.length < 5) {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
-        Map map = Maps.newHashMap();
-        map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
-        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
-        map.put("dataSource.user",args[2]);//root
-        map.put("dataSource.password",args[3]);//password
-        String tableName = args[4];//database table name
-        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
-        Config config = new Config();
-
-        config.put("jdbc.conf", map);
+        new UserPersistanceTopology().execute(args);
+    }
 
-        UserSpout spout = new UserSpout();
-        JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper)
-                .withConfigKey("jdbc.conf");
+    @Override
+    public StormTopology getTopology() {
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+                .withJdbcLookupMapper(this.jdbcLookupMapper)
+                .withSelectSql(SELECT_QUERY);
+        JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+                .withTableName(TABLE_NAME)
+                .withJdbcMapper(this.jdbcMapper);
 
         // userSpout ==> jdbcBolt
         TopologyBuilder builder = new TopologyBuilder();
 
-        builder.setSpout(USER_SPOUT, spout, 1);
-        builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT);
-
-        if (args.length == 5) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, builder.createTopology());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 6) {
-            StormSubmitter.submitTopology(args[6], config, builder.createTopology());
-        } else {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
+        builder.setSpout(USER_SPOUT, this.userSpout, 1);
+        builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
+        builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);
+        return builder.createTopology();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 3b2ee66..2cf3403 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -17,60 +17,45 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
-import com.google.common.collect.Maps;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import com.google.common.collect.Lists;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
 import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcQuery;
 import org.apache.storm.jdbc.trident.state.JdbcState;
 import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
 import org.apache.storm.jdbc.trident.state.JdbcUpdater;
 import storm.trident.Stream;
+import storm.trident.TridentState;
 import storm.trident.TridentTopology;
 
-import java.util.Map;
+import java.sql.Types;
 
-public class UserPersistanceTridentTopology {
+public class UserPersistanceTridentTopology extends AbstractUserTopology {
 
     public static void main(String[] args) throws Exception {
-        Map map = Maps.newHashMap();
-        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
-        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
-        map.put("dataSource.user",args[2]);//root
-        map.put("dataSource.password",args[3]);//password
-        String tableName = args[4];//database table name
-        JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
-        Config config = new Config();
-
-        config.put("jdbc.conf", map);
+        new UserPersistanceTridentTopology().execute(args);
+    }
 
+    @Override
+    public StormTopology getTopology() {
         TridentTopology topology = new TridentTopology();
-        Stream stream = topology.newStream("userSpout", new UserSpout());
 
         JdbcState.Options options = new JdbcState.Options()
-                .withConfigKey("jdbc.conf")
-                .withMapper(jdbcMapper)
-                .withTableName("user");
+                .withConfigKey(JDBC_CONF)
+                .withMapper(this.jdbcMapper)
+                .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+                .withTableName(TABLE_NAME)
+                .withSelectQuery(SELECT_QUERY);
 
         JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
-        stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"),  new JdbcUpdater(), new Fields());
-        if (args.length == 5) {
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", config, topology.build());
-            Thread.sleep(30000);
-            cluster.killTopology("test");
-            cluster.shutdown();
-            System.exit(0);
-        } else if (args.length == 6) {
-            StormSubmitter.submitTopology(args[6], config, topology.build());
-        } else {
-            System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
-                    "<user> <password> <tableName> [topology name]");
-        }
-    }
 
+        Stream stream = topology.newStream("userSpout", new UserSpout());
+        TridentState state = topology.newStaticState(jdbcStateFactory);
+        stream = stream.stateQuery(state, new Fields("user_id","user_name","create_date"), new JdbcQuery(), new Fields("dept_name"));
+        stream.partitionPersist(jdbcStateFactory, new Fields("user_id","user_name","dept_name","create_date"),  new JdbcUpdater(), new Fields());
+        return topology.build();
+    }
 }