You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:02 UTC
[07/21] storm git commit: STORM-616: adding jdbc Lookup bolt.
STORM-616: adding jdbc Lookup bolt.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bfa6028
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bfa6028
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bfa6028
Branch: refs/heads/master
Commit: 8bfa602876f84985864f136ee578bd2d9edb9ba7
Parents: cd96dd0
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Wed Jan 7 17:57:35 2015 -0500
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Wed Jan 7 18:11:49 2015 -0500
----------------------------------------------------------------------
external/storm-jdbc/README.md | 148 +++++++++++++++----
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 10 +-
.../org/apache/storm/jdbc/bolt/JdbcBolt.java | 18 ++-
.../org/apache/storm/jdbc/common/Column.java | 8 +-
.../apache/storm/jdbc/common/JDBCClient.java | 54 ++++---
.../storm/jdbc/mapper/JdbcLookupMapper.java | 26 ++++
.../jdbc/mapper/SimpleJdbcLookupMapper.java | 46 ++++++
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 14 +-
.../storm/jdbc/trident/state/JdbcQuery.java | 40 +++++
.../storm/jdbc/trident/state/JdbcState.java | 33 +++++
.../org/apache/storm/jdbc/spout/UserSpout.java | 2 +-
.../jdbc/topology/AbstractUserTopology.java | 102 +++++++++++++
.../jdbc/topology/UserPersistanceTopology.java | 64 +++-----
.../UserPersistanceTridentTopology.java | 61 +++-----
14 files changed, 466 insertions(+), 160 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index a0273f2..bb43687 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -1,10 +1,11 @@
-#Storm HBase
+#Storm JDBC
+Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
+to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples
+in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
-Storm/Trident integration for JDBC.
-
-## Usage
-The main API for interacting with JDBC is the `org.apache.storm.jdbc.mapper.TupleToColumnMapper`
-interface:
+## Inserting into a database.
+The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
+The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
```java
public interface JdbcMapper extends Serializable {
@@ -16,7 +17,7 @@ The `getColumns()` method defines how a storm tuple maps to a list of columns re
### SimpleJdbcMapper
`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
-tuple to a Database row. `SimpleJdbcMapper` assumes that the tuple has fields with same name as the column name in
+tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in
the database table that you intend to write to.
To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
@@ -25,8 +26,8 @@ The following code creates a `SimpleJdbcMapper` instance that:
1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
-automatically figure out the column names of the table that you intend to write to.
-Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to lear more about hikari configuration properties.
+automatically figure out the column names and corresponding data types of the table that you intend to write to.
+Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
```java
Map hikariConfigMap = Maps.newHashMap();
@@ -35,49 +36,138 @@ hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
hikariConfigMap.put("dataSource.user","root");
hikariConfigMap.put("dataSource.password","password");
String tableName = "user_details";
-JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
+JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
+```
+The mapper initialized in the example above assumes a storm tuple has value for all the columns.
+If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values
+and you want to only insert values for columns with no default values you can enforce the behavior by initializing the
+`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table
+`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
+In this table the create_time column has a default value. To ensure only the columns with no default values are inserted
+you can initialize the `jdbcMapper` as below:
+
+```java
+List<Column> columnSchema = Lists.newArrayList(
+ new Column("user_id", java.sql.Types.INTEGER),
+ new Column("user_name", java.sql.Types.VARCHAR));
+ JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
```
+
### JdbcBolt
-To use the `JdbcBolt`, construct it with the name of the table to write to, and a `JdbcMapper` implementation. In addition
-you must specify a configuration key that hold the hikari configuration map.
+To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
+In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which
+the rows will be inserted.
```java
Config config = new Config();
config.put("jdbc.conf", hikariConfigMap);
-
-JdbcBolt bolt = new JdbcBolt("user_details", jdbcMapper)
- .withConfigKey("jdbc.conf");
+JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
+ .withTableName("user_details")
+ .withJdbcMapper(simpleJdbcMapper);
```
### JdbcTridentState
We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
-state you need to initialize it with the table name, the JdbcMapper instance and hikari configuration. See the example
-below:
+state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
+hikari configuration map. See the example below:
```java
JdbcState.Options options = new JdbcState.Options()
.withConfigKey("jdbc.conf")
.withMapper(jdbcMapper)
- .withTableName("user");
+ .withTableName("user_details");
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
```
-
-## Example: Persistent User details
-A runnable example can be found in the `src/test/java/topology` directory.
-### Setup
-* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
-* Start the database and login to the database.
-* Create table user using the following query:
+## Lookup from Database
+We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for
+executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
+```java
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+ List<Column> getColumns(ITuple tuple);
+ public List<Values> toTuple(ITuple input, List<Column> columns);
```
-> use test;
-> create table user (id integer, user_name varchar(100), create_date date);
+
+The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm
+tuple.
+The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
+For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
+user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
+The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
+second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
+returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
+query, the second item to second `?` in query and so on.
+The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
+and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
+of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
+
+###SimpleJdbcLookupMapper
+`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`.
+
+To use `SimpleJdbcMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
+columns that are used in your select query as place holder. The following example shows initialization of a `SimpleJdbcLookupMapper`
+that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in select query.
+SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example
+`SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
+select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first,
+and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name.
+So in the example below if the input tuple had fields `user_id, create_date` and the select query was
+`select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)`
+will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query.
+For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as
+is adding only `user_name` from the resulting row and returning these 3 fields as a single output tuple.
+
+```java
+Fields outputFields = new Fields("user_id", "user_name", "create_date");
+List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
```
+### JdbcLookupBolt
+To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
+In addition you must specify the `JdbcLookupMapper` and the select query to execute.
+
+```java
+JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
+ .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
+ .withSelectSql("select user_name from user_details where user_id = ?")
+```
+
+### JdbcTridentState for lookup
+We also support a trident query state that can be used with trident topologies.
+
+```java
+JdbcState.Options options = new JdbcState.Options()
+ .withConfigKey("jdbc.conf")
+ .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+ .withSelectQuery("select user_name from user_details where user_id = ?");
+```
+
+## Example:
+A runnable example can be found in the `src/test/java/topology` directory.
+
+### Setup
+* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
+* The test topologies executes the following queries so your intended DB must support these queries for test topologies
+to work.
+```SQL
+create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
+create table if not exists department (dept_id integer, dept_name varchar(100));
+create table if not exists user_department (user_id integer, dept_id integer);
+insert into department values (1, 'R&D');
+insert into department values (2, 'Finance');
+insert into department values (3, 'HR');
+insert into department values (4, 'Sales');
+insert into user_department values (1, 1);
+insert into user_department values (2, 2);
+insert into user_department values (3, 3);
+insert into user_department values (4, 4);
+select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
+```
### Execution
Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
-storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> <tableName> [topology name]
+storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
Mysql Example:
```
@@ -86,7 +176,7 @@ org.apache.storm.jdbc.topology.UserPersistanceTridentTopology com.mysql.jdbc.jd
jdbc:mysql://localhost/test root password user UserPersistenceTopology
```
-You can execute a select query against the user table which shoule show newly inserted rows:
+You can execute a select query against the user table which should show newly inserted rows:
```
select * from user;
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 8dacc2d..1e717eb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -34,15 +34,11 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
protected OutputCollector collector;
protected transient JDBCClient jdbcClient;
- protected String tableName;
- protected JdbcMapper mapper;
protected String configKey;
- public AbstractJdbcBolt(String tableName, JdbcMapper mapper) {
- Validate.notEmpty(tableName, "Table name can not be blank or null");
- Validate.notNull(mapper, "mapper can not be null");
- this.tableName = tableName;
- this.mapper = mapper;
+ public AbstractJdbcBolt(String configKey) {
+ Validate.notEmpty(configKey, "configKey can not be null");
+ this.configKey = configKey;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
index e5df1ae..d4ddfcb 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
@@ -51,21 +51,27 @@ import java.util.List;
public class JdbcBolt extends AbstractJdbcBolt {
private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
- boolean writeToWAL = true;
+ private String tableName;
+ private JdbcMapper jdbcMapper;
- public JdbcBolt(String tableName, JdbcMapper mapper) {
- super(tableName, mapper);
+ public JdbcBolt(String configKey) {
+ super(configKey);
}
- public JdbcBolt withConfigKey(String configKey) {
- this.configKey = configKey;
+ public JdbcBolt withTableName(String tableName) {
+ this.tableName = tableName;
+ return this;
+ }
+
+ public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+ this.jdbcMapper = jdbcMapper;
return this;
}
@Override
public void execute(Tuple tuple) {
try {
- List<Column> columns = mapper.getColumns(tuple);
+ List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
this.jdbcClient.insert(this.tableName, columnLists);
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index 0346bf7..4c5b37d 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -18,13 +18,14 @@
package org.apache.storm.jdbc.common;
+import java.io.Serializable;
import java.lang.reflect.Field;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
-public class Column<T> {
+public class Column<T> implements Serializable {
private String columnName;
private T val;
@@ -36,6 +37,11 @@ public class Column<T> {
this.sqlType = sqlType;
}
+ public Column(String columnName, int sqlType) {
+ this.columnName = columnName;
+ this.sqlType = sqlType;
+ }
+
public String getColumnName() {
return columnName;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
index 5b63d2d..410c884 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java
@@ -81,7 +81,6 @@ public class JDBCClient {
public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
Connection connection = null;
- Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
try {
connection = this.dataSource.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
@@ -95,29 +94,28 @@ public class JDBCClient {
for(int i=1 ; i <= columnCount; i++) {
String columnLabel = metaData.getColumnLabel(i);
int columnType = metaData.getColumnType(i);
- Object val = null;
Class columnJavaType = Util.getJavaType(columnType);
- if (columnJavaType == String.class) {
+ if (columnJavaType.equals(String.class)) {
row.add(new Column<String>(columnLabel, resultSet.getString(columnLabel), columnType));
- } else if (columnJavaType == Integer.class) {
+ } else if (columnJavaType.equals(Integer.class)) {
row.add(new Column<Integer>(columnLabel, resultSet.getInt(columnLabel), columnType));
- } else if (columnJavaType == Double.class) {
+ } else if (columnJavaType.equals(Double.class)) {
row.add(new Column<Double>(columnLabel, resultSet.getDouble(columnLabel), columnType));
- } else if (columnJavaType == Float.class) {
+ } else if (columnJavaType.equals(Float.class)) {
row.add(new Column<Float>(columnLabel, resultSet.getFloat(columnLabel), columnType));
- } else if (columnJavaType == Short.class) {
+ } else if (columnJavaType.equals(Short.class)) {
row.add(new Column<Short>(columnLabel, resultSet.getShort(columnLabel), columnType));
- } else if (columnJavaType == Boolean.class) {
+ } else if (columnJavaType.equals(Boolean.class)) {
row.add(new Column<Boolean>(columnLabel, resultSet.getBoolean(columnLabel), columnType));
- } else if (columnJavaType == byte[].class) {
+ } else if (columnJavaType.equals(byte[].class)) {
row.add(new Column<byte[]>(columnLabel, resultSet.getBytes(columnLabel), columnType));
- } else if (columnJavaType == Long.class) {
+ } else if (columnJavaType.equals(Long.class)) {
row.add(new Column<Long>(columnLabel, resultSet.getLong(columnLabel), columnType));
- } else if (columnJavaType == Date.class) {
+ } else if (columnJavaType.equals(Date.class)) {
row.add(new Column<Date>(columnLabel, resultSet.getDate(columnLabel), columnType));
- } else if (columnJavaType == Time.class) {
+ } else if (columnJavaType.equals(Time.class)) {
row.add(new Column<Time>(columnLabel, resultSet.getTime(columnLabel), columnType));
- } else if (columnJavaType == Timestamp.class) {
+ } else if (columnJavaType.equals(Timestamp.class)) {
row.add(new Column<Timestamp>(columnLabel, resultSet.getTimestamp(columnLabel), columnType));
} else {
throw new RuntimeException("type = " + columnType + " for column " + columnLabel + " not supported.");
@@ -133,17 +131,17 @@ public class JDBCClient {
}
}
- public Map<String, Integer> getColumnSchema(String tableName) {
+ public List<Column> getColumnSchema(String tableName) {
Connection connection = null;
- Map<String, Integer> columnSchemaMap = new HashMap<String, Integer>();
+ List<Column> columns = new ArrayList<Column>();
try {
connection = this.dataSource.getConnection();
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
while (resultSet.next()) {
- columnSchemaMap.put(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE"));
+ columns.add(new Column(resultSet.getString("COLUMN_NAME"), resultSet.getInt("DATA_TYPE")));
}
- return columnSchemaMap;
+ return columns;
} catch (SQLException e) {
throw new RuntimeException("Failed to get schema for table " + tableName, e);
} finally {
@@ -170,27 +168,27 @@ public class JDBCClient {
Class columnJavaType = Util.getJavaType(column.getSqlType());
if (column.getVal() == null) {
preparedStatement.setNull(index, column.getSqlType());
- } else if (columnJavaType == String.class) {
+ } else if (columnJavaType.equals(String.class)) {
preparedStatement.setString(index, (String) column.getVal());
- } else if (columnJavaType == Integer.class) {
+ } else if (columnJavaType.equals(Integer.class)) {
preparedStatement.setInt(index, (Integer) column.getVal());
- } else if (columnJavaType == Double.class) {
+ } else if (columnJavaType.equals(Double.class)) {
preparedStatement.setDouble(index, (Double) column.getVal());
- } else if (columnJavaType == Float.class) {
+ } else if (columnJavaType.equals(Float.class)) {
preparedStatement.setFloat(index, (Float) column.getVal());
- } else if (columnJavaType == Short.class) {
+ } else if (columnJavaType.equals(Short.class)) {
preparedStatement.setShort(index, (Short) column.getVal());
- } else if (columnJavaType == Boolean.class) {
+ } else if (columnJavaType.equals(Boolean.class)) {
preparedStatement.setBoolean(index, (Boolean) column.getVal());
- } else if (columnJavaType == byte[].class) {
+ } else if (columnJavaType.equals(byte[].class)) {
preparedStatement.setBytes(index, (byte[]) column.getVal());
- } else if (columnJavaType == Long.class) {
+ } else if (columnJavaType.equals(Long.class)) {
preparedStatement.setLong(index, (Long) column.getVal());
- } else if (columnJavaType == Date.class) {
+ } else if (columnJavaType.equals(Date.class)) {
preparedStatement.setDate(index, (Date) column.getVal());
- } else if (columnJavaType == Time.class) {
+ } else if (columnJavaType.equals(Time.class)) {
preparedStatement.setTime(index, (Time) column.getVal());
- } else if (columnJavaType == Timestamp.class) {
+ } else if (columnJavaType.equals(Timestamp.class)) {
preparedStatement.setTimestamp(index, (Timestamp) column.getVal());
} else {
throw new RuntimeException("Unknown type of value " + column.getVal() + " for column " + column.getColumnName());
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
new file mode 100644
index 0000000..77852f4
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/JdbcLookupMapper.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.mapper;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.List;
+
+public interface JdbcLookupMapper extends JdbcMapper {
+
+ /**
+ * Covers a DB row to a list of storm values that can be emitted. This is done to allow a single
+ * storm input tuple and a single DB row to result in multiple output values.
+ * @param input the input tuple.
+ * @param columns list of columns that represents a row
+ * @return a List of storm values that can be emitted. Each item in list is emitted as an output tuple.
+ */
+ public List<Values> toTuple(ITuple input, List<Column> columns);
+
+ /**
+ * declare what are the fields that this code will output.
+ * @param declarer
+ */
+ void declareOutputFields(OutputFieldsDeclarer declarer);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
new file mode 100644
index 0000000..e2a7e8c
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcLookupMapper.java
@@ -0,0 +1,46 @@
+package org.apache.storm.jdbc.mapper;
+
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
+import backtype.storm.tuple.Values;
+import org.apache.storm.jdbc.common.Column;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SimpleJdbcLookupMapper extends SimpleJdbcMapper implements JdbcLookupMapper {
+
+ private Fields outputFields;
+
+ public SimpleJdbcLookupMapper(Fields outputFields, List<Column> queryColumns) {
+ super(queryColumns);
+ this.outputFields = outputFields;
+ }
+
+ @Override
+ public List<Values> toTuple(ITuple input, List<Column> columns) {
+ Values values = new Values();
+
+ for(String field : outputFields) {
+ if(input.contains(field)) {
+ values.add(input.getValueByField(field));
+ } else {
+ for(Column column : columns) {
+ if(column.getColumnName().equals(field)) {
+ values.add(column.getVal());
+ }
+ }
+ }
+ }
+ List<Values> result = new ArrayList<Values>();
+ result.add(values);
+ return result;
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(outputFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index 7011a72..df25695 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -31,19 +31,23 @@ import java.util.Map;
public class SimpleJdbcMapper implements JdbcMapper {
- private Map<String, Integer> columnNameToType;
+ private List<Column> schemaColumns;
public SimpleJdbcMapper(String tableName, Map map) {
JDBCClient client = new JDBCClient(map);
- this.columnNameToType = client.getColumnSchema(tableName);
+ this.schemaColumns = client.getColumnSchema(tableName);
+ }
+
+ public SimpleJdbcMapper(List<Column> schemaColumns) {
+ this.schemaColumns = schemaColumns;
}
@Override
public List<Column> getColumns(ITuple tuple) {
List<Column> columns = new ArrayList<Column>();
- for(Map.Entry<String, Integer> entry: columnNameToType.entrySet()) {
- String columnName = entry.getKey();
- Integer columnSqlType = entry.getValue();
+ for(Column column : schemaColumns) {
+ String columnName = column.getColumnName();
+ Integer columnSqlType = column.getSqlType();
if(Util.getJavaType(columnSqlType).equals(String.class)) {
String value = tuple.getStringByField(columnName);
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
new file mode 100644
index 0000000..ad39f4b
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcQuery.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.trident.state;
+
+import backtype.storm.tuple.Values;
+import storm.trident.operation.TridentCollector;
+import storm.trident.state.BaseQueryFunction;
+import storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class JdbcQuery extends BaseQueryFunction<JdbcState, List<Values>> {
+
+ @Override
+ public List<List<Values>> batchRetrieve(JdbcState jdbcState, List<TridentTuple> tridentTuples) {
+ return jdbcState.batchRetrieve(tridentTuples);
+ }
+
+ @Override
+ public void execute(TridentTuple tuples, List<Values> values, TridentCollector tridentCollector) {
+ for (Values value : values) {
+ tridentCollector.emit(value);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index fec2ee4..6b4e79a 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -18,10 +18,13 @@
package org.apache.storm.jdbc.trident.state;
import backtype.storm.topology.FailedException;
+import backtype.storm.tuple.Values;
+import com.google.common.collect.Lists;
import org.apache.commons.lang.Validate;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.JDBCClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.operation.TridentCollector;
@@ -48,8 +51,10 @@ public class JdbcState implements State {
public static class Options implements Serializable {
private JdbcMapper mapper;
+ private JdbcLookupMapper jdbcLookupMapper;
private String configKey;
private String tableName;
+ private String selectQuery;
public Options withConfigKey(String configKey) {
this.configKey = configKey;
@@ -65,6 +70,16 @@ public class JdbcState implements State {
this.mapper = mapper;
return this;
}
+
+ public Options withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
+ this.jdbcLookupMapper = jdbcLookupMapper;
+ return this;
+ }
+
+ public Options withSelectQuery(String selectQuery) {
+ this.selectQuery = selectQuery;
+ return this;
+ }
}
protected void prepare() {
@@ -98,4 +113,22 @@ public class JdbcState implements State {
throw new FailedException(e);
}
}
+
+ public List<List<Values>> batchRetrieve(List<TridentTuple> tridentTuples) {
+ List<List<Values>> batchRetrieveResult = Lists.newArrayList();
+ try {
+ for (TridentTuple tuple : tridentTuples) {
+ List<Column> columns = options.jdbcLookupMapper.getColumns(tuple);
+ List<List<Column>> rows = jdbcClient.select(options.selectQuery, columns);
+ for(List<Column> row : rows) {
+ List<Values> values = options.jdbcLookupMapper.toTuple(tuple, row);
+ batchRetrieveResult.add(values);
+ }
+ }
+ } catch (Exception e) {
+ LOG.warn("Batch get operation failed. Triggering replay.", e);
+ throw new FailedException(e);
+ }
+ return batchRetrieveResult;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
index 39fde59..718917a 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -72,7 +72,7 @@ public class UserSpout implements IRichSpout {
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("id","user_name","create_date"));
+ declarer.declare(new Fields("user_id","user_name","create_date"));
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
new file mode 100644
index 0000000..700f83e
--- /dev/null
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.topology;
+
+import backtype.storm.Config;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
+import backtype.storm.tuple.Fields;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.common.JDBCClient;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
+import org.apache.storm.jdbc.spout.UserSpout;
+import backtype.storm.LocalCluster;
+
+import java.sql.Types;
+import java.util.List;
+import java.util.Map;
+
+public abstract class AbstractUserTopology {
+ private static final List<String> setupSqls = Lists.newArrayList(
+ "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
+ "create table if not exists department (dept_id integer, dept_name varchar(100))",
+ "create table if not exists user_department (user_id integer, dept_id integer)",
+ "insert into department values (1, 'R&D')",
+ "insert into department values (2, 'Finance')",
+ "insert into department values (3, 'HR')",
+ "insert into department values (4, 'Sales')",
+ "insert into user_department values (1, 1)",
+ "insert into user_department values (2, 2)",
+ "insert into user_department values (3, 3)",
+ "insert into user_department values (4, 4)"
+ );
+ protected UserSpout userSpout;
+ protected JdbcMapper jdbcMapper;
+ protected JdbcLookupMapper jdbcLookupMapper;
+
+ protected static final String TABLE_NAME = "user";
+ protected static final String JDBC_CONF = "jdbc.conf";
+ protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
+ " and user_department.user_id = ?";
+
+ public void execute(String[] args) throws Exception {
+ if (args.length != 4 && args.length != 5) {
+ System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
+ + "<user> <password> [topology name]");
+ System.exit(-1);
+ }
+ Map map = Maps.newHashMap();
+ map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
+ map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
+ map.put("dataSource.user", args[2]);//root
+ map.put("dataSource.password", args[3]);//password
+
+ Config config = new Config();
+ config.put(JDBC_CONF, map);
+
+ JDBCClient jdbcClient = new JDBCClient(map);
+ for (String sql : setupSqls) {
+ jdbcClient.executeSql(sql);
+ }
+
+ this.userSpout = new UserSpout();
+ this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
+ Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
+ List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
+ this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
+
+ if (args.length == 4) {
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", config, getTopology());
+ Thread.sleep(30000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ System.exit(0);
+ } else {
+ StormSubmitter.submitTopology(args[5], config, getTopology());
+ }
+ }
+
+ public abstract StormTopology getTopology();
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 21e4639..26a00aa 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -17,62 +17,36 @@
*/
package org.apache.storm.jdbc.topology;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
-import com.google.common.collect.Maps;
import org.apache.storm.jdbc.bolt.JdbcBolt;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
-import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
-import java.util.Map;
-
-public class UserPersistanceTopology {
+public class UserPersistanceTopology extends AbstractUserTopology {
private static final String USER_SPOUT = "USER_SPOUT";
- private static final String USER_BOLT = "USER_BOLT";
+ private static final String LOOKUP_BOLT = "LOOKUP_BOLT";
+ private static final String PERSISTANCE_BOLT = "PERSISTANCE_BOLT";
public static void main(String[] args) throws Exception {
- if(args.length < 5) {
- System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
- "<user> <password> <tableName> [topology name]");
- }
- Map map = Maps.newHashMap();
- map.put("dataSourceClassName",args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
- map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
- map.put("dataSource.user",args[2]);//root
- map.put("dataSource.password",args[3]);//password
- String tableName = args[4];//database table name
- JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
- Config config = new Config();
-
- config.put("jdbc.conf", map);
+ new UserPersistanceTopology().execute(args);
+ }
- UserSpout spout = new UserSpout();
- JdbcBolt bolt = new JdbcBolt(tableName, jdbcMapper)
- .withConfigKey("jdbc.conf");
+ @Override
+ public StormTopology getTopology() {
+ JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
+ .withJdbcLookupMapper(this.jdbcLookupMapper)
+ .withSelectSql(SELECT_QUERY);
+ JdbcBolt userPersistanceBolt = new JdbcBolt(JDBC_CONF)
+ .withTableName(TABLE_NAME)
+ .withJdbcMapper(this.jdbcMapper);
// userSpout ==> jdbcBolt
TopologyBuilder builder = new TopologyBuilder();
- builder.setSpout(USER_SPOUT, spout, 1);
- builder.setBolt(USER_BOLT, bolt, 1).shuffleGrouping(USER_SPOUT);
-
- if (args.length == 5) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, builder.createTopology());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else if (args.length == 6) {
- StormSubmitter.submitTopology(args[6], config, builder.createTopology());
- } else {
- System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
- "<user> <password> <tableName> [topology name]");
- }
+ builder.setSpout(USER_SPOUT, this.userSpout, 1);
+ builder.setBolt(LOOKUP_BOLT, departmentLookupBolt, 1).shuffleGrouping(USER_SPOUT);
+ builder.setBolt(PERSISTANCE_BOLT, userPersistanceBolt, 1).shuffleGrouping(LOOKUP_BOLT);
+ return builder.createTopology();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/8bfa6028/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 3b2ee66..2cf3403 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -17,60 +17,45 @@
*/
package org.apache.storm.jdbc.topology;
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
+import backtype.storm.generated.StormTopology;
import backtype.storm.tuple.Fields;
-import com.google.common.collect.Maps;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
+import com.google.common.collect.Lists;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.spout.UserSpout;
+import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.jdbc.trident.state.JdbcUpdater;
import storm.trident.Stream;
+import storm.trident.TridentState;
import storm.trident.TridentTopology;
-import java.util.Map;
+import java.sql.Types;
-public class UserPersistanceTridentTopology {
+public class UserPersistanceTridentTopology extends AbstractUserTopology {
public static void main(String[] args) throws Exception {
- Map map = Maps.newHashMap();
- map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
- map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
- map.put("dataSource.user",args[2]);//root
- map.put("dataSource.password",args[3]);//password
- String tableName = args[4];//database table name
- JdbcMapper jdbcMapper = new SimpleJdbcMapper(tableName, map);
-
- Config config = new Config();
-
- config.put("jdbc.conf", map);
+ new UserPersistanceTridentTopology().execute(args);
+ }
+ @Override
+ public StormTopology getTopology() {
TridentTopology topology = new TridentTopology();
- Stream stream = topology.newStream("userSpout", new UserSpout());
JdbcState.Options options = new JdbcState.Options()
- .withConfigKey("jdbc.conf")
- .withMapper(jdbcMapper)
- .withTableName("user");
+ .withConfigKey(JDBC_CONF)
+ .withMapper(this.jdbcMapper)
+ .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
+ .withTableName(TABLE_NAME)
+ .withSelectQuery(SELECT_QUERY);
JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
- stream.partitionPersist(jdbcStateFactory, new Fields("id","user_name","create_date"), new JdbcUpdater(), new Fields());
- if (args.length == 5) {
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("test", config, topology.build());
- Thread.sleep(30000);
- cluster.killTopology("test");
- cluster.shutdown();
- System.exit(0);
- } else if (args.length == 6) {
- StormSubmitter.submitTopology(args[6], config, topology.build());
- } else {
- System.out.println("Usage: UserPersistanceTopology <dataSourceClassName> <dataSource.url> " +
- "<user> <password> <tableName> [topology name]");
- }
- }
+ Stream stream = topology.newStream("userSpout", new UserSpout());
+ TridentState state = topology.newStaticState(jdbcStateFactory);
+ stream = stream.stateQuery(state, new Fields("user_id","user_name","create_date"), new JdbcQuery(), new Fields("dept_name"));
+ stream.partitionPersist(jdbcStateFactory, new Fields("user_id","user_name","dept_name","create_date"), new JdbcUpdater(), new Fields());
+ return topology.build();
+ }
}