You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@storm.apache.org by Alec Lee <al...@gmail.com> on 2015/08/28 02:29:06 UTC

trident spout to read data from database

Hi, all
 

I try to implement a trident Spout to read data from DB, 


             @SuppressWarnings("serial")
             public static class PostgresDBSpout implements IBatchSpout {

                              private static final long serialVersionUID = 1L;
                              SpoutOutputCollector _collector;
                              boolean _isDistributed;

                              public PostgresDBSpout(boolean isDistributed) {
                                     _isDistributed = isDistributed;
                              }

                              public boolean isDistributed() {
                                     return true;
                              }

                              @Override
                              public void emitBatch(final long batchId, final TridentCollector collector) {
                                           _collector = collector ;
                                           Connection conn = null ;
                                           LinkedList listOfMeasurement = new LinkedList() ;
                                           conn  = connectToDatabaseOrDie();
                                           fetchMeasurementSample(conn, listOfMeasurement);

                                           Iterator it = listOfMeasurement.iterator();
                                           while (it.hasNext())
                                           {
                                                 Measurement measurement = (Measurement)it.next();
                                                 System.out.println("id: " + measurement.id);
                                                 _collector.emit(new Values(measurement.sensor_id, measurement.period, measurement.powermon_num, measurement.current,
                                                                            measurement.id, measurement.measurement_timestamp, measurement.measurement_dateuploaded));
                                            }
                              }

                              @Override
                              @SuppressWarnings("rawtypes")
                              public void open(final Map conf, final TopologyContext context) {
                              }

                              private void fetchMeasurementSample(Connection conn, LinkedList listOfMeasurement)
                              {
                                       try
                                       {
                                             Statement st = conn.createStatement();
                                             ResultSet rs = st.executeQuery("SELECT * FROM stream_producer.measurements_sample ORDER BY date_uploaded asc");
                                             while ( rs.next() )
                                             {
                                                   Measurement measurement = new Measurement();
                                                   measurement.sensor_id                   =  rs.getString("sensor_id");
                                                   measurement.period                      =  rs.getInt("period");
                                                   measurement.powermon_num                =  rs.getLong("powermon_num") ;
                                                   measurement.current                     =  Float.parseFloat(rs.getString("current")) ;
                                                   measurement.id                          =  rs.getString("id") ;
                                                   measurement.measurement_timestamp       =  rs.getString("measurement_timestamp") ;
                                                   measurement.measurement_dateuploaded    =  rs.getString("measurement_dateuploaded") ;
                                                   listOfMeasurement.add(measurement);
                                             }
                                             rs.close();
                                             st.close();
                                       }
                                       catch (SQLException se) {
                                                   logger.error("Threw a SQLException creating the list of blogs.");
                                                   logger.error(se.getMessage());
                                       }
                              }

                              private Connection connectToDatabaseOrDie() {
                                       Connection conn = null;
                                       try {
                                                Class.forName("org.postgresql.Driver");
                                                String url = "jdbc:postgresql://10.43.34.144:5432/stream_producer";
                                                conn = DriverManager.getConnection(url, "stream", "stream");
                                                logger.info("DB connected .....");
                                        } catch (ClassNotFoundException e) {
                                                logger.error("Failed to establish DB connection - 1", e);
                                                System.exit(1);
                                        } catch (SQLException e) {
                                                logger.error("Failed to establish DB connection - 2", e);
                                                System.exit(2);
                                        }
                                        return conn;
                              }

                              @Override
                              public void close() {
                              }
                              @Override
                              public void ack(final long batchId) {
                              }

                              @Override
                              public void fail(Object id) {
                              }  

                              @Override
                              public void declareOutputFields(OutputFieldsDeclarer declarer) {
                                                declarer.declare(new Fields("sensor_id", "period", "powermon_num", "current", "id", "measurement_timestamp", "measurement_dateuploaded"));
                              }


                              public void activate() {
                                // TODO Auto-generated method stub
                              }

                              public void deactivate() {
                                // TODO Auto-generated method stub
                              }

                              public Map<String, Object> getComponentConfiguration() {
                                // TODO Auto-generated method stub
                                        return null;
                              }
             }

Then I can open a trident topology like this 
 topology.newStream("spoutInit", new PostgresDBSpout())
                                           .parallelismHint(6)
                                           .groupBy(new Fields("sensor_id"))
                                           .each(new Fields("sensor_id","measurement_timestamp"),
                                                 new PrintStream(),
                                                 new Fields("sensorid","timestamps")) ;
 

Just can’t make it work, anyone can tell me what is wrong with this code?

thanks

AL