You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Alec Lee <al...@gmail.com> on 2015/08/28 02:29:06 UTC
trident spout to read data from database
Hi, all
I am trying to implement a Trident spout that reads data from a database:
@SuppressWarnings("serial")
public static class PostgresDBSpout implements IBatchSpout {
private static final long serialVersionUID = 1L;
SpoutOutputCollector _collector;
boolean _isDistributed;
public PostgresDBSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
public boolean isDistributed() {
return true;
}
@Override
public void emitBatch(final long batchId, final TridentCollector collector) {
_collector = collector ;
Connection conn = null ;
LinkedList listOfMeasurement = new LinkedList() ;
conn = connectToDatabaseOrDie();
fetchMeasurementSample(conn, listOfMeasurement);
Iterator it = listOfMeasurement.iterator();
while (it.hasNext())
{
Measurement measurement = (Measurement)it.next();
System.out.println("id: " + measurement.id);
_collector.emit(new Values(measurement.sensor_id, measurement.period, measurement.powermon_num, measurement.current,
measurement.id, measurement.measurement_timestamp, measurement.measurement_dateuploaded));
}
}
@Override
@SuppressWarnings("rawtypes")
public void open(final Map conf, final TopologyContext context) {
}
private void fetchMeasurementSample(Connection conn, LinkedList listOfMeasurement)
{
try
{
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery("SELECT * FROM stream_producer.measurements_sample ORDER BY date_uploaded asc");
while ( rs.next() )
{
Measurement measurement = new Measurement();
measurement.sensor_id = rs.getString("sensor_id");
measurement.period = rs.getInt("period");
measurement.powermon_num = rs.getLong("powermon_num") ;
measurement.current = Float.parseFloat(rs.getString("current")) ;
measurement.id = rs.getString("id") ;
measurement.measurement_timestamp = rs.getString("measurement_timestamp") ;
measurement.measurement_dateuploaded = rs.getString("measurement_dateuploaded") ;
listOfMeasurement.add(measurement);
}
rs.close();
st.close();
}
catch (SQLException se) {
logger.error("Threw a SQLException creating the list of blogs.");
logger.error(se.getMessage());
}
}
private Connection connectToDatabaseOrDie() {
Connection conn = null;
try {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://10.43.34.144:5432/stream_producer";
conn = DriverManager.getConnection(url, "stream", "stream");
logger.info("DB connected .....");
} catch (ClassNotFoundException e) {
logger.error("Failed to establish DB connection - 1", e);
System.exit(1);
} catch (SQLException e) {
logger.error("Failed to establish DB connection - 2", e);
System.exit(2);
}
return conn;
}
@Override
public void close() {
}
@Override
public void ack(final long batchId) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sensor_id", "period", "powermon_num", "current", "id", "measurement_timestamp", "measurement_dateuploaded"));
}
public void activate() {
// TODO Auto-generated method stub
}
public void deactivate() {
// TODO Auto-generated method stub
}
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
}
Then I create a Trident topology like this:
// PostgresDBSpout declares only a constructor that takes a boolean, so the flag is passed
// explicitly here.
topology.newStream("spoutInit", new PostgresDBSpout(true))
        .parallelismHint(6)
        // Group by sensor so that all tuples of one sensor are handled together.
        .groupBy(new Fields("sensor_id"))
        // PrintStream reads the two input fields and appends "sensorid" and "timestamps".
        .each(new Fields("sensor_id", "measurement_timestamp"),
              new PrintStream(),
              new Fields("sensorid", "timestamps"));
I just can't make it work. Can anyone tell me what is wrong with this code?
thanks
AL