You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by sujitha chinnu <ch...@gmail.com> on 2016/03/01 12:42:20 UTC

Passing array values to PostgreSQL using storm bolts.

 Hai.,
          I want to insert Array values of type long into the PostgreSQL
using storm bolts and from the schema class(Spout) i am returning the array
values.Can anyone help me in rectifying my error.

*My simplejdbcMapper class:*


 public class SimpleJdbcMapper implements JdbcMapper {

    private List<Column> schemaColumns;

    public SimpleJdbcMapper(String tableName, ConnectionProvider
connectionProvider) {
        connectionProvider.prepare();
        JdbcClient client = new JdbcClient(connectionProvider);
        this.schemaColumns = client.getColumnSchema(tableName);
    }

    public SimpleJdbcMapper(List<Column> schemaColumns) {
        this.schemaColumns = schemaColumns;
    }

    @Override
    public List<Column> getColumns(ITuple tuple) {
        List<Column> columns = new ArrayList<Column>();
        for(Column column : schemaColumns) {
            String columnName = column.getColumnName();
            Integer columnSqlType = column.getSqlType();

            if(Util.getJavaType(columnSqlType).equals(String.class)) {
                String value = tuple.getStringByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Short.class)) {
                Short value = tuple.getShortByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else
if(Util.getJavaType(columnSqlType).equals(Integer.class)) {
                Integer value = tuple.getIntegerByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Long.class)) {
                Long value = tuple.getLongByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Double.class))
{
                Double value = tuple.getDoubleByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Float.class)) {
                Float value = tuple.getFloatByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else
if(Util.getJavaType(columnSqlType).equals(Boolean.class)) {
                Boolean value = tuple.getBooleanByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(byte[].class))
{
                byte[] value = tuple.getBinaryByField(columnName);
                columns.add(new Column(columnName, value, columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Date.class)) {
                Long value = tuple.getLongByField(columnName);
                columns.add(new Column(columnName, new Date(value),
columnSqlType));
            } else if(Util.getJavaType(columnSqlType).equals(Time.class)) {
                Long value = tuple.getLongByField(columnName);
                columns.add(new Column(columnName, new Time(value),
columnSqlType));
            } else
if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) {
                Long value = tuple.getLongByField(columnName);
                columns.add(new Column(columnName, new Timestamp(value),
columnSqlType));
            } else {
                throw new RuntimeException("Unsupported java type in tuple
" + Util.getJavaType(columnSqlType));
            }
        }
        return columns;
    }
}



*Bolt class:*


List<Column> schemaColumns = Lists.newArrayList(new Column("xxx",
Types.BIGINT),new Column("yyy",Types.VARCHAR),
new Column("zzz",Types.VARCHAR));

JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

String insertQuery = "insert intotable_data (xxx, yyy,  zzz) values
(regexp_split_to_table(?,','),?,?)";

@Override
public void execute(Tuple tuple) {
try {

List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName)) {
this.jdbcClient.insert(this.tableName, columnLists); } else {
this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
this.collector.ack(tuple);
} catch (Exception e) { this.collector.reportError(e);
this.collector.fail(tuple); }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}}



*JdbcClient class:*


public void executeInsertQuery(String query, List<List<Column>>
columnLists)  {
        Connection connection = null;

        try {
            connection = connectionProvider.getConnection();
            boolean autoCommit = connection.getAutoCommit();
            if(autoCommit) {
                connection.setAutoCommit(false);
            }
            LOG.debug(String.format("Executing query {}", query));

            PreparedStatement preparedStatement =
connection.prepareStatement(query);

            for(List<Column> columnList : columnLists) {

                setPreparedStatementParams(preparedStatement, columnList);
                preparedStatement.addBatch();
            }

            int[] results = preparedStatement.executeBatch();
            if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
                connection.rollback();
                //throw new RuntimeException("failed at least one sql
statement in the batch, operation rolled back.");
       LOG.error("failed at least one sql statement in the batch, operation
rolled back.");
            } else {
                try {
                    connection.commit();
                } catch (SQLException e) {
                    //throw new RuntimeException("Failed to commit insert
query " + query, e);

   LOG.error("Failed to commit insert query " + query, e);
                }
            }
        } catch (SQLException e) {
   throw new RuntimeException("Failed to execute insert query " + query,
e.getNextException());

        } finally {
            closeConnection(connection);
        }
    }



    public List<Column> getColumnSchema(String tableName) {
        Connection connection = null;
        List<Column> columns = new ArrayList<Column>();
        try {
            connection = connectionProvider.getConnection();
            DatabaseMetaData metaData = connection.getMetaData();
            ResultSet resultSet = metaData.getColumns(null, null,
tableName, null);
            while (resultSet.next()) {
                columns.add(new Column(resultSet.getString("COLUMN_NAME"),
resultSet.getInt("DATA_TYPE")));
            }
            return columns;
        } catch (SQLException e) {
            throw new RuntimeException("Failed to get schema for table " +
tableName, e);
        } finally {
            closeConnection(connection);
        }
    }


    private void setPreparedStatementParams(PreparedStatement
preparedStatement, List<Column> columnList) throws SQLException {
        int index = 1;
        for (Column column : columnList) {
            Class columnJavaType = Util.getJavaType(column.getSqlType());
            if (column.getVal() == null) {
                preparedStatement.setNull(index, column.getSqlType());
            } else if (columnJavaType.equals(String.class)) {
                preparedStatement.setString(index, (String)
column.getVal());
            } else if (columnJavaType.equals(Integer.class)) {
                preparedStatement.setInt(index, (Integer) column.getVal());
            } else if (columnJavaType.equals(Double.class)) {
                preparedStatement.setDouble(index, (Double)
column.getVal());
            } else if (columnJavaType.equals(Float.class)) {
                preparedStatement.setFloat(index, (Float) column.getVal());
            } else if (columnJavaType.equals(Short.class)) {
                preparedStatement.setShort(index, (Short) column.getVal());
            } else if (columnJavaType.equals(Boolean.class)) {
                preparedStatement.setBoolean(index, (Boolean)
column.getVal());
            } else if (columnJavaType.equals(byte[].class)) {
                preparedStatement.setBytes(index, (byte[]) column.getVal());
            } else if (columnJavaType.equals(Long.class)) {
                preparedStatement.setLong(index, (Long) column.getVal());
            } else if (columnJavaType.equals(Date.class)) {
                preparedStatement.setDate(index, (Date) column.getVal());
            } else if (columnJavaType.equals(Time.class)) {
                preparedStatement.setTime(index, (Time) column.getVal());
            } else if (columnJavaType.equals(Timestamp.class)) {
                preparedStatement.setTimestamp(index, (Timestamp)
column.getVal());
            } else {
                throw new RuntimeException("Unknown type of value " +
column.getVal() + " for column " + column.getColumnName());
            }
            ++index;
        }
    }

    private void closeConnection(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                throw new RuntimeException("Failed to close connection", e);
            }
        }
    }



*ERROR:*


*java.lang.ClassCastException: java.util.ArrayList cannot be cast to
java.lang.Long at
backtype.storm.tuple.TupleImpl.getLongByField(TupleImpl.java:161) at
com.aail.dao.SimpleJdbcMapper.getColumns(SimpleJdbcMapper.java:49) at
com.aail.storm.bolts.Fb_bolts.execute(bolts.java:225) at
backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
at
backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
at
backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at
backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) at
clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) *