You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 21:49:40 UTC
[3/9] storm git commit: STORM-821: Renaming the connection provider
to fix the typo. Allowing column val to be null.
STORM-821: Renaming the connection provider to fix the typo. Allowing column val to be null.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d268903f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d268903f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d268903f
Branch: refs/heads/0.10.x-branch
Commit: d268903f026cf275b72e6246b61c769d28fb3ed1
Parents: f628bb4
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue May 26 15:46:40 2015 -0700
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue May 26 15:46:40 2015 -0700
----------------------------------------------------------------------
.../storm/jdbc/bolt/AbstractJdbcBolt.java | 15 ++++++-----
.../apache/storm/jdbc/bolt/JdbcInsertBolt.java | 6 ++---
.../apache/storm/jdbc/bolt/JdbcLookupBolt.java | 6 ++---
.../org/apache/storm/jdbc/common/Column.java | 7 +++---
.../storm/jdbc/common/ConnectionProvider.java | 26 ++++++++++++++++++++
.../storm/jdbc/common/ConnectionPrvoider.java | 26 --------------------
.../jdbc/common/HikariCPConnectionProvider.java | 2 +-
.../apache/storm/jdbc/common/JdbcClient.java | 14 +++++------
.../storm/jdbc/mapper/SimpleJdbcMapper.java | 8 +++---
.../storm/jdbc/trident/state/JdbcState.java | 13 +++++-----
.../storm/jdbc/common/JdbcClientTest.java | 7 +++---
.../jdbc/topology/AbstractUserTopology.java | 16 ++++++------
.../jdbc/topology/UserPersistanceTopology.java | 4 +--
.../UserPersistanceTridentTopology.java | 2 +-
14 files changed, 74 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index d1f5758..15a2345 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -21,8 +21,7 @@ import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
-import org.apache.commons.lang.Validate;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,27 +36,27 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
protected transient JdbcClient jdbcClient;
protected String configKey;
protected Integer queryTimeoutSecs;
- protected ConnectionPrvoider connectionPrvoider;
+ protected ConnectionProvider connectionProvider;
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.collector = collector;
- connectionPrvoider.prepare();
+ connectionProvider.prepare();
if(queryTimeoutSecs == null) {
queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
}
- this.jdbcClient = new JdbcClient(connectionPrvoider, queryTimeoutSecs);
+ this.jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
}
- public AbstractJdbcBolt(ConnectionPrvoider connectionPrvoider) {
- this.connectionPrvoider = connectionPrvoider;
+ public AbstractJdbcBolt(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
}
@Override
public void cleanup() {
- connectionPrvoider.cleanup();
+ connectionProvider.cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
index 0dc7f26..2f29000 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -22,7 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +43,8 @@ public class JdbcInsertBolt extends AbstractJdbcBolt {
private String insertQuery;
private JdbcMapper jdbcMapper;
- public JdbcInsertBolt(ConnectionPrvoider connectionPrvoider, JdbcMapper jdbcMapper) {
- super(connectionPrvoider);
+ public JdbcInsertBolt(ConnectionProvider connectionProvider, JdbcMapper jdbcMapper) {
+ super(connectionProvider);
this.jdbcMapper = jdbcMapper;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 903a8f8..25122e2 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -21,7 +21,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +38,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
private JdbcLookupMapper jdbcLookupMapper;
- public JdbcLookupBolt(ConnectionPrvoider connectionPrvoider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
- super(connectionPrvoider);
+ public JdbcLookupBolt(ConnectionProvider connectionProvider, String selectQuery, JdbcLookupMapper jdbcLookupMapper) {
+ super(connectionProvider);
this.selectQuery = selectQuery;
this.jdbcLookupMapper = jdbcLookupMapper;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index c531fff..28c5e51 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -84,19 +84,18 @@ public class Column<T> implements Serializable {
if (this == o) return true;
if (!(o instanceof Column)) return false;
- Column column = (Column) o;
+ Column<?> column = (Column<?>) o;
if (sqlType != column.sqlType) return false;
if (!columnName.equals(column.columnName)) return false;
- if (!val.equals(column.val)) return false;
+ return !(val != null ? !val.equals(column.val) : column.val != null);
- return true;
}
@Override
public int hashCode() {
int result = columnName.hashCode();
- result = 31 * result + val.hashCode();
+ result = 31 * result + (val != null ? val.hashCode() : 0);
result = 31 * result + sqlType;
return result;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
new file mode 100644
index 0000000..b838e48
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionProvider.java
@@ -0,0 +1,26 @@
+package org.apache.storm.jdbc.common;
+
+import java.io.Serializable;
+import java.sql.Connection;
+import java.util.Map;
+
+/**
+ * Provides a database connection.
+ */
+public interface ConnectionProvider extends Serializable {
+ /**
+ * method must be idempotent.
+ */
+ void prepare();
+
+ /**
+ *
+ * @return a DB connection over which the queries can be executed.
+ */
+ Connection getConnection();
+
+ /**
+ * called once when the system is shutting down, should be idempotent.
+ */
+ void cleanup();
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionPrvoider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionPrvoider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionPrvoider.java
deleted file mode 100644
index 7b07edf..0000000
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/ConnectionPrvoider.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.apache.storm.jdbc.common;
-
-import java.io.Serializable;
-import java.sql.Connection;
-import java.util.Map;
-
-/**
- * Provides a database connection.
- */
-public interface ConnectionPrvoider extends Serializable {
- /**
- * method must be idempotent.
- */
- void prepare();
-
- /**
- *
- * @return a DB connection over which the queries can be executed.
- */
- Connection getConnection();
-
- /**
- * called once when the system is shutting down, should be idempotent.
- */
- void cleanup();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
index 5ed77aa..b523fcc 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/HikariCPConnectionProvider.java
@@ -8,7 +8,7 @@ import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;
-public class HikariCPConnectionProvider implements ConnectionPrvoider {
+public class HikariCPConnectionProvider implements ConnectionProvider {
private Map<String, Object> configMap;
private transient HikariDataSource dataSource;
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index e9261a7..228babe 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -32,11 +32,11 @@ import java.util.*;
public class JdbcClient {
private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
- private ConnectionPrvoider connectionPrvoider;
+ private ConnectionProvider connectionProvider;
private int queryTimeoutSecs;
- public JdbcClient(ConnectionPrvoider connectionPrvoider, int queryTimeoutSecs) {
- this.connectionPrvoider = connectionPrvoider;
+ public JdbcClient(ConnectionProvider connectionProvider, int queryTimeoutSecs) {
+ this.connectionProvider = connectionProvider;
this.queryTimeoutSecs = queryTimeoutSecs;
}
@@ -48,7 +48,7 @@ public class JdbcClient {
public void executeInsertQuery(String query, List<List<Column>> columnLists) {
Connection connection = null;
try {
- connection = connectionPrvoider.getConnection();
+ connection = connectionProvider.getConnection();
boolean autoCommit = connection.getAutoCommit();
if(autoCommit) {
connection.setAutoCommit(false);
@@ -105,7 +105,7 @@ public class JdbcClient {
public List<List<Column>> select(String sqlQuery, List<Column> queryParams) {
Connection connection = null;
try {
- connection = connectionPrvoider.getConnection();
+ connection = connectionProvider.getConnection();
PreparedStatement preparedStatement = connection.prepareStatement(sqlQuery);
if(queryTimeoutSecs > 0) {
preparedStatement.setQueryTimeout(queryTimeoutSecs);
@@ -161,7 +161,7 @@ public class JdbcClient {
Connection connection = null;
List<Column> columns = new ArrayList<Column>();
try {
- connection = connectionPrvoider.getConnection();
+ connection = connectionProvider.getConnection();
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getColumns(null, null, tableName, null);
while (resultSet.next()) {
@@ -178,7 +178,7 @@ public class JdbcClient {
public void executeSql(String sql) {
Connection connection = null;
try {
- connection = connectionPrvoider.getConnection();
+ connection = connectionProvider.getConnection();
Statement statement = connection.createStatement();
statement.execute(sql);
} catch (SQLException e) {
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
index d17b99a..1ec5c74 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/mapper/SimpleJdbcMapper.java
@@ -19,7 +19,7 @@ package org.apache.storm.jdbc.mapper;
import backtype.storm.tuple.ITuple;
import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.common.Util;
@@ -34,10 +34,10 @@ public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;
- public SimpleJdbcMapper(String tableName, Map<String, Object> connectionProvideConfig, ConnectionPrvoider connectionPrvoider) {
+ public SimpleJdbcMapper(String tableName, Map<String, Object> connectionProvideConfig, ConnectionProvider connectionProvider) {
int queryTimeoutSecs = 30;
- connectionPrvoider.prepare();
- JdbcClient client = new JdbcClient(connectionPrvoider, queryTimeoutSecs);
+ connectionProvider.prepare();
+ JdbcClient client = new JdbcClient(connectionProvider, queryTimeoutSecs);
this.schemaColumns = client.getColumnSchema(tableName);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
index dce9231..8afc466 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcState.java
@@ -21,10 +21,9 @@ import backtype.storm.Config;
import backtype.storm.topology.FailedException;
import backtype.storm.tuple.Values;
import com.google.common.collect.Lists;
-import org.apache.commons.lang.Validate;
import org.apache.commons.lang3.StringUtils;
import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
@@ -55,14 +54,14 @@ public class JdbcState implements State {
public static class Options implements Serializable {
private JdbcMapper mapper;
private JdbcLookupMapper jdbcLookupMapper;
- private ConnectionPrvoider connectionPrvoider;
+ private ConnectionProvider connectionProvider;
private String tableName;
private String insertQuery;
private String selectQuery;
private Integer queryTimeoutSecs;
- public Options withConnectionPrvoider(ConnectionPrvoider connectionPrvoider) {
- this.connectionPrvoider = connectionPrvoider;
+ public Options withConnectionPrvoider(ConnectionProvider connectionProvider) {
+ this.connectionProvider = connectionProvider;
return this;
}
@@ -98,7 +97,7 @@ public class JdbcState implements State {
}
protected void prepare() {
- options.connectionPrvoider.prepare();
+ options.connectionProvider.prepare();
if(StringUtils.isBlank(options.insertQuery) && StringUtils.isBlank(options.tableName) && StringUtils.isBlank(options.selectQuery)) {
throw new IllegalArgumentException("If you are trying to insert into DB you must supply either insertQuery or tableName." +
@@ -109,7 +108,7 @@ public class JdbcState implements State {
options.queryTimeoutSecs = Integer.parseInt(map.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS).toString());
}
- this.jdbcClient = new JdbcClient(options.connectionPrvoider, options.queryTimeoutSecs);
+ this.jdbcClient = new JdbcClient(options.connectionProvider, options.queryTimeoutSecs);
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
index f265332..551cd72 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/common/JdbcClientTest.java
@@ -25,7 +25,6 @@ import org.junit.Before;
import org.junit.Test;
import java.sql.Timestamp;
-import java.util.Date;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
@@ -43,11 +42,11 @@ public class JdbcClientTest {
map.put("dataSource.url", "jdbc:hsqldb:mem:test");//jdbc:mysql://localhost/test
map.put("dataSource.user","SA");//root
map.put("dataSource.password","");//password
- ConnectionPrvoider connectionPrvoider = new HikariCPConnectionProvider(map);
- connectionPrvoider.prepare();
+ ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+ connectionProvider.prepare();
int queryTimeoutSecs = 60;
- this.client = new JdbcClient(connectionPrvoider, queryTimeoutSecs);
+ this.client = new JdbcClient(connectionProvider, queryTimeoutSecs);
client.executeSql("create table user_details (id integer, user_name varchar(100), created_timestamp TIMESTAMP)");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 96bafe1..653f3d9 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -24,7 +24,7 @@ import backtype.storm.tuple.Fields;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.common.ConnectionPrvoider;
+import org.apache.storm.jdbc.common.ConnectionProvider;
import org.apache.storm.jdbc.common.HikariCPConnectionProvider;
import org.apache.storm.jdbc.common.JdbcClient;
import org.apache.storm.jdbc.mapper.JdbcMapper;
@@ -58,7 +58,7 @@ public abstract class AbstractUserTopology {
protected UserSpout userSpout;
protected JdbcMapper jdbcMapper;
protected JdbcLookupMapper jdbcLookupMapper;
- protected ConnectionPrvoider connectionPrvoider;
+ protected ConnectionProvider connectionProvider;
protected static final String TABLE_NAME = "user";
protected static final String JDBC_CONF = "jdbc.conf";
@@ -83,21 +83,21 @@ public abstract class AbstractUserTopology {
Config config = new Config();
config.put(JDBC_CONF, map);
- ConnectionPrvoider connectionPrvoider = new HikariCPConnectionProvider(map);
- connectionPrvoider.prepare();
+ ConnectionProvider connectionProvider = new HikariCPConnectionProvider(map);
+ connectionProvider.prepare();
int queryTimeoutSecs = 60;
- JdbcClient jdbcClient = new JdbcClient(connectionPrvoider, queryTimeoutSecs);
+ JdbcClient jdbcClient = new JdbcClient(connectionProvider, queryTimeoutSecs);
for (String sql : setupSqls) {
jdbcClient.executeSql(sql);
}
this.userSpout = new UserSpout();
- this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map, connectionPrvoider);
- connectionPrvoider.cleanup();
+ this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map, connectionProvider);
+ connectionProvider.cleanup();
Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
- this.connectionPrvoider = new HikariCPConnectionProvider(map);
+ this.connectionProvider = new HikariCPConnectionProvider(map);
if (args.length == 4) {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("test", config, getTopology());
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index a4498ca..585994e 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -41,14 +41,14 @@ public class UserPersistanceTopology extends AbstractUserTopology {
@Override
public StormTopology getTopology() {
- JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionPrvoider, SELECT_QUERY, this.jdbcLookupMapper);
+ JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(connectionProvider, SELECT_QUERY, this.jdbcLookupMapper);
//must specify column schema when providing custom query.
List<Column> schemaColumns = Lists.newArrayList(new Column("create_date", Types.DATE),
new Column("dept_name", Types.VARCHAR), new Column("user_id", Types.INTEGER), new Column("user_name", Types.VARCHAR));
JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
- JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionPrvoider, mapper)
+ JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, mapper)
.withInsertQuery("insert into user (create_date, dept_name, user_id, user_name) values (?,?,?,?)");
// userSpout ==> jdbcBolt
http://git-wip-us.apache.org/repos/asf/storm/blob/d268903f/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 6512441..7cf0ce6 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -44,7 +44,7 @@ public class UserPersistanceTridentTopology extends AbstractUserTopology {
TridentTopology topology = new TridentTopology();
JdbcState.Options options = new JdbcState.Options()
- .withConnectionPrvoider(connectionPrvoider)
+ .withConnectionPrvoider(connectionProvider)
.withMapper(this.jdbcMapper)
.withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("dept_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
.withTableName(TABLE_NAME)