You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by Parth-Brahmbhatt <gi...@git.apache.org> on 2015/01/06 23:41:07 UTC

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

GitHub user Parth-Brahmbhatt opened a pull request:

    https://github.com/apache/storm/pull/374

    Storm-616 : Storm-jdbc connector.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Parth-Brahmbhatt/incubator-storm STORM-616

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #374
    
----
commit 5b160168c75c0e8c4c402a5e24f606dab697fbef
Author: Parth Brahmbhatt <br...@gmail.com>
Date:   2015-01-06T03:14:18Z

    STORM-616: Jdbc connector for storm.

commit ab9f778ae50a1e224ebdcc58e6249009fc1f91cc
Author: Parth Brahmbhatt <br...@gmail.com>
Date:   2015-01-06T03:23:52Z

    Merge remote-tracking branch 'upstream/master' into STORM-616

commit d260759ac203383e27668a7cb7090926029f7406
Author: Parth Brahmbhatt <br...@gmail.com>
Date:   2015-01-06T03:31:05Z

    STORM-616 : removing unintended changes.

commit 079deda496d16bd896611da706402c9df7e2319f
Author: Parth Brahmbhatt <br...@gmail.com>
Date:   2015-01-06T17:47:40Z

    STORM-616:Adding storm-jdbc as external module in pom. Adding links to hikariCP configuration in README.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973996
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JDBCClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
    +
    +    private HikariDataSource dataSource;
    +
    +    public JDBCClient(Map<String, Object> map) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    --- End diff --
    
    yes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039341
  
    --- Diff: external/storm-jdbc/pom.xml ---
    @@ -0,0 +1,125 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>0.10.0-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +
    +    <artifactId>storm-jdbc</artifactId>
    +
    +    <developers>
    +        <developer>
    +            <id>Parth-Brahmbhatt</id>
    +            <name>Parth Brahmbhatt</name>
    +            <email>brahmbhatt.parth@gmail.com</email>
    +        </developer>
    +    </developers>
    +
    +    <properties>
    +        <hikari.version>2.2.5</hikari.version>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.commons</groupId>
    +            <artifactId>commons-lang3</artifactId>
    +            <version>3.3</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +            <version>17.0</version>
    --- End diff --
    
    Is there a way we can use the guava dependency already included with Storm (I would understand if not, just double-checking).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24036406
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    +1 for reportError() before fail. Perhaps we should document this as a best practice and revisit other modules to implement it there as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962128
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    +The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
    +
    +```java
    +public interface JdbcMapper  extends Serializable {
    +    List<Column> getColumns(ITuple tuple);
    +}
    +```
    +
    +The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
    +
    +### SimpleJdbcMapper
    +`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
    +tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
    +the database table that you intend to write to.
    +
    +To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
    +
    +The following code creates a `SimpleJdbcMapper` instance that:
    +
    +1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
    +2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
    +automatically figure out the column names and corresponding data types of the table that you intend to write to. 
    +Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
    +
    +```java
    +Map hikariConfigMap = Maps.newHashMap();
    +hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    +hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    +hikariConfigMap.put("dataSource.user","root");
    +hikariConfigMap.put("dataSource.password","password");
    +String tableName = "user_details";
    +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
    +```
    +The mapper initialized in the example above assumes a storm tuple has value for all the columns. 
    +If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values 
    +and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
    +`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table 
    +`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
    +In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
    +you can initialize the `jdbcMapper` as below:
    +
    +```java
    +List<Column> columnSchema = Lists.newArrayList(
    +    new Column("user_id", java.sql.Types.INTEGER),
    +    new Column("user_name", java.sql.Types.VARCHAR));
    +    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
    +```
    +
    +### JdbcBolt
    +To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
    +the rows will be inserted.
    +
    + ```java
    +Config config = new Config();
    +config.put("jdbc.conf", hikariConfigMap);
    +JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
    +                                    .withTableName("user_details")
    +                                    .withJdbcMapper(simpleJdbcMapper);
    + ```
    +### JdbcTridentState
    +We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
    +state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
    +hikari configuration map. See the example below:
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withMapper(jdbcMapper)
    +        .withTableName("user_details");
    +
    +JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
    +```
    +
    +## Lookup from Database
    +We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
    +executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
    +
    +```java
    +    void declareOutputFields(OutputFieldsDeclarer declarer);
    +    List<Column> getColumns(ITuple tuple);
    +    public List<Values> toTuple(ITuple input, List<Column> columns);
    +```
    +
    +The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm 
    +tuple. 
    +The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
    +For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
    +user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
    +The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
    +second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
    +returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
    +query, the second item to second `?` in query and so on. 
    +The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
    +and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
    +of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
    +
    +###SimpleJdbcLookupMapper
    +`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
    +
    +To use `SimpleJdbcMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
    +columns that are used in your select query as place holder. The following example shows initialization of a `SimpleJdbcLookupMapper`
    +that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in select query.
    +SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
    +`SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
    +select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
    +and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name. 
    +So in the example below if the input tuple had fields `user_id, create_date` and the select query was 
    +`select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
    +will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query. 
    +For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as 
    +is adding only `user_name` from the resulting row and returning these 3 fields as a single output tuple.
    +
    +```java
    +Fields outputFields = new Fields("user_id", "user_name", "create_date");
    +List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +```
    +
    +### JdbcLookupBolt
    +To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the `JdbcLookupMapper` and the select query to execute.
    +
    +```java
    +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
    +        .withSelectSql("select user_name from user_details where user_id = ?")
    +```
    +
    +### JdbcTridentState for lookup
    +We also support a trident query state that can be used with trident topologies. 
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
    +        .withSelectQuery("select user_name from user_details where user_id = ?");
    +```
    +
    +## Example:
    +A runnable example can be found in the `src/test/java/topology` directory.
    +
    +### Setup
    +* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
    +* The test topologies executes the following queries so your intended DB must support these queries for test topologies
    +to work. 
    +```SQL
    +create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
    +create table if not exists department (dept_id integer, dept_name varchar(100));
    +create table if not exists user_department (user_id integer, dept_id integer);
    +insert into department values (1, 'R&D');
    +insert into department values (2, 'Finance');
    +insert into department values (3, 'HR');
    +insert into department values (4, 'Sales');
    +insert into user_department values (1, 1);
    +insert into user_department values (2, 2);
    +insert into user_department values (3, 3);
    +insert into user_department values (4, 4);
    +select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
    +```
    +### Execution
    +Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
    +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
    +
    +Mysql Example:
    +```
    +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
    +org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
    --- End diff --
    
    password argument is still needed why do you think it needs to be removed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24014941
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    --- End diff --
    
    I'm not sure I like this name.  I would rather have it called something like JdbcInsertBolt.  Just because Jdbc does not by itself indicate that it will insert anything into the database.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Crystark <gi...@git.apache.org>.
Github user Crystark commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-74867621
  
    This would indeed be nice but after trying it out it seems that if a batch has more than one tuple to insert, then only the last tuple is inserted.
    
    In [JdbcClient#insert](https://github.com/Parth-Brahmbhatt/incubator-storm/blob/STORM-616/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java#L48) we can see that there's only one set of `VALUES` (line 61-64) but then it loops on all the columns and each one overrides the previous values that were set as SQL parameters leading to only the last one beeing used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-73918034
  
    +1 looks good to me. I don't think we need the LICENSE file here, but that's a trivial change that can be done at merge time.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-71579088
  
    @revans2 @ptgoetz can you take a look at this PR. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039222
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039215
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt withConfigKey(String configKey) {
    --- End diff --
    
    Moved back to constructor. I am not sure what you mean when you say we could just pass the config into the Bolt. Hikari has many configuration https://github.com/brettwooldridge/HikariCP. The configKey is so user can provide any subset of those configs. so the configKey is just name of the config key in your submitted storm-conf, the value of this should be a map of hikari config keys to their corresponding values.
    
    I can extract a connectionProvider interface and create a Hikari implementation that users have to construct to initialize the Bolts. This will mean the bolts will not have a config Key, users will be able to use whatever connection pool they feel most comfortable with and the configuration details will live in the connection provider. Do you think that will be better than what we currently have.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24016216
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    --- End diff --
    
    Building the query through string manipulation each time seems like a real waste to me.  Why are we not using JDBC stored queries instead or in addition to this?  Also even though SQL is a standard there are always inconsistencies between the different databases.  It feels like we are reinventing the wheel here and missing some things.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-70002653
  
    @Parth-Brahmbhatt  this is also need to be included under storm-dist/binary/src/main/assembly/binary.xml  like here https://github.com/apache/storm/blob/master/storm-dist/binary/src/main/assembly/binary.xml#L93


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24044371
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +
    +import java.io.Serializable;
    +import java.lang.reflect.Field;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +
    +public class Column<T> implements Serializable {
    +
    +    private String columnName;
    +    private T val;
    +    private int sqlType;
    +
    +    public Column(String columnName, T val, int sqlType) {
    --- End diff --
    
    The javadocs is what I really was interested in.  The markdown doc was really great, but an int for sqlType is really confusing without some documentation right there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r23282556
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    --- End diff --
    
    The name of the file has different casing than the name of the class.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-69978982
  
    overall I am +1 on merging this. There are minor nit-picks in the above comments. I would like to see the unit tests for the bolt and state. Probably we can use in-process jdbc db to test it out. We can do this as part of another JIRA.
    @ptgoetz @revans2  please take a look at the PR. If we need a sponsor for jdbc I can volunteer. Thanks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22974954
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    I am not seeing this being used in any other connector instead all the other connectors, hdfs/hbase/kafka just log the error. I am fine with adding it as it just seems to report the error so the ui can display it but disabling the log may result in loss of information which can make debugging harder. I propose to report the error but also log it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-73749989
  
    So I am OK with the code as it is now.  I would prefer to see required configuration values become constructor arguments, but I am not going to block this going in without it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973617
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,80 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    +                    }
    +                }
    +            }
    +            this.collector.ack(tuple);
    +        } catch (Exception e) {
    +            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
    +            this.collector.fail(tuple);
    --- End diff --
    
    Another approach is to derive from IBasicBolt and let the default error handling kick in.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24015631
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    +
    +            String query = sb.toString();
    +            if(LOG.isDebugEnabled()) {
    +                LOG.debug("Executing query " + query);
    --- End diff --
    
    Please use the slf convention with ```LOG.debug("Executing query {}", query);``` instead of ```if(LOG.isDebugEnabled()...```, but this is really minor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962079
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    --- End diff --
    
    fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Crystark <gi...@git.apache.org>.
Github user Crystark commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24902452
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    --- End diff --
    
    The problem at this part is that there's only one set of values though this method works with multiple set of values.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22972968
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JDBCClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
    +
    +    private HikariDataSource dataSource;
    +
    +    public JDBCClient(Map<String, Object> map) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    --- End diff --
    
    Each bolt has its own JDBCClient. Each JDBCClient has its own data source. Does it mean that each bolt has its own dedicated connection pool ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24014770
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    I agree with reportError before the fail.  This allows the error to show up in the UI and be somewhere besides the logs.  It will also log it for you, so you don't need to log it above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24041005
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    --- End diff --
    
    We are using prepared statements with place holders so we are leveraging the statement cache., not sure if you mean something else by "JDBC stored queries". The statement cache is generally on server side and as long as we are using preparedstatements with place holders "?" we should get the benefit of not having to compile/parse the sql and the cached execution plan.  
    
    The only reason we construct the sql query each time is because this is more flexible then a single static sql. For inserts people can have columns with default values and their topology can chose to emit values for those columns sometimes and sometimes they can just ignore those columns and both scenarios will be supported.
    
    I can provide an override sql that user can provide , personally I would rather wait till we actually come across a DB or user that runs into some issue before adding that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-73911162
  
    I am +1 for merging this in now.  But I am not a DB expert so I would feel more comfortable with others giving it a +1 too before going forward.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22959009
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    +The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
    +
    +```java
    +public interface JdbcMapper  extends Serializable {
    +    List<Column> getColumns(ITuple tuple);
    +}
    +```
    +
    +The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
    +
    +### SimpleJdbcMapper
    +`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
    +tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
    +the database table that you intend to write to.
    +
    +To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
    +
    +The following code creates a `SimpleJdbcMapper` instance that:
    +
    +1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
    +2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
    +automatically figure out the column names and corresponding data types of the table that you intend to write to. 
    +Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
    +
    +```java
    +Map hikariConfigMap = Maps.newHashMap();
    +hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    +hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    +hikariConfigMap.put("dataSource.user","root");
    +hikariConfigMap.put("dataSource.password","password");
    +String tableName = "user_details";
    +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
    +```
    +The mapper initialized in the example above assumes a storm tuple has value for all the columns. 
    +If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values 
    +and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
    +`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table 
    +`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
    +In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
    +you can initialize the `jdbcMapper` as below:
    +
    +```java
    +List<Column> columnSchema = Lists.newArrayList(
    +    new Column("user_id", java.sql.Types.INTEGER),
    +    new Column("user_name", java.sql.Types.VARCHAR));
    +    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
    +```
    +
    +### JdbcBolt
    +To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
    +the rows will be inserted.
    +
    + ```java
    +Config config = new Config();
    +config.put("jdbc.conf", hikariConfigMap);
    +JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
    +                                    .withTableName("user_details")
    +                                    .withJdbcMapper(simpleJdbcMapper);
    + ```
    +### JdbcTridentState
    +We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
    +state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
    +hikari configuration map. See the example below:
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withMapper(jdbcMapper)
    +        .withTableName("user_details");
    +
    +JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
    +```
    +
    +## Lookup from Database
    +We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
    +executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
    +
    +```java
    +    void declareOutputFields(OutputFieldsDeclarer declarer);
    +    List<Column> getColumns(ITuple tuple);
    +    public List<Values> toTuple(ITuple input, List<Column> columns);
    +```
    +
    +The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm 
    +tuple. 
    +The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
    +For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
    +user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
    +The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
    +second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
    +returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
    +query, the second item to second `?` in query and so on. 
    +The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
    +and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
    +of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
    +
    +###SimpleJdbcLookupMapper
    +`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
    +
    +To use `SimpleJdbcMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
    +columns that are used in your select query as place holder. The following example shows initialization of a `SimpleJdbcLookupMapper`
    +that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in select query.
    +SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
    +`SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
    +select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
    +and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name. 
    +So in the example below if the input tuple had fields `user_id, create_date` and the select query was 
    +`select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
    +will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query. 
    +For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as 
    +is adding only `user_name` from the resulting row and returning these 3 fields as a single output tuple.
    +
    +```java
    +Fields outputFields = new Fields("user_id", "user_name", "create_date");
    +List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +```
    +
    +### JdbcLookupBolt
    +To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the `JdbcLookupMapper` and the select query to execute.
    +
    +```java
    +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
    +        .withSelectSql("select user_name from user_details where user_id = ?")
    +```
    +
    +### JdbcTridentState for lookup
    +We also support a trident query state that can be used with trident topologies. 
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
    +        .withSelectQuery("select user_name from user_details where user_id = ?");
    +```
    +
    +## Example:
    +A runnable example can be found in the `src/test/java/topology` directory.
    +
    +### Setup
    +* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
    +* The test topologies executes the following queries so your intended DB must support these queries for test topologies
    +to work. 
    +```SQL
    +create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
    +create table if not exists department (dept_id integer, dept_name varchar(100));
    +create table if not exists user_department (user_id integer, dept_id integer);
    +insert into department values (1, 'R&D');
    +insert into department values (2, 'Finance');
    +insert into department values (3, 'HR');
    +insert into department values (4, 'Sales');
    +insert into user_department values (1, 1);
    +insert into user_department values (2, 2);
    +insert into user_department values (3, 3);
    +insert into user_department values (4, 4);
    +select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
    +```
    +### Execution
    +Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
    +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
    +
    +Mysql Example:
    +```
    +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
    +org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
    --- End diff --
    
    this should be UserPersistentTopology and "password" argument should be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962760
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    --- End diff --
    
    I am ok with current implementation but to be consistent with other connectors (hdfs, hive) can we make this as withConfigKey instead of a constructor param?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24015210
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,86 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt withConfigKey(String configKey) {
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    --- End diff --
    
    Please anchor these emits to the incoming tuple.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/374


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24461061
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,84 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    --- End diff --
    
    This is done, added all required param as part of constructors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-72735923
  
    Overall I think this is a good start and I am supportive. @revans2 brought up some good points and I tried to expand/clarify in several places. I feel more efficient use of prepared statements is an important. The ability to invoke stored procedures is something we might want to consider, but not a requirement, especially since calling stored procedures can be a VERY DB-dependent thing.
    
    I'm also willing to act as a sponsor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-75650396
  
    Thanks @Parth-Brahmbhatt  merged into master.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039920
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +
    +import java.io.Serializable;
    +import java.lang.reflect.Field;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    --- End diff --
    
    I removed some methods and forgot to remove unused imports. Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22974513
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JDBCClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
    +
    +    private HikariDataSource dataSource;
    +
    +    public JDBCClient(Map<String, Object> map) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    --- End diff --
    
    So for each bolt, how many connections do you advise to be in the pool? Given each bolt has one thread then I would say 1. But then it would loose time reconnecting each tuple that may have caused some kind of error. So then I would be inclined to say 2 connections, but then for 10 bolts, I would need 20 connections. What is the recommended value you use in production?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22974645
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    +The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
    +
    +```java
    +public interface JdbcMapper  extends Serializable {
    +    List<Column> getColumns(ITuple tuple);
    +}
    +```
    +
    +The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
    +
    +### SimpleJdbcMapper
    +`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
    +tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
    +the database table that you intend to write to.
    +
    +To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
    +
    +The following code creates a `SimpleJdbcMapper` instance that:
    +
    +1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
    +2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
    +automatically figure out the column names and corresponding data types of the table that you intend to write to. 
    +Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
    +
    +```java
    +Map hikariConfigMap = Maps.newHashMap();
    +hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    +hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    +hikariConfigMap.put("dataSource.user","root");
    +hikariConfigMap.put("dataSource.password","password");
    +String tableName = "user_details";
    +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
    +```
    +The mapper initialized in the example above assumes a storm tuple has value for all the columns. 
    +If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values 
    +and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
    +`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table 
    +`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
    +In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
    +you can initialize the `jdbcMapper` as below:
    +
    +```java
    +List<Column> columnSchema = Lists.newArrayList(
    +    new Column("user_id", java.sql.Types.INTEGER),
    +    new Column("user_name", java.sql.Types.VARCHAR));
    +    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
    +```
    +
    +### JdbcBolt
    +To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
    +the rows will be inserted.
    +
    + ```java
    +Config config = new Config();
    +config.put("jdbc.conf", hikariConfigMap);
    +JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
    +                                    .withTableName("user_details")
    +                                    .withJdbcMapper(simpleJdbcMapper);
    + ```
    +### JdbcTridentState
    +We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
    +state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
    +hikari configuration map. See the example below:
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withMapper(jdbcMapper)
    +        .withTableName("user_details");
    +
    +JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
    +```
    +
    +## Lookup from Database
    +We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
    +executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
    +
    +```java
    +    void declareOutputFields(OutputFieldsDeclarer declarer);
    +    List<Column> getColumns(ITuple tuple);
    +    public List<Values> toTuple(ITuple input, List<Column> columns);
    +```
    +
    +The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm 
    +tuple. 
    +The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
    +For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
    +user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
    +The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
    +second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
    +returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
    +query, the second item to second `?` in query and so on. 
    +The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
    +and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
    +of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
    +
    +###SimpleJdbcLookupMapper
    +`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
    +
    +To use `SimpleJdbcMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
    +columns that are used in your select query as place holder. The following example shows initialization of a `SimpleJdbcLookupMapper`
    +that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in select query.
    +SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
    +`SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
    +select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
    +and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name. 
    +So in the example below if the input tuple had fields `user_id, create_date` and the select query was 
    +`select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
    +will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query. 
    +For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as 
    +is adding only `user_name` from the resulting row and returning these 3 fields as a single output tuple.
    +
    +```java
    +Fields outputFields = new Fields("user_id", "user_name", "create_date");
    +List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +```
    +
    +### JdbcLookupBolt
    +To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the `JdbcLookupMapper` and the select query to execute.
    +
    +```java
    +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
    +        .withSelectSql("select user_name from user_details where user_id = ?")
    +```
    +
    +### JdbcTridentState for lookup
    +We also support a trident query state that can be used with trident topologies. 
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
    +        .withSelectQuery("select user_name from user_details where user_id = ?");
    +```
    +
    +## Example:
    +A runnable example can be found in the `src/test/java/topology` directory.
    +
    +### Setup
    +* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
    +* The test topologies executes the following queries so your intended DB must support these queries for test topologies
    +to work. 
    +```SQL
    +create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
    +create table if not exists department (dept_id integer, dept_name varchar(100));
    +create table if not exists user_department (user_id integer, dept_id integer);
    +insert into department values (1, 'R&D');
    +insert into department values (2, 'Finance');
    +insert into department values (3, 'HR');
    +insert into department values (4, 'Sales');
    +insert into user_department values (1, 1);
    +insert into user_department values (2, 2);
    +insert into user_department values (3, 3);
    +insert into user_department values (4, 4);
    +select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
    +```
    +### Execution
    +Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
    +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
    +
    +Mysql Example:
    +```
    +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
    +org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
    --- End diff --
    
    my bad I meant to say "user". The example given above for submitting the UserPersistanceTridentTopology contains more args than required
    "storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
    org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
    jdbc:mysql://localhost/test root password user UserPersistenceTopology"
    Here "root password user" I think "user" needs to be removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24430776
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,84 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    --- End diff --
    
    Similar comment to JdbcInsertBolt.  jdbcLookupMapper and selectQuery both look to be required, as such I would really rather have them be constructor parameters.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24040425
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    +
    +            String query = sb.toString();
    +            if(LOG.isDebugEnabled()) {
    +                LOG.debug("Executing query " + query);
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039560
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,86 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt withConfigKey(String configKey) {
    +        this.configKey = configKey;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22950855
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    --- End diff --
    
    license included twice


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24043926
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt withConfigKey(String configKey) {
    --- End diff --
    
    OK So I see two ways to do the Hikari configuration.
    The current way.
    ```
    Map<String, String> jdbcConf = new HashMap<String, String>();
    jdbcConf.put("a","b");
    stormConf.put("something", jdbcConf);
    bolt.withConfigKey("something");
    ```
    
    or we could just pass the jdbc config to the Bolt, and not send it through the topology config first.
    
    ```
    Map<String, String> jdbcConf = new HashMap<String, String>();
    jdbcConf.put("a","b");
    bolt.withConfig(jdbcConf);
    ``` 
    I don't understand why we need the extra step of storing a config inside another config, and providing a layer of indirection.
    
    As for constructor values vs builder values, it is a question of required vs optional parameters.
    
    ```
    Map<String, String> jdbcConf = new HashMap<String, String>();
    jdbcConf.put("a","b");
    stormConf.put("something", jdbcConf);
    //oops forgot this bolt.withConfigKey("something");
    ```
    results in an NPE after the topology is launched.  If it were a constructor parameter.
    ```
    Map<String, String> jdbcConf = new HashMap<String, String>();
    jdbcConf.put("a","b");
    stormConf.put("something", jdbcConf);
    new Bolt(/*forgot "something"*/);
    ```
    is a compile time error.  I am fine with having a builder pattern for anything that is optional, or even a way to override/extend require values.  I see it as defensive programming.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-70010386
  
    @harshach  modified storm-dist/pom.xml and also removed the config key from the constructor and added withConfigKey  method to be consistent with other connectors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22972354
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---
    @@ -0,0 +1,53 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.jdbc.common.JDBCClient;
    --- End diff --
    
    Notice that JDBCClient and JdbcMapper has different casing notations.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24434272
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    --- End diff --
    
    I am not a DB expert I read up on JDBC some more and talked to @kishorvpatil I think the current code is going to be fine.  I am a little concerned about the overhead of doing string manipulations to recreate each prepared statement, but I am OK with waiting for it to become a problem before we implement some sort of caching.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962179
  
    --- Diff: external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---
    @@ -0,0 +1,102 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.generated.StormTopology;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.common.JDBCClient;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    +import org.apache.storm.jdbc.spout.UserSpout;
    +import backtype.storm.LocalCluster;
    +
    +import java.sql.Types;
    +import java.util.List;
    +import java.util.Map;
    +
    +public abstract class AbstractUserTopology {
    +    private static final List<String> setupSqls = Lists.newArrayList(
    +            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
    +            "create table if not exists department (dept_id integer, dept_name varchar(100))",
    +            "create table if not exists user_department (user_id integer, dept_id integer)",
    +            "insert into department values (1, 'R&D')",
    +            "insert into department values (2, 'Finance')",
    +            "insert into department values (3, 'HR')",
    +            "insert into department values (4, 'Sales')",
    +            "insert into user_department values (1, 1)",
    +            "insert into user_department values (2, 2)",
    +            "insert into user_department values (3, 3)",
    +            "insert into user_department values (4, 4)"
    +    );
    +    protected UserSpout userSpout;
    +    protected JdbcMapper jdbcMapper;
    +    protected JdbcLookupMapper jdbcLookupMapper;
    +
    +    protected static final String TABLE_NAME = "user";
    +    protected static final String JDBC_CONF = "jdbc.conf";
    +    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
    +            " and user_department.user_id = ?";
    +
    +    public void execute(String[] args) throws Exception {
    +        if (args.length != 4 && args.length != 5) {
    +            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
    +                    + "<user> <password> [topology name]");
    +            System.exit(-1);
    +        }
    +        Map map = Maps.newHashMap();
    +        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
    +        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
    +        map.put("dataSource.user", args[2]);//root
    +        map.put("dataSource.password", args[3]);//password
    +
    +        Config config = new Config();
    +        config.put(JDBC_CONF, map);
    +
    +        JDBCClient jdbcClient = new JDBCClient(map);
    +        for (String sql : setupSqls) {
    +            jdbcClient.executeSql(sql);
    +        }
    +
    +        this.userSpout = new UserSpout();
    +        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
    +        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
    +        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +
    +        if (args.length == 4) {
    +            LocalCluster cluster = new LocalCluster();
    +            cluster.submitTopology("test", config, getTopology());
    +            Thread.sleep(30000);
    +            cluster.killTopology("test");
    +            cluster.shutdown();
    +            System.exit(0);
    +        } else {
    +            StormSubmitter.submitTopology(args[5], config, getTopology());
    --- End diff --
    
    this should be args[4]?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973534
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    --- End diff --
    
    We usually do not log each exception. we let the reportError decide what to do with it.  So to use this module we would have to turn off this logging completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r23339923
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    --- End diff --
    
    @itaifrenkel fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Crystark <gi...@git.apache.org>.
Github user Crystark commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-75028911
  
    Works like a charm :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973408
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,80 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    +                    }
    +                }
    +            }
    +            this.collector.ack(tuple);
    +        } catch (Exception e) {
    +            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
    +            this.collector.fail(tuple);
    --- End diff --
    
    this.collector.reportError(e) ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973990
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/AbstractJdbcBolt.java ---
    @@ -0,0 +1,53 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.task.OutputCollector;
    +import backtype.storm.task.TopologyContext;
    +import backtype.storm.topology.base.BaseRichBolt;
    +import org.apache.commons.lang.Validate;
    +import org.apache.storm.jdbc.common.JDBCClient;
    --- End diff --
    
    fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24430530
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcInsertBolt.java ---
    @@ -0,0 +1,79 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcInsertBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcInsertBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcInsertBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcInsertBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcInsertBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcInsertBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    public JdbcInsertBolt withQueryTimeoutSecs(int queryTimeoutSecs) {
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    --- End diff --
    
    For me the issue is with required configuration vs optional configuration.  configKey is not the only required configuration.  table name and jdbcMapper both appear to be required in the current code.  I would much rather have them be parameters to the constructor instead of builder methods, or have a reasonable default set in the constructor.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-75185107
  
    +1.  @Parth-Brahmbhatt  Thanks for the quick fix . @Crystark  Thanks for review and nice catch.
    @revans2  @ptgoetz  I think this is ready to merge it in will give it a day or two before pushing it to master. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-75848428
  
    Yes it looks like it is a JDK8 issue.  JDK6 compiles the code just fine. I'll file a new JIRA for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24050731
  
    --- Diff: external/storm-jdbc/pom.xml ---
    @@ -0,0 +1,125 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    + Licensed to the Apache Software Foundation (ASF) under one or more
    + contributor license agreements.  See the NOTICE file distributed with
    + this work for additional information regarding copyright ownership.
    + The ASF licenses this file to You under the Apache License, Version 2.0
    + (the "License"); you may not use this file except in compliance with
    + the License.  You may obtain a copy of the License at
    +
    +     http://www.apache.org/licenses/LICENSE-2.0
    +
    + Unless required by applicable law or agreed to in writing, software
    + distributed under the License is distributed on an "AS IS" BASIS,
    + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + See the License for the specific language governing permissions and
    + limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <artifactId>storm</artifactId>
    +        <groupId>org.apache.storm</groupId>
    +        <version>0.10.0-SNAPSHOT</version>
    +        <relativePath>../../pom.xml</relativePath>
    +    </parent>
    +
    +    <artifactId>storm-jdbc</artifactId>
    +
    +    <developers>
    +        <developer>
    +            <id>Parth-Brahmbhatt</id>
    +            <name>Parth Brahmbhatt</name>
    +            <email>brahmbhatt.parth@gmail.com</email>
    +        </developer>
    +    </developers>
    +
    +    <properties>
    +        <hikari.version>2.2.5</hikari.version>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.storm</groupId>
    +            <artifactId>storm-core</artifactId>
    +            <version>${project.version}</version>
    +            <scope>provided</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.commons</groupId>
    +            <artifactId>commons-lang3</artifactId>
    +            <version>3.3</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>com.google.guava</groupId>
    +            <artifactId>guava</artifactId>
    +            <version>17.0</version>
    --- End diff --
    
    I tried doing that but without a direct dependency the compile phase fails as storm-core is provided which is non transitive. I could change guava to provided scope, will that work?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-74940637
  
    @Crystark  super sorry about this I am not sure how the heck did I miss this. The code you pointed at is fine but I should be using executeBatch instead of executeUpdate. I have modified the code to use executeBatch and also modified the test to test the bug you pointed out.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22978604
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    +The main API for inserting data in a table using JDBC is the `org.apache.storm.jdbc.mapper.JdbcMapper` interface:
    +
    +```java
    +public interface JdbcMapper  extends Serializable {
    +    List<Column> getColumns(ITuple tuple);
    +}
    +```
    +
    +The `getColumns()` method defines how a storm tuple maps to a list of columns representing a row in a database.
    +
    +### SimpleJdbcMapper
    +`storm-jdbc` includes a general purpose `JdbcMapper` implementation called `SimpleJdbcMapper` that can map Storm
    +tuple to a Database row. `SimpleJdbcMapper` assumes that the storm tuple has fields with same name as the column name in 
    +the database table that you intend to write to.
    +
    +To use `SimpleJdbcMapper`, you simply tell it the tableName that you want to write to and provide a hikari configuration map.
    +
    +The following code creates a `SimpleJdbcMapper` instance that:
    +
    +1. Will allow the mapper to transform a storm tuple to a list of columns mapping to a row in table test.user_details.
    +2. Will use the provided HikariCP configuration to establish a connection pool with specified Database configuration and
    +automatically figure out the column names and corresponding data types of the table that you intend to write to. 
    +Please see https://github.com/brettwooldridge/HikariCP#configuration-knobs-baby to learn more about hikari configuration properties.
    +
    +```java
    +Map hikariConfigMap = Maps.newHashMap();
    +hikariConfigMap.put("dataSourceClassName","com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
    +hikariConfigMap.put("dataSource.url", "jdbc:mysql://localhost/test");
    +hikariConfigMap.put("dataSource.user","root");
    +hikariConfigMap.put("dataSource.password","password");
    +String tableName = "user_details";
    +JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(tableName, map);
    +```
    +The mapper initialized in the example above assumes a storm tuple has value for all the columns. 
    +If your storm tuple only has fields for a subset of columns i.e. if some of the columns in your table have default values 
    +and you want to only insert values for columns with no default values you can enforce the behavior by initializing the 
    +`SimpleJdbcMapper` with explicit columnschema. For example, if you have a user_details table 
    +`create table if not exists user_details (user_id integer, user_name varchar(100), dept_name varchar(100), create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);`
    +In this table the create_time column has a default value. To ensure only the columns with no default values are inserted 
    +you can initialize the `jdbcMapper` as below:
    +
    +```java
    +List<Column> columnSchema = Lists.newArrayList(
    +    new Column("user_id", java.sql.Types.INTEGER),
    +    new Column("user_name", java.sql.Types.VARCHAR));
    +    JdbcMapper simpleJdbcMapper = new SimpleJdbcMapper(columnSchema);
    +```
    +
    +### JdbcBolt
    +To use the `JdbcBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the JdbcMapper implementation to covert storm tuple to DB row and the table name in which 
    +the rows will be inserted.
    +
    + ```java
    +Config config = new Config();
    +config.put("jdbc.conf", hikariConfigMap);
    +JdbcBolt userPersistanceBolt = new JdbcBolt("jdbc.conf")
    +                                    .withTableName("user_details")
    +                                    .withJdbcMapper(simpleJdbcMapper);
    + ```
    +### JdbcTridentState
    +We also support a trident persistent state that can be used with trident topologies. To create a jdbc persistent trident
    +state you need to initialize it with the table name, the JdbcMapper instance and name of storm config key that holds the
    +hikari configuration map. See the example below:
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withMapper(jdbcMapper)
    +        .withTableName("user_details");
    +
    +JdbcStateFactory jdbcStateFactory = new JdbcStateFactory(options);
    +```
    +
    +## Lookup from Database
    +We support `select` queries from databases to allow enrichment of storm tuples in a topology. The main API for 
    +executing select queries against a database using JDBC is the `org.apache.storm.jdbc.mapper.JdbcLookupMapper` interface:
    +
    +```java
    +    void declareOutputFields(OutputFieldsDeclarer declarer);
    +    List<Column> getColumns(ITuple tuple);
    +    public List<Values> toTuple(ITuple input, List<Column> columns);
    +```
    +
    +The `declareOutputFields` method is used to indicate what fields will be emitted as part of output tuple of processing a storm 
    +tuple. 
    +The `getColumns` method specifies the place holder columns in a select query and their SQL type and the value to use.
    +For example in the user_details table mentioned above if you were executing a query `select user_name from user_details where
    +user_id = ? and create_time > ?` the `getColumns` method would take a storm input tuple and return a List containing two items.
    +The first instance of `Column` type's `getValue()` method will be used as the value of `user_id` to lookup for and the
    +second instance of `Column` type's `getValue()` method will be used as the value of `create_time`.Note: the order in the
    +returned list determines the place holder's value. In other words the first item in the list maps to first `?` in select
    +query, the second item to second `?` in query and so on. 
    +The `toTuple` method takes in the input tuple and a list of columns representing a DB row as a result of the select query
    +and returns a list of values to be emitted. Please note that it returns a list of `Values` and not just a single instance
    +of `Values`. This allows a for a single DB row to be mapped to multiple output storm tuples.
    +
    +###SimpleJdbcLookupMapper
    +`storm-jdbc` includes a general purpose `JdbcLookupMapper` implementation called `SimpleJdbcLookupMapper`. 
    +
    +To use `SimpleJdbcMapper`, you have to initialize it with the fields that will be outputted by your bolt and the list of
    +columns that are used in your select query as place holder. The following example shows initialization of a `SimpleJdbcLookupMapper`
    +that declares `user_id,user_name,create_date` as output fields and `user_id` as the place holder column in select query.
    +SimpleJdbcMapper assumes the field name in your tuple is equal to the place holder column name, i.e. in our example 
    +`SimpleJdbcMapper` will look for a field `use_id` in the input tuple and use its value as the place holder's value in the
    +select query. For constructing output tuples, it looks for fields specified in `outputFields` in the input tuple first, 
    +and if it is not found in input tuple then it looks at select queries output row for a column with same name as field name. 
    +So in the example below if the input tuple had fields `user_id, create_date` and the select query was 
    +`select user_name from user_details where user_id = ?`, For each input tuple `SimpleJdbcLookupMapper.getColumns(tuple)` 
    +will return the value of `tuple.getValueByField("user_id")` which will be used as the value in `?` of select query. 
    +For each output row from DB, `SimpleJdbcLookupMapper.toTuple()` will use the `user_id, create_date` from the input tuple as 
    +is adding only `user_name` from the resulting row and returning these 3 fields as a single output tuple.
    +
    +```java
    +Fields outputFields = new Fields("user_id", "user_name", "create_date");
    +List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +```
    +
    +### JdbcLookupBolt
    +To use the `JdbcLookupBolt`, construct it with configuration key in your storm config that hold the hikari configuration map.
    +In addition you must specify the `JdbcLookupMapper` and the select query to execute.
    +
    +```java
    +JdbcLookupBolt userNameLookupBolt = new JdbcLookupBolt("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(outputFields, queryParamColumns))
    +        .withSelectSql("select user_name from user_details where user_id = ?")
    +```
    +
    +### JdbcTridentState for lookup
    +We also support a trident query state that can be used with trident topologies. 
    +
    +```java
    +JdbcState.Options options = new JdbcState.Options()
    +        .withConfigKey("jdbc.conf")
    +        .withJdbcLookupMapper(new SimpleJdbcLookupMapper(new Fields("user_name"), Lists.newArrayList(new Column("user_id", Types.INTEGER))))
    +        .withSelectQuery("select user_name from user_details where user_id = ?");
    +```
    +
    +## Example:
    +A runnable example can be found in the `src/test/java/topology` directory.
    +
    +### Setup
    +* Ensure you have included JDBC implementation dependency for your chosen database as part of your build configuration.
    +* The test topologies executes the following queries so your intended DB must support these queries for test topologies
    +to work. 
    +```SQL
    +create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date);
    +create table if not exists department (dept_id integer, dept_name varchar(100));
    +create table if not exists user_department (user_id integer, dept_id integer);
    +insert into department values (1, 'R&D');
    +insert into department values (2, 'Finance');
    +insert into department values (3, 'HR');
    +insert into department values (4, 'Sales');
    +insert into user_department values (1, 1);
    +insert into user_department values (2, 2);
    +insert into user_department values (3, 3);
    +insert into user_department values (4, 4);
    +select dept_name from department, user_department where department.dept_id = user_department.dept_id and user_department.user_id = ?;
    +```
    +### Execution
    +Run the `org.apache.storm.jdbc.topology.UserPersistanceTopology` class using storm jar command. The class expects 5 args
    +storm jar org.apache.storm.jdbc.topology.UserPersistanceTopology <dataSourceClassName> <dataSource.url> <user> <password> [topology name]
    +
    +Mysql Example:
    +```
    +storm jar ~/repo/incubator-storm/external/storm-jdbc/target/storm-jdbc-0.10.0-SNAPSHOT-jar-with-dependencies.jar 
    +org.apache.storm.jdbc.topology.UserPersistanceTridentTopology  com.mysql.jdbc.jdbc2.optional.MysqlDataSource 
    --- End diff --
    
    removed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962868
  
    --- Diff: external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---
    @@ -0,0 +1,102 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.generated.StormTopology;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.common.JDBCClient;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    +import org.apache.storm.jdbc.spout.UserSpout;
    +import backtype.storm.LocalCluster;
    +
    +import java.sql.Types;
    +import java.util.List;
    +import java.util.Map;
    +
    +public abstract class AbstractUserTopology {
    +    private static final List<String> setupSqls = Lists.newArrayList(
    +            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
    +            "create table if not exists department (dept_id integer, dept_name varchar(100))",
    +            "create table if not exists user_department (user_id integer, dept_id integer)",
    +            "insert into department values (1, 'R&D')",
    +            "insert into department values (2, 'Finance')",
    +            "insert into department values (3, 'HR')",
    +            "insert into department values (4, 'Sales')",
    +            "insert into user_department values (1, 1)",
    +            "insert into user_department values (2, 2)",
    +            "insert into user_department values (3, 3)",
    +            "insert into user_department values (4, 4)"
    +    );
    +    protected UserSpout userSpout;
    +    protected JdbcMapper jdbcMapper;
    +    protected JdbcLookupMapper jdbcLookupMapper;
    +
    +    protected static final String TABLE_NAME = "user";
    +    protected static final String JDBC_CONF = "jdbc.conf";
    +    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
    +            " and user_department.user_id = ?";
    +
    +    public void execute(String[] args) throws Exception {
    +        if (args.length != 4 && args.length != 5) {
    +            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
    +                    + "<user> <password> [topology name]");
    +            System.exit(-1);
    +        }
    +        Map map = Maps.newHashMap();
    +        map.put("dataSourceClassName", args[0]);//com.mysql.jdbc.jdbc2.optional.MysqlDataSource
    +        map.put("dataSource.url", args[1]);//jdbc:mysql://localhost/test
    +        map.put("dataSource.user", args[2]);//root
    +        map.put("dataSource.password", args[3]);//password
    +
    +        Config config = new Config();
    +        config.put(JDBC_CONF, map);
    +
    +        JDBCClient jdbcClient = new JDBCClient(map);
    +        for (String sql : setupSqls) {
    +            jdbcClient.executeSql(sql);
    +        }
    +
    +        this.userSpout = new UserSpout();
    +        this.jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, map);
    +        Fields outputFields = new Fields("user_id", "user_name", "dept_name", "create_date");
    +        List<Column> queryParamColumns = Lists.newArrayList(new Column("user_id", Types.INTEGER));
    +        this.jdbcLookupMapper = new SimpleJdbcLookupMapper(outputFields, queryParamColumns);
    +
    +        if (args.length == 4) {
    +            LocalCluster cluster = new LocalCluster();
    +            cluster.submitTopology("test", config, getTopology());
    +            Thread.sleep(30000);
    +            cluster.killTopology("test");
    +            cluster.shutdown();
    +            System.exit(0);
    +        } else {
    +            StormSubmitter.submitTopology(args[5], config, getTopology());
    --- End diff --
    
    yes, fixed. Sorry about this, originally I required that the user passed the table name as argument but I figured it is better to just execute all the DDLs as part of test so I removed it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24038992
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> map, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    --- End diff --
    
    To build on @revans2's comment: We're building preparedStatements each time, but using them only once and discarding them. I'd like to see them cached for re-use (probably with some sort of eviction policy to protect against blowing out memory).
    
    I don't see this as a blocker, but something that should be considered here or as an early improvement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22974973
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,80 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    +                    }
    +                }
    +            }
    +            this.collector.ack(tuple);
    +        } catch (Exception e) {
    +            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
    +            this.collector.fail(tuple);
    --- End diff --
    
    same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22950826
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    --- End diff --
    
    typo 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22961443
  
    --- Diff: external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java ---
    @@ -0,0 +1,102 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.topology;
    +
    +import backtype.storm.Config;
    +import backtype.storm.StormSubmitter;
    +import backtype.storm.generated.StormTopology;
    +import backtype.storm.tuple.Fields;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.common.JDBCClient;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
    +import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
    +import org.apache.storm.jdbc.spout.UserSpout;
    +import backtype.storm.LocalCluster;
    +
    +import java.sql.Types;
    +import java.util.List;
    +import java.util.Map;
    +
    +public abstract class AbstractUserTopology {
    +    private static final List<String> setupSqls = Lists.newArrayList(
    +            "create table if not exists user (user_id integer, user_name varchar(100), dept_name varchar(100), create_date date)",
    +            "create table if not exists department (dept_id integer, dept_name varchar(100))",
    +            "create table if not exists user_department (user_id integer, dept_id integer)",
    +            "insert into department values (1, 'R&D')",
    +            "insert into department values (2, 'Finance')",
    +            "insert into department values (3, 'HR')",
    +            "insert into department values (4, 'Sales')",
    +            "insert into user_department values (1, 1)",
    +            "insert into user_department values (2, 2)",
    +            "insert into user_department values (3, 3)",
    +            "insert into user_department values (4, 4)"
    +    );
    +    protected UserSpout userSpout;
    +    protected JdbcMapper jdbcMapper;
    +    protected JdbcLookupMapper jdbcLookupMapper;
    +
    +    protected static final String TABLE_NAME = "user";
    +    protected static final String JDBC_CONF = "jdbc.conf";
    +    protected static final String SELECT_QUERY = "select dept_name from department, user_department where department.dept_id = user_department.dept_id" +
    +            " and user_department.user_id = ?";
    +
    +    public void execute(String[] args) throws Exception {
    +        if (args.length != 4 && args.length != 5) {
    +            System.out.println("Usage: " + this.getClass().getSimpleName() + " <dataSourceClassName> <dataSource.url> "
    --- End diff --
    
    This is condition will never work, it supposed to be "or"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22978468
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JDBCClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
    +
    +    private HikariDataSource dataSource;
    +
    +    public JDBCClient(Map<String, Object> map) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    +
    +            String query = sb.toString();
    +            if(LOG.isDebugEnabled()) {
    +                LOG.debug("Executing query " + query);
    +            }
    +
    +            PreparedStatement preparedStatement = connection.prepareStatement(query);
    +            for(List<Column> columnList : columnLists) {
    +                setPreparedStatementParams(preparedStatement, columnList);
    +            }
    +
    +            return preparedStatement.executeUpdate();
    --- End diff --
    
    added a new config queryTimeOutSecs which is defaulted to 30 seconds which is equal to topology.message.timeout.secs. user can override during bolt construction.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24037306
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt withConfigKey(String configKey) {
    --- End diff --
    
    I think the idea behind config key here, like in other modules, is to allow multiple instances of the bolt use different configurations (e.g. pointing to different databases, schemas, etc.).
    
    That being said, I'm not really partial to using either a builder method or constructor arg, as long as it's exposed as an option. Either way, I'd like to try to keep it consistent across modules so the APIs have a similar "feel". One option for the builder method would be to define a default config key value that gets used unless overridden with `withConfigKey()`. We should also clearly document this behavior.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-105598508
  
    @Crystark  You are right I will fix it. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24040287
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +
    +import java.io.Serializable;
    +import java.lang.reflect.Field;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +
    +public class Column<T> implements Serializable {
    +
    +    private String columnName;
    +    private T val;
    +    private int sqlType;
    +
    +    public Column(String columnName, T val, int sqlType) {
    --- End diff --
    
    I have added a javadoc that should help to clarify things. If we consider a database table as a list of rows and each row as a list of columns, this class represent a single instance of a column, so for a row [userID =1 , userName = "Foo"] we will have two instances of Column class
    new Column("userId", 1, Types.INTEGER) and new Column("userName", "Foo", Types.Varchar). 
    
    I am not sure why java's jdbc API decided to go with integer to represent sql Types instead of using more readable Enums but the sqlType has value mapped to java.sql.Types where each constant represents a database type.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24014635
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt withConfigKey(String configKey) {
    --- End diff --
    
    If this is required I would rather see it be a part of a Constructor rather than a builder method.  This should be similar for the others.  Builder methods are great for optional values, that have a decent default, but if something is required it should be in the constructor.  And why do we want a config key?  The JDBC config needs to be serializable to both json and yaml to be in the storm config already, it feels like we could just pass the JDBC config into the bolt instead, unless you feel it really needs to be in the config for debug purposes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24015372
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +
    +import java.io.Serializable;
    +import java.lang.reflect.Field;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    --- End diff --
    
    Most, if not all of these imports do not seem to be used.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-75847524
  
    I am now getting a compilation error.
    ``` [ERROR] external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Util.java:[30,29] incomparable types: int and java.lang.Object```  Not sure if this is JDK8 specific or not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-73919089
  
    +1 Tested the code with mysql.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24039240
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    --- End diff --
    
    Done.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22975021
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JDBCClient.java ---
    @@ -0,0 +1,209 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JDBCClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JDBCClient.class);
    +
    +    private HikariDataSource dataSource;
    +
    +    public JDBCClient(Map<String, Object> map) {
    +        Properties properties = new Properties();
    +        properties.putAll(map);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    +
    +            String query = sb.toString();
    +            if(LOG.isDebugEnabled()) {
    +                LOG.debug("Executing query " + query);
    +            }
    +
    +            PreparedStatement preparedStatement = connection.prepareStatement(query);
    +            for(List<Column> columnList : columnLists) {
    +                setPreparedStatementParams(preparedStatement, columnList);
    +            }
    +
    +            return preparedStatement.executeUpdate();
    --- End diff --
    
    executeUpdate could block for a very long time since preparedState.setQueryTimeout() was not called. This should be preferably configurable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Crystark <gi...@git.apache.org>.
Github user Crystark commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24902378
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/JdbcClient.java ---
    @@ -0,0 +1,213 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +import com.google.common.base.Function;
    +import com.google.common.base.Joiner;
    +import com.google.common.collect.Collections2;
    +import com.google.common.collect.Lists;
    +import com.zaxxer.hikari.HikariConfig;
    +import com.zaxxer.hikari.HikariDataSource;
    +import org.apache.commons.lang.StringUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.sql.*;
    +import java.sql.Date;
    +import java.util.*;
    +
    +public class JdbcClient {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcClient.class);
    +
    +    private HikariDataSource dataSource;
    +    private int queryTimeoutSecs;
    +
    +    public JdbcClient(Map<String, Object> hikariConfigMap, int queryTimeoutSecs) {
    +        Properties properties = new Properties();
    +        properties.putAll(hikariConfigMap);
    +        HikariConfig config = new HikariConfig(properties);
    +        this.dataSource = new HikariDataSource(config);
    +        this.queryTimeoutSecs = queryTimeoutSecs;
    +    }
    +
    +    public int insert(String tableName, List<List<Column>> columnLists) {
    +        Connection connection = null;
    +        try {
    +            connection = this.dataSource.getConnection();
    +            StringBuilder sb = new StringBuilder();
    +            sb.append("Insert into ").append(tableName).append(" (");
    +            Collection<String> columnNames = Collections2.transform(columnLists.get(0), new Function<Column, String>() {
    +                @Override
    +                public String apply(Column input) {
    +                    return input.getColumnName();
    +                }
    +            });
    +            String columns = Joiner.on(",").join(columnNames);
    +            sb.append(columns).append(") values ( ");
    +
    +            String placeHolders = StringUtils.chop(StringUtils.repeat("?,", columnNames.size()));
    +            sb.append(placeHolders).append(")");
    +
    +            String query = sb.toString();
    +
    +            LOG.debug("Executing query {}", query);
    +
    +
    +            PreparedStatement preparedStatement = connection.prepareStatement(query);
    +            preparedStatement.setQueryTimeout(queryTimeoutSecs);
    +            for(List<Column> columnList : columnLists) {
    +                setPreparedStatementParams(preparedStatement, columnList);
    +            }
    --- End diff --
    
    Here each `columnList ` of `columnLists` overrides the previous value.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22978580
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    --- End diff --
    
    fixed.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24037432
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    --- End diff --
    
    I agree with @revans2. While this deviates from some of the other older modules, I think this is a good convention to follow moving forward.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24015436
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java ---
    @@ -0,0 +1,87 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.common;
    +
    +
    +import java.io.Serializable;
    +import java.lang.reflect.Field;
    +import java.sql.Date;
    +import java.sql.Time;
    +import java.sql.Timestamp;
    +import java.sql.Types;
    +
    +public class Column<T> implements Serializable {
    +
    +    private String columnName;
    +    private T val;
    +    private int sqlType;
    +
    +    public Column(String columnName, T val, int sqlType) {
    --- End diff --
    
    What really is val?  and what values of sqlType mean?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973557
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcLookupBolt.java ---
    @@ -0,0 +1,80 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import backtype.storm.tuple.Values;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.List;
    +
    +/**
    + * Basic bolt for querying from any database.
    + */
    +public class JdbcLookupBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupBolt.class);
    +
    +    private String selectQuery;
    +
    +    private JdbcLookupMapper jdbcLookupMapper;
    +
    +    public JdbcLookupBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcLookupBolt withJdbcLookupMapper(JdbcLookupMapper jdbcLookupMapper) {
    +        this.jdbcLookupMapper = jdbcLookupMapper;
    +        return this;
    +    }
    +
    +    public JdbcLookupBolt withSelectSql(String selectQuery) {
    +        this.selectQuery = selectQuery;
    +        return this;
    +    }
    +
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcLookupMapper.getColumns(tuple);
    +            List<List<Column>> result = jdbcClient.select(this.selectQuery, columns);
    +
    +            if (result != null && result.size() != 0) {
    +                for (List<Column> row : result) {
    +                    List<Values> values = jdbcLookupMapper.toTuple(tuple, row);
    +                    for (Values value : values) {
    +                        collector.emit(value);
    +                    }
    +                }
    +            }
    +            this.collector.ack(tuple);
    +        } catch (Exception e) {
    +            LOG.info("Failed to execute a select query {} on tuple {} ", this.selectQuery, tuple);
    --- End diff --
    
    We usually do not log each exception. we let the reportError decide what to do with it.  So to use this module we would have to turn off this logging completely.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by revans2 <gi...@git.apache.org>.
Github user revans2 commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-72685012
  
    For the most part it looks OK.  I am concerned about a few of the APIs seem confusing to me and could use some better javadocs.  Also I am concerned with the performance of some of the sql involved.  We are regenerating a lot of the sql from text for each query.  I am not a sql expert, but I thought that the idea behind prepared statements was that it made it so the sql engine did not have to reparse and compile the query each time.  I'm not sure how each server does that internally, but the fact that we are doing string manipulation for each query instead of once when the bolt is prepared, seems problematic.
    
    Also the Columns magic we are doing seems OK, but like I said I am not a SQL expert and it feels like just using a stored query would be a lot simpler then all of this. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Parth-Brahmbhatt <gi...@git.apache.org>.
Github user Parth-Brahmbhatt commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22962071
  
    --- Diff: external/storm-jdbc/README.md ---
    @@ -0,0 +1,208 @@
    +#Storm JDBC
    +Storm/Trident integration for JDBC. This package includes the core bolts and trident states that allows a storm topology
    +to either insert storm tuples in a database table or to execute select queries against a database and enrich tuples 
    +in a storm topology. This code uses HikariCP for connection pooling. See http://brettwooldridge.github.io/HikariCP.
    +
    +## Inserting into a database.
    +The bolt and trindet state included in this package for inserting data into a database tables are tied to a single table.
    --- End diff --
    
    fixed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22973307
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    this.collector.reportError(e) ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by Crystark <gi...@git.apache.org>.
Github user Crystark commented on the pull request:

    https://github.com/apache/storm/pull/374#issuecomment-105472684
  
    On more thing i noticed: [Column.hashCode](https://github.com/apache/storm/blob/master/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java#L99) doesn't support the value beeing null. I've tried using those columns as keys of a HashMap and got a NPE.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by itaifrenkel <gi...@git.apache.org>.
Github user itaifrenkel commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r22975439
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    +            this.collector.fail(tuple);
    --- End diff --
    
    I am not qualified for advising best practices in Storm codebase. Our experience has shown that in our environment a database that is not responding for any reason would flood the logs with long stack traces causing far worse side effects. It could happen to others. To reuse this code I would probably need to copy-paste the entire bolt, which is ok, since most of the logic resides elsewhere.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: Storm-616 : Storm-jdbc connector.

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/374#discussion_r24036567
  
    --- Diff: external/storm-jdbc/src/main/java/org/apache/storm/jdbc/bolt/JdbcBolt.java ---
    @@ -0,0 +1,91 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.storm.jdbc.bolt;
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +import backtype.storm.topology.OutputFieldsDeclarer;
    +import backtype.storm.tuple.Tuple;
    +import org.apache.storm.jdbc.common.Column;
    +import org.apache.storm.jdbc.mapper.JdbcMapper;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +/**
    + * Basic bolt for writing to any Database table.
    + * <p/>
    + * Note: Each JdbcBolt defined in a topology is tied to a specific table.
    + */
    +public class JdbcBolt extends AbstractJdbcBolt {
    +    private static final Logger LOG = LoggerFactory.getLogger(JdbcBolt.class);
    +
    +    private String tableName;
    +    private JdbcMapper jdbcMapper;
    +
    +    public JdbcBolt(String configKey) {
    +        super(configKey);
    +    }
    +
    +    public JdbcBolt withTableName(String tableName) {
    +        this.tableName = tableName;
    +        return this;
    +    }
    +
    +    public JdbcBolt withJdbcMapper(JdbcMapper jdbcMapper) {
    +        this.jdbcMapper = jdbcMapper;
    +        return this;
    +    }
    +
    +    @Override
    +    public void execute(Tuple tuple) {
    +        try {
    +            List<Column> columns = jdbcMapper.getColumns(tuple);
    +            List<List<Column>> columnLists = new ArrayList<List<Column>>();
    +            columnLists.add(columns);
    +            this.jdbcClient.insert(this.tableName, columnLists);
    +        } catch (Exception e) {
    +            LOG.warn("Failing tuple.", e);
    --- End diff --
    
    I agree. `reportError()` has built-in protections against being flooded (and overloading ZK as a result).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---