You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by sujitha chinnu <ch...@gmail.com> on 2016/03/01 12:42:20 UTC
Passing array values to PostgreSQL using storm bolts.
Hai.,
I want to insert Array values of type long into the PostgreSQL
using storm bolts and from the schema class(Spout) i am returning the array
values.Can anyone help me in rectifying my error.
*My simplejdbcMapper class:*
public class SimpleJdbcMapper implements JdbcMapper {
private List<Column> schemaColumns;
public SimpleJdbcMapper(String tableName, ConnectionProvider
connectionProvider) {
connectionProvider.prepare();
JdbcClient client = new JdbcClient(connectionProvider);
this.schemaColumns = client.getColumnSchema(tableName);
}
public SimpleJdbcMapper(List<Column> schemaColumns) {
this.schemaColumns = schemaColumns;
}
@Override
public List<Column> getColumns(ITuple tuple) {
List<Column> columns = new ArrayList<Column>();
for(Column column : schemaColumns) {
String columnName = column.getColumnName();
Integer columnSqlType = column.getSqlType();
if(Util.getJavaType(columnSqlType).equals(String.class)) {
String value = tuple.getStringByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Short.class)) {
Short value = tuple.getShortByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else
if(Util.getJavaType(columnSqlType).equals(Integer.class)) {
Integer value = tuple.getIntegerByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Long.class)) {
Long value = tuple.getLongByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Double.class))
{
Double value = tuple.getDoubleByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Float.class)) {
Float value = tuple.getFloatByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else
if(Util.getJavaType(columnSqlType).equals(Boolean.class)) {
Boolean value = tuple.getBooleanByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(byte[].class))
{
byte[] value = tuple.getBinaryByField(columnName);
columns.add(new Column(columnName, value, columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Date.class)) {
Long value = tuple.getLongByField(columnName);
columns.add(new Column(columnName, new Date(value),
columnSqlType));
} else if(Util.getJavaType(columnSqlType).equals(Time.class)) {
Long value = tuple.getLongByField(columnName);
columns.add(new Column(columnName, new Time(value),
columnSqlType));
} else
if(Util.getJavaType(columnSqlType).equals(Timestamp.class)) {
Long value = tuple.getLongByField(columnName);
columns.add(new Column(columnName, new Timestamp(value),
columnSqlType));
} else {
throw new RuntimeException("Unsupported java type in tuple
" + Util.getJavaType(columnSqlType));
}
}
return columns;
}
}
*Bolt class:*
List<Column> schemaColumns = Lists.newArrayList(new Column("xxx",
Types.BIGINT),new Column("yyy",Types.VARCHAR),
new Column("zzz",Types.VARCHAR));
JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
String insertQuery = "insert intotable_data (xxx, yyy, zzz) values
(regexp_split_to_table(?,','),?,?)";
@Override
public void execute(Tuple tuple) {
try {
List<Column> columns = jdbcMapper.getColumns(tuple);
List<List<Column>> columnLists = new ArrayList<List<Column>>();
columnLists.add(columns);
if(!StringUtils.isBlank(tableName)) {
this.jdbcClient.insert(this.tableName, columnLists); } else {
this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists); }
this.collector.ack(tuple);
} catch (Exception e) { this.collector.reportError(e);
this.collector.fail(tuple); }
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}}
*JdbcClient class:*
public void executeInsertQuery(String query, List<List<Column>>
columnLists) {
Connection connection = null;
try {
connection = connectionProvider.getConnection();
boolean autoCommit = connection.getAutoCommit();
if(autoCommit) {
connection.setAutoCommit(false);
}
LOG.debug(String.format("Executing query {}", query));
PreparedStatement preparedStatement =
connection.prepareStatement(query);
for(List<Column> columnList : columnLists) {
setPreparedStatementParams(preparedStatement, columnList);
preparedStatement.addBatch();
}
int[] results = preparedStatement.executeBatch();
if(Arrays.asList(results).contains(Statement.EXECUTE_FAILED)) {
connection.rollback();
//throw new RuntimeException("failed at least one sql
statement in the batch, operation rolled back.");
LOG.error("failed at least one sql statement in the batch, operation
rolled back.");
} else {
try {
connection.commit();
} catch (SQLException e) {
//throw new RuntimeException("Failed to commit insert
query " + query, e);
LOG.error("Failed to commit insert query " + query, e);
}
}
} catch (SQLException e) {
throw new RuntimeException("Failed to execute insert query " + query,
e.getNextException());
} finally {
closeConnection(connection);
}
}
public List<Column> getColumnSchema(String tableName) {
Connection connection = null;
List<Column> columns = new ArrayList<Column>();
try {
connection = connectionProvider.getConnection();
DatabaseMetaData metaData = connection.getMetaData();
ResultSet resultSet = metaData.getColumns(null, null,
tableName, null);
while (resultSet.next()) {
columns.add(new Column(resultSet.getString("COLUMN_NAME"),
resultSet.getInt("DATA_TYPE")));
}
return columns;
} catch (SQLException e) {
throw new RuntimeException("Failed to get schema for table " +
tableName, e);
} finally {
closeConnection(connection);
}
}
private void setPreparedStatementParams(PreparedStatement
preparedStatement, List<Column> columnList) throws SQLException {
int index = 1;
for (Column column : columnList) {
Class columnJavaType = Util.getJavaType(column.getSqlType());
if (column.getVal() == null) {
preparedStatement.setNull(index, column.getSqlType());
} else if (columnJavaType.equals(String.class)) {
preparedStatement.setString(index, (String)
column.getVal());
} else if (columnJavaType.equals(Integer.class)) {
preparedStatement.setInt(index, (Integer) column.getVal());
} else if (columnJavaType.equals(Double.class)) {
preparedStatement.setDouble(index, (Double)
column.getVal());
} else if (columnJavaType.equals(Float.class)) {
preparedStatement.setFloat(index, (Float) column.getVal());
} else if (columnJavaType.equals(Short.class)) {
preparedStatement.setShort(index, (Short) column.getVal());
} else if (columnJavaType.equals(Boolean.class)) {
preparedStatement.setBoolean(index, (Boolean)
column.getVal());
} else if (columnJavaType.equals(byte[].class)) {
preparedStatement.setBytes(index, (byte[]) column.getVal());
} else if (columnJavaType.equals(Long.class)) {
preparedStatement.setLong(index, (Long) column.getVal());
} else if (columnJavaType.equals(Date.class)) {
preparedStatement.setDate(index, (Date) column.getVal());
} else if (columnJavaType.equals(Time.class)) {
preparedStatement.setTime(index, (Time) column.getVal());
} else if (columnJavaType.equals(Timestamp.class)) {
preparedStatement.setTimestamp(index, (Timestamp)
column.getVal());
} else {
throw new RuntimeException("Unknown type of value " +
column.getVal() + " for column " + column.getColumnName());
}
++index;
}
}
private void closeConnection(Connection connection) {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
throw new RuntimeException("Failed to close connection", e);
}
}
}
*ERROR:*
*java.lang.ClassCastException: java.util.ArrayList cannot be cast to
java.lang.Long at
backtype.storm.tuple.TupleImpl.getLongByField(TupleImpl.java:161) at
com.aail.dao.SimpleJdbcMapper.getColumns(SimpleJdbcMapper.java:49) at
com.aail.storm.bolts.Fb_bolts.execute(bolts.java:225) at
backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
at
backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
at
backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
at
backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at
backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at
backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at
backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) at
clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) *