You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/02/23 23:22:11 UTC

[16/21] storm git commit: STORM-616: renaming JDBCBolt to JDBCInsertBolt. Added Javadoc to Column.

STORM-616: renaming JDBCBolt to JDBCInsertBolt. Added Javadoc to Column.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e157edb2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e157edb2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e157edb2

Branch: refs/heads/master
Commit: e157edb2697eeb32cf371d6556e450c343747407
Parents: bb927c8
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Tue Feb 3 15:21:35 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Feb 3 15:21:35 2015 -0800

----------------------------------------------------------------------
 external/storm-jdbc/README.md                   | 43 +++++++++--
 .../storm/jdbc/bolt/AbstractJdbcBolt.java       |  4 +
 .../org/apache/storm/jdbc/bolt/JdbcBolt.java    | 81 --------------------
 .../apache/storm/jdbc/bolt/JdbcInsertBolt.java  | 79 +++++++++++++++++++
 .../apache/storm/jdbc/bolt/JdbcLookupBolt.java  | 10 +--
 .../org/apache/storm/jdbc/common/Column.java    | 34 ++++++--
 .../apache/storm/jdbc/common/JdbcClient.java    |  6 +-
 .../jdbc/topology/UserPersistanceTopology.java  |  8 +-
 8 files changed, 157 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/README.md
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/README.md b/external/storm-jdbc/README.md
index 190fb98..cfe449d 100644
--- a/external/storm-jdbc/README.md
+++ b/external/storm-jdbc/README.md
@@ -53,8 +53,8 @@ List<Column> columnSchema = Lists.newArrayList(
     JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
 ```
 
-### JdbcBolt
-To use the `JdbcBolt`, you construct an instance of it and specify a configuration key in your storm config that hold the 
+### JdbcInsertBolt
+To use the `JdbcInsertBolt`, you construct an instance of it and specify a configuration key in your storm config that holds the 
 hikari configuration map. In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and 
 the table name in which the rows will be inserted. You can optionally specify a query timeout seconds param that specifies 
 max seconds an insert query can take. The default is set to value of topology.message.timeout.secs.You should set this value 
@@ -63,8 +63,7 @@ to be <= topology.message.timeout.secs.
  ```java
 Config config = new Config();
 config.put("jdbc.conf", hikariConfigMap);
-JdbcBolt userPersistanceBolt = new JdbcBolt()
-                                    .withConfigKey("jdbc.conf")
+JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt("jdbc.conf")
                                     .withTableName("user_details")
                                     .withJdbcMapper(simpleJdbcMapper)
                                     .withQueryTimeoutSecs(30);
@@ -178,11 +177,39 @@ select dept_name from department, user_department where department.dept_id = use
 Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
 storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
 
+To make it work with Mysql, you can add the following to the pom.xml
+
+```
+<dependency>
+    <groupId>mysql</groupId>
+    <artifactId>mysql-connector-java</artifactId>
+    <version>5.1.31</version>
+</dependency>
+```
+
+You can generate a single jar with dependencies using the Maven assembly plugin. To use the plugin, add the following to your pom.xml and execute
+`mvn clean compile assembly:single`.
+
+```
+<plugin>
+    <artifactId>maven-assembly-plugin</artifactId>
+    <configuration>
+        <archive>
+            <manifest>
+                <mainClass>fully.qualified.MainClass</mainClass>
+            </manifest>
+        </archive>
+        <descriptorRefs>
+            <descriptorRef>jar-with-dependencies</descriptorRef>
+        </descriptorRefs>
+    </configuration>
+</plugin>
+```
+
 Mysql Example:
 ```
 storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
-org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
-jdbc:mysql://localhost/test root password UserPersistenceTopology
+org.apache.storm.jdbc.topology.UserPersistanceTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource jdbc:mysql://localhost/test root password UserPersistenceTopology
 ```
 
 You can execute a select query against the user table which should show newly inserted rows:
@@ -212,5 +239,5 @@ specific language governing permissions and limitations
 under the License.
 
 ## Committer Sponsors
-* Parth Brahmbhatt ([brahmbhatt.parth@gmail.com](mailto:brahmbhatt.parth@gmail.com))
-* Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) 
\ No newline at end of file
+ * P. Taylor Goetz ([ptgoetz@apache.org](mailto:ptgoetz@apache.org))
+ * Sriharsha Chintalapani ([sriharsha@apache.org](mailto:sriharsha@apache.org)) 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
index 436ad00..0d30529 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java
@@ -50,4 +50,8 @@ public abstract class AbstractJdbcBolt extends BaseRichBolt {
 
         this.jdbcClient = new JdbcClient(conf, queryTimeoutSecs);
     }
+
+    public AbstractJdbcBolt(String configKey) {
+        this.configKey = configKey;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
deleted file mode 100644
index f4921f5..0000000
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.jdbc.bolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import org.apache.storm.jdbc.common.Column;
-import org.apache.storm.jdbc.mapper.JdbcMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Basic bolt for writing to any Database table.
- * <p/>
- * Note: Each JdbcBolt defined in a topology is tied to a specific table.
- */
-public class JdbcBolt extends AbstractJdbcBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
-
-    private String tableName;
-    private JdbcMapper jdbcMapper;
-
-    public JdbcBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
-        return this;
-    }
-
-    public JdbcBolt withTableName(String tableName) {
-        this.tableName = tableName;
-        return this;
-    }
-
-    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
-        this.jdbcMapper = jdbcMapper;
-        return this;
-    }
-
-    public JdbcBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
-        this.queryTimeoutSecs = queryTimeoutSecs;
-        return this;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        try {
-            List<Column> columns = jdbcMapper.getColumns(tuple);
-            List<List<Column>> columnLists = new ArrayList<List<Column>>();
-            columnLists.add(columns);
-            this.jdbcClient.insert(this.tableName, columnLists);
-        } catch (Exception e) {
-            LOG.warn("Failing tuple.", e);
-            this.collector.fail(tuple);
-            this.collector.reportError(e);
-            return;
-        }
-
-        this.collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
new file mode 100644
index 0000000..9abd553
--- /dev/null
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.jdbc.bolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import org.apache.storm.jdbc.common.Column;
+import org.apache.storm.jdbc.mapper.JdbcMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Basic bolt for writing to any Database table.
+ * <p/>
+ * Note: Each JdbcInsertBolt defined in a topology is tied to a specific table.
+ */
+public class JdbcInsertBolt extends AbstractJdbcBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(JdbcInsertBolt.class);
+
+    private String tableName;
+    private JdbcMapper jdbcMapper;
+
+    public JdbcInsertBolt(String configKey) {
+        super(configKey);
+    }
+
+    public JdbcInsertBolt withTableName(String tableName) {
+        this.tableName = tableName;
+        return this;
+    }
+
+    public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
+        this.jdbcMapper = jdbcMapper;
+        return this;
+    }
+
+    public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
+        this.queryTimeoutSecs = queryTimeoutSecs;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+        try {
+            List<Column> columns = jdbcMapper.getColumns(tuple);
+            List<List<Column>> columnLists = new ArrayList<List<Column>>();
+            columnLists.add(columns);
+            this.jdbcClient.insert(this.tableName, columnLists);
+        } catch (Exception e) {
+            this.collector.reportError(e);
+            this.collector.fail(tuple);
+            return;
+        }
+
+        this.collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
index 041fbe8..8232c2f 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java
@@ -37,9 +37,8 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
 
     private JdbcLookupMapper jdbcLookupMapper;
 
-    public JdbcLookupBolt withConfigKey(String configKey) {
-        this.configKey = configKey;
-        return this;
+    public JdbcLookupBolt(String configKey) {
+        super(configKey);
     }
 
     public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
@@ -67,15 +66,14 @@ public class JdbcLookupBolt extends AbstractJdbcBolt {
                 for (List<Column> row : result) {
                     List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
                     for (Values value : values) {
-                        collector.emit(value);
+                        collector.emit(tuple, value);
                     }
                 }
             }
             this.collector.ack(tuple);
         } catch (Exception e) {
-            LOG.warn("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
-            this.collector.fail(tuple);
             this.collector.reportError(e);
+            this.collector.fail(tuple);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
index 4c5b37d..c462c6e 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java
@@ -19,16 +19,40 @@ package org.apache.storm.jdbc.common;
 
 
 import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.sql.Date;
-import java.sql.Time;
-import java.sql.Timestamp;
-import java.sql.Types;
 
+/**
+ * A database table can be defined as a list of rows and each row can be defined as a list of columns where
+ * each column instance has a name, a value and a type. This class represents an instance of a column in a database
+ * row. For example if we have the following table named user:
+ * <pre>
+ *  ____________________________
+ * |    UserId  |   UserName    |
+ * |      1     |    Foo        |
+ * |      2     |    Bar        |
+ *  ----------------------------
+ * </pre>
+ *
+ * The following class can be used to represent the data in the table as
+ * <pre>
+ * List<List<Column>> rows = new ArrayList<List<Column>>();
+ * List<Column> row1 = Lists.newArrayList(new Column("UserId", 1, Types.INTEGER), new Column("UserName", "Foo", Types.VARCHAR));
+ * List<Column> row2 = Lists.newArrayList(new Column("UserId", 2, Types.INTEGER), new Column("UserName", "Bar", Types.VARCHAR));
+ *
+ * rows.add(row1);
+ * rows.add(row2);
+ *
+ * </pre>
+ * @param <T>
+ */
 public class Column<T> implements Serializable {
 
     private String columnName;
     private T val;
+
+    /**
+     * The SQL type (e.g. varchar, date, int). Ideally we would have an enum, but Java's JDBC API uses integers.
+     * See {@link java.sql.Types}
+     */
     private int sqlType;
 
     public Column(String columnName, T val, int sqlType) {

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
index d11d1b3..4992ed7 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java
@@ -64,9 +64,9 @@ public class JdbcClient {
             sb.append(placeHolders).append(")");
 
             String query = sb.toString();
-            if(LOG.isDebugEnabled()) {
-                LOG.debug("Executing query " + query);
-            }
+
+            LOG.debug("Executing query {}", query);
+
 
             PreparedStatement preparedStatement = connection.prepareStatement(query);
             preparedStatement.setQueryTimeout(queryTimeoutSecs);

http://git-wip-us.apache.org/repos/asf/storm/blob/e157edb2/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index fbb0b6c..32c012e 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -19,7 +19,7 @@ package org.apache.storm.jdbc.topology;
 
 import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.TopologyBuilder;
-import org.apache.storm.jdbc.bolt.JdbcBolt;
+import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
 import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
 
 
@@ -34,12 +34,10 @@ public class UserPersistanceTopology extends AbstractUserTopology {
 
     @Override
     public StormTopology getTopology() {
-        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt()
-                .withConfigKey(JDBC_CONF)
+        JdbcLookupBolt departmentLookupBolt = new JdbcLookupBolt(JDBC_CONF)
                 .withJdbcLookupMapper(this.jdbcLookupMapper)
                 .withSelectSql(SELECT_QUERY);
-        JdbcBolt userPersistanceBolt = new JdbcBolt()
-                .withConfigKey(JDBC_CONF)
+        JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(JDBC_CONF)
                 .withTableName(TABLE_NAME)
                 .withJdbcMapper(this.jdbcMapper);