Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/06/27 06:07:44 UTC

postgresql -> spout

Dear all

I am working on a spout implementation; the stream is coming from a postgresql ingress API (an in-house project). The plan for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology. Has anyone done a similar job? I would appreciate any instructions and details.
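The poll-into-a-queue pattern described here can be sketched with plain Java. The JDBC query is stubbed out with a placeholder source, and all class and method names are illustrative, not Storm's actual spout API:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PollingSpoutSketch {
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>(10_000);
    private final ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();

    // In the real spout's open(), start a periodic poll; here the "database
    // query" is a stub that returns one fake row per tick.
    public void open() {
        poller.scheduleAtFixedRate(() -> {
            for (String row : queryNewRows()) {
                buffer.offer(row); // drop on overflow rather than block the poller
            }
        }, 0, 100, TimeUnit.MILLISECONDS);
    }

    // Stand-in for a JDBC "SELECT ... WHERE id > lastSeen" against postgres.
    private List<String> queryNewRows() {
        return Collections.singletonList("row@" + System.nanoTime());
    }

    // In nextTuple(), emit at most one buffered row per call, never blocking.
    public String nextTuple() {
        return buffer.poll(); // null means "nothing to emit right now"
    }

    public void close() {
        poller.shutdownNow();
    }
}
```

The key property is that `nextTuple()` never blocks: the slow database work happens on the poller thread, and the spout only drains the in-memory buffer.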


thanks

Alec



Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
Thanks again, Robert, that is awesome and a good way for me to start. Will try it now.

Alec


On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com> wrote:

> I always like to simplify things. If I were you, I would use the well-known and widely used Kafka spout to ingest data into your storm cluster. Simply write a Kafka producer that uses the Postgres JDBC driver to pull out your required data and send it as a message. You'll find it is pretty easy to write kafka producers. Check out my project with some simple producers and mirror that to build your Postgres producer:
> 
> https://github.com/leerobert/kafka-producers 
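The producer side Robert describes boils down to a loop that reads rows and hands each one off as a message. A dependency-free sketch of that loop: the row iterator stands in for a JDBC result set, and the `send` callback stands in for `KafkaProducer.send(...)`; the names are illustrative, not the actual API of his project:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

class PostgresToKafkaSketch {
    // rows stands in for iterating a JDBC ResultSet from postgres;
    // send stands in for KafkaProducer.send(new ProducerRecord(...)).
    // Returns the number of messages produced.
    static int pump(Iterator<Map<String, Object>> rows, Consumer<String> send) {
        int n = 0;
        while (rows.hasNext()) {
            Map<String, Object> row = rows.next();
            // Serialize the row as a small JSON message.
            send.accept("{\"time\":" + row.get("time")
                      + ",\"userid\":" + row.get("userid") + "}");
            n++;
        }
        return n;
    }
}
```

In a real producer this loop would run periodically, with the Kafka client handling batching and retries.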
> 
> 
> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:
> Thanks a lot, John. The entire project is about getting data from postgresql and finally emitting to and updating cassandra tables. With Robert's help in this group, I think I have some resources on storm-cassandra integration. However, there really aren't many tutorials on postgres with storm; 'storm-rdbms' is the only db->storm example I can find. It would be great if someone could contribute more example code on postgres-storm. Sorry for the shameless request from a new storm user.
> 
> 
> thanks
> 
> Alec
> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
> 
>> Hi 
>> 
>> We use Postgres notifications. The spout's open method registers for database notifications (add, update, delete). Each time the spout's nextTuple method is called, we check for pending notifications and process them accordingly.
>> 
>> Good Luck
>> 
>> John 
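A rough shape of the notification-driven spout John describes: the driver-specific part (in practice, executing `LISTEN my_channel` and then calling the PostgreSQL JDBC driver's `PGConnection.getNotifications()`) is abstracted behind an interface so the control flow is visible. All names here are illustrative:

```java
import java.util.Collections;
import java.util.List;

class NotifySpoutSketch {
    /** Stand-in for the Postgres driver: in practice this would wrap
     *  PGConnection.getNotifications() after "LISTEN my_channel" was
     *  executed once in the spout's open(). */
    interface NotificationSource {
        List<String> pendingNotifications(); // may return null when nothing arrived
    }

    private final NotificationSource source;

    NotifySpoutSketch(NotificationSource source) {
        this.source = source;
    }

    // nextTuple() drains whatever notifications arrived since the last call;
    // an empty list means there is nothing to emit right now.
    public List<String> nextTuple() {
        List<String> pending = source.pendingNotifications();
        return pending == null ? Collections.emptyList() : pending;
    }
}
```

Compared with periodic polling, this pushes work to the database: the spout only wakes up when an add/update/delete actually fired a notification.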
>> 
>> 
>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>> Dear all
>> 
>> I am working on a spout implementation; the stream is coming from a postgresql ingress API (an in-house project). The plan for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology. Has anyone done a similar job? I would appreciate any instructions and details.
>> 
>> 
>> thanks
>> 
>> Alec
>> 
>> 
>> 
> 
> 


Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
Thanks for the reply, Marc. Getting data from postgres is a temporary solution; in
the future, kafka won't listen to postgres but to other sources, so I need to get
the entire loop working. Your suggestion is absolutely useful, since I
need to create a bitmap in a bolt and then write it into postgres regularly,
so I can connect to postgres directly.

Alec
On Jul 17, 2014 2:33 PM, "Marc Vaillant" <va...@animetrics.com> wrote:

> I know that this is very late to jump in, but if you are integrating into
> a system that is already using postgres as a message broker/queue, why
> not poll your postgres queue directly from nextTuple instead of pushing
> everything onto another queue?  Just use JDBC to connect to postgres
> from nextTuple.
>
> Best,
> Marc
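Marc's poll-from-nextTuple approach usually needs a watermark so each poll only sees new rows. One common shape, assuming the queue table has a monotonically increasing id column, is keyset pagination; table and column names below are hypothetical. A dependency-free sketch of the bookkeeping:

```java
import java.util.List;

class KeysetPoller {
    private long lastSeenId = 0;

    // The SQL a nextTuple() poll would run over JDBC. In real code, use a
    // PreparedStatement with a bind parameter instead of string concatenation.
    public String nextQuery(int limit) {
        return "SELECT id, payload FROM queue WHERE id > " + lastSeenId
             + " ORDER BY id LIMIT " + limit;
    }

    // After processing a batch, advance the watermark to the largest id seen,
    // so the next poll skips everything already emitted.
    public void advance(List<Long> batchIds) {
        for (long id : batchIds) {
            lastSeenId = Math.max(lastSeenId, id);
        }
    }

    public long watermark() {
        return lastSeenId;
    }
}
```

With this bookkeeping, nextTuple() can run the query, emit each row, and advance the watermark, without re-emitting old rows on the next poll.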
>
> On Thu, Jul 17, 2014 at 02:18:33PM -0700, Sa Li wrote:
> > Hello, Robert
> > This is a follow-up from the last thread; I am just getting back to this
> > topic from other work. As you suggested, I have written the topology to
> > run. Basically I get a json object from the kafka producer, like this:
> >
> > {"messagetype":"PageView","time":1402437708,"totaltime":9,"pagename":"user.aspx","profileid":69781139,"userid":76888177}
> >
> > And I want to be able to retrieve the time and userid fields for future
> > use. Here is the code I wrote:
> >
> > public class ingestTopology {
> >
> >     protected ingestTopology() {
> >         throw new UnsupportedOperationException();
> >     }
> >
> >     public static class JsonObjectParse extends BaseFunction {
> >         @Override
> >         public final void execute(
> >             final TridentTuple tuple,
> >             final TridentCollector collector
> >         ) {
> >             // StringScheme already decodes the Kafka message to a String,
> >             // so read it with getString rather than getBinary.
> >             String decoded = tuple.getString(0);
> >             try {
> >                 JSONObject json = new JSONObject(decoded);
> >                 // time and userid are numeric in the JSON, so read longs
> >                 collector.emit(new Values(
> >                     json.getLong("time"),
> >                     json.getLong("userid")
> >                 ));
> >             } catch (JSONException e) {
> >                 System.err.println("Caught JSONException: " + e.getMessage());
> >             }
> >         }
> >     }
> >
> >     public static StormTopology buildTopology() {
> >         try {
> >             TridentTopology topology = new TridentTopology();
> >             BrokerHosts zk = new ZkHosts("localhost");
> >             TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
> >             spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> >             OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
> >
> >             topology.newStream("spout1", spout)
> >                 .each(new Fields("str"), new JsonObjectParse(),
> >                     new Fields("time", "userid"));
> >
> >             return topology.build();
> >         } catch (IllegalArgumentException e) {
> >             System.err.println("Caught IllegalArgumentException: " + e.getMessage());
> >         }
> >         return null;
> >     }
> >
> >     public static void main(String[] args) throws Exception {
> >         Config conf = new Config();
> >         conf.setDebug(true);
> >
> >         if (args != null && args.length > 0) {
> >             conf.setNumWorkers(3);
> >             StormSubmitter.submitTopology(args[0], conf, buildTopology());
> >         } else {
> >             conf.setMaxSpoutPending(1);
> >             conf.setMaxTaskParallelism(3);
> >             LocalCluster cluster = new LocalCluster();
> >             cluster.submitTopology("kafka", conf, buildTopology());
> >             Thread.sleep(100);
> >             cluster.shutdown();
> >         }
> >     }
> > }
> >
> > However, I got tons of errors:
> > SLF4J: Class path contains multiple SLF4J bindings.
> > SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> > slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar!/org/slf4j/impl/
> > StaticLoggerBinder.class]
> > SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> > slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/
> > StaticLoggerBinder.class]
> > SLF4J: Found binding in
> [jar:file:/Users/sali/.m2/repository/ch/qos/logback/
> > logback-classic/1.0.6/logback-classic-1.0.6.jar!/org/slf4j/impl/
> > StaticLoggerBinder.class]
> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > explanation.
> > SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> > environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52
> GMT
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:
> host.name=
> > 192.168.128.10
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.version=
> > 1.6.0_65
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.vendor=
> > Apple Inc.
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.home=/
> > System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.class.path
> >
> =/workspace/tools/stormprj/kafka-producers/target/test-classes:/workspace/tools
> >
> /stormprj/kafka-producers/target/classes:/Users/sali/.m2/repository/javax/
> >
> servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/sali/.m2/repository/org/
> >
> twitter4j/twitter4j-stream/3.0.5/twitter4j-stream-3.0.5.jar:/Users/sali/.m2/
> >
> repository/org/twitter4j/twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/
> >
> sali/.m2/repository/net/sf/opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/
> >
> repository/org/json/json/20140107/json-20140107.jar:/Users/sali/.m2/repository/
> >
> org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/
> >
> org/slf4j/slf4j-api/1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/
> > log4j/1.2.17/log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/
> >
> scala-library/2.9.2/scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/
> >
> apache/zookeeper/zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository
> > /org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/
> >
> repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit
> >
> /junit/4.10/junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/
> >
> 3.2.2.Final/netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/
> >
> kafka_2.9.2/0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/
> >
> jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/
> > scala-lang/scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/
> >
> repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/
> > org/xerial/snappy/snappy-java/
> 1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/
> >
> repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/
> > sali/.m2/repository/com/yammer/metrics/metrics-annotation/2.2.0/
> > metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/
> >
> storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/
> > repository/org/apache/curator/curator-framework/2.4.0/
> >
> curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/
> >
> curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/
> >
> google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/
> > testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/
> > hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/
> > beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/
> > jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/
> > snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/
> > mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/
> >
> easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/
> >
> repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/
> >
> repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org
> > /hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/
> > repository/storm/storm/
> 0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/
> > storm/storm-console-logging/
> 0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/
> > sali/.m2/repository/storm/storm-core/
> 0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali
> >
> /.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/
> > repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/
> >
> repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali
> >
> /.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2
> >
> /repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/
> >
> repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/
> > Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/
> >
> httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/
> > 1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/
> >
> commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/
> > clj-time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/
> >
> joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/
> >
> curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/
> >
> com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali
> >
> /.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/
> >
> com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/
> > repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/
> >
> repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/
> >
> sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/
> > sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/
> >
> repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/
> >
> commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users
> >
> /sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/
> > repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/
> >
> repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/
> > sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/
> >
> ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/
> >
> 0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> >
> jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> >
> jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/
> >
> jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/
> >
> repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali
> > /.m2/repository/org/clojure/math.numeric-tower/0.0.1/
> >
> math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/
> >
> carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/
> >
> 2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/
> >
> reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow2/
> >
> asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/
> >
> minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/
> > 1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/
> > tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/
> >
> disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/
> > 0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/
> >
> logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/
> > qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/
> >
> repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/
> > sali/.m2/repository/storm/storm-netty/
> 0.9.0.1/storm-netty-0.9.0.1.jar:/Users/
> >
> sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/
> > sali/.m2/repository/commons-collections/commons-collections/3.2.1/
> > commons-collections-3.2.1.jar
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> >
> environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/
> > Extensions:/usr/lib/java
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.io.tmpdir=
> > /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.compiler=
> > <NA>
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac
> OS
> > X
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:os.arch=x86_64
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:os.version=
> > 10.9.4
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:
> user.name=sali
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:user.home=/
> > Users/sali
> > [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:user.dir=/
> > workspace/tools/stormprj/kafka-producers
> > Exception in thread "main" java.lang.ExceptionInInitializerError
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:249)
> > at clojure.lang.RT.loadClassForName(RT.java:2056)
> > at clojure.lang.RT.load(RT.java:419)
> > at clojure.lang.RT.load(RT.java:400)
> > at clojure.core$load$fn__4890.invoke(core.clj:5415)
> > at clojure.core$load.doInvoke(core.clj:5414)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at clojure.core$load_one.invoke(core.clj:5227)
> > at clojure.core$load_lib.doInvoke(core.clj:5264)
> > at clojure.lang.RestFn.applyTo(RestFn.java:142)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$load_libs.doInvoke(core.clj:5302)
> > at clojure.lang.RestFn.applyTo(RestFn.java:137)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$require.doInvoke(core.clj:5381)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at backtype.storm.cluster$loading__4784__auto__.invoke(cluster.clj:1)
> > at backtype.storm.cluster__init.load(Unknown Source)
> > at backtype.storm.cluster__init.<clinit>(Unknown Source)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:249)
> > at clojure.lang.RT.loadClassForName(RT.java:2056)
> > at clojure.lang.RT.load(RT.java:419)
> > at clojure.lang.RT.load(RT.java:400)
> > at clojure.core$load$fn__4890.invoke(core.clj:5415)
> > at clojure.core$load.doInvoke(core.clj:5414)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at clojure.core$load_one.invoke(core.clj:5227)
> > at clojure.core$load_lib.doInvoke(core.clj:5264)
> > at clojure.lang.RestFn.applyTo(RestFn.java:142)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$load_libs.doInvoke(core.clj:5302)
> > at clojure.lang.RestFn.applyTo(RestFn.java:137)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$require.doInvoke(core.clj:5381)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at backtype.storm.daemon.nimbus__init.load(Unknown Source)
> > at backtype.storm.daemon.nimbus__init.<clinit>(Unknown Source)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:249)
> > at clojure.lang.RT.loadClassForName(RT.java:2056)
> > at clojure.lang.RT.load(RT.java:419)
> > at clojure.lang.RT.load(RT.java:400)
> > at clojure.core$load$fn__4890.invoke(core.clj:5415)
> > at clojure.core$load.doInvoke(core.clj:5414)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at clojure.core$load_one.invoke(core.clj:5227)
> > at clojure.core$load_lib.doInvoke(core.clj:5264)
> > at clojure.lang.RestFn.applyTo(RestFn.java:142)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$load_libs.doInvoke(core.clj:5302)
> > at clojure.lang.RestFn.applyTo(RestFn.java:137)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$require.doInvoke(core.clj:5381)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at backtype.storm.testing$loading__4784__auto__.invoke(testing.clj:1)
> > at backtype.storm.testing__init.load(Unknown Source)
> > at backtype.storm.testing__init.<clinit>(Unknown Source)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:249)
> > at clojure.lang.RT.loadClassForName(RT.java:2056)
> > at clojure.lang.RT.load(RT.java:419)
> > at clojure.lang.RT.load(RT.java:400)
> > at clojure.core$load$fn__4890.invoke(core.clj:5415)
> > at clojure.core$load.doInvoke(core.clj:5414)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at clojure.core$load_one.invoke(core.clj:5227)
> > at clojure.core$load_lib.doInvoke(core.clj:5264)
> > at clojure.lang.RestFn.applyTo(RestFn.java:142)
> > at clojure.core$apply.invoke(core.clj:603)
> > at clojure.core$load_libs.doInvoke(core.clj:5302)
> > at clojure.lang.RestFn.applyTo(RestFn.java:137)
> > at clojure.core$apply.invoke(core.clj:605)
> > at clojure.core$use.doInvoke(core.clj:5392)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at
> backtype.storm.LocalCluster$loading__4784__auto__.invoke(LocalCluster.clj:1)
> > at backtype.storm.LocalCluster__init.load(Unknown Source)
> > at backtype.storm.LocalCluster__init.<clinit>(Unknown Source)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:249)
> > at clojure.lang.RT.loadClassForName(RT.java:2056)
> > at clojure.lang.RT.load(RT.java:419)
> > at clojure.lang.RT.load(RT.java:400)
> > at clojure.core$load$fn__4890.invoke(core.clj:5415)
> > at clojure.core$load.doInvoke(core.clj:5414)
> > at clojure.lang.RestFn.invoke(RestFn.java:408)
> > at clojure.lang.Var.invoke(Var.java:415)
> > at backtype.storm.LocalCluster.<clinit>(Unknown Source)
> > at storm.ingestTopology.main(ingestTopology.java:126)
> > Caused by: java.lang.ClassNotFoundException:
> > org.apache.zookeeper.server.NIOServerCnxn$Factory
> > at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> > at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> > at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> > at java.lang.Class.forName0(Native Method)
> > at java.lang.Class.forName(Class.java:171)
> > at backtype.storm.zookeeper$loading__4784__auto__.invoke(zookeeper.clj:1)
> > at backtype.storm.zookeeper__init.load(Unknown Source)
> > at backtype.storm.zookeeper__init.<clinit>(Unknown Source)
> > ... 90 more
> >
> > I poked around this error (java.lang.ClassNotFoundException:
> > org.apache.zookeeper.server.NIOServerCnxn$Factory) and made changes to the
> > pom, but there seems to be no solid solution. Someone suggested downgrading
> > zookeeper, but I'd rather not do that. Is there any other solution?
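One thing worth noting from the classpath in the log above: it mixes storm 0.9.0.1 (`storm/storm-core/0.9.0.1`) with storm-kafka 0.9.2-incubating, and those expect different zookeeper versions. A possible fix to try (an assumption from the classpath, not a confirmed solution) is to align both storm artifacts on 0.9.2-incubating instead of downgrading zookeeper:

```xml
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.2-incubating</version>
</dependency>
```

With both on the same version, the zookeeper/curator versions they pull in transitively should agree, which is what the NIOServerCnxn$Factory lookup depends on.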
> >
> >
> > thanks
> >
> >
> > Alec
> >
> >
> > On Jul 7, 2014, at 5:04 PM, Sa Li <sa...@gmail.com> wrote:
> >
> >
> >     great! Robert, will start from there.
> >
> >
> >     On Jul 7, 2014, at 11:14 AM, Robert Lee <le...@gmail.com>
> wrote:
> >
> >
> >         Alec,
> >
> >         Check out the very nicely compiled storm-kafka module within
> >         storm that has been developed by wurstmeister
> >         (https://github.com/apache/incubator-storm/tree/master/external/storm-kafka).
> >         For a quick start, add the following to your pom file:
> >         <dependency>
> >                 <artifactId>storm-kafka</artifactId>
> >                 <groupId>org.apache.storm</groupId>
> >                 <version>0.9.2-incubating</version>
> >         </dependency>
> >
> >         And within your main code set up a kafka spout with the
> following code:
> >
> >
> >         TridentTopology topology = new TridentTopology();
> >         BrokerHosts zk = new ZkHosts("localhost");
> >         TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk,
> "ingest_test");
> >         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> >         OpaqueTridentKafkaSpout spout = new
> OpaqueTridentKafkaSpout(spoutConf);
> >
> >
> >
> >         topology.newStream("kafka", spout).shuffle()
> >
> >         .each(new Fields("str"), new
> FunctionThatWorksOnKafkaOutputMessage(), new Fields("yourField"))
> >
> >         ......
> >
> >
> >         Your first function on the topology after creating the spout
> stream
> >         will take the message and create whatever data field you want
> such that
> >         you can operate on the postgresql data.
> >
> >         Hope that gives you a quick start,
> >         Robert
> >
> >
> >         On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <sa...@gmail.com>
> wrote:
> >
> >             Hello, Robert
> >
> >             As you mentioned in the last thread, I downloaded your kafka
> >             code, which was very useful. I have already implemented a
> >             kafka producer to get data from postgresql and send it to
> >             the brokers. By checking
> >
> >             bin/kafka-console-consumer.sh --zookeeper localhost:2181
> >             --topic ingest_test --from-beginning
> >
> >
> >             I know the consumer can receive the data. Now I would like
> >             to integrate the kafka producer with storm. As I understand
> >             the logic, the storm spout is supposed to act as the
> >             consumer, and _collector will get the data sent by the kafka
> >             producer; is that right?
> >
> >             Hope there is some sample code available to use.
> >
> >             Thanks
> >
> >             Alec
> >
> >
> >             On Jun 27, 2014, at 11:58 AM, Robert Lee <
> lee.robert138@gmail.com>
> >             wrote:
> >
> >
> >                 I always like to simplify things. If I were you, I would
> use
> >                 the well known and used spout of kafka to ingest data
> into your
> >                 storm cluster. Simply write a Kafka Producer that
> utilizes the
> >                 postgre java driver to pull out your required data and
> send it
> >                 as a message. You'll find it is pretty easy to write
> kafka
> >                 producers. Check out my project of creating some simple
> >                 producers and just mirror that to produce your postgre
> >                 producer:
> >
> >                 https://github.com/leerobert/kafka-producers
> >
> >
> >                 On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <
> sa.in.vanc@gmail.com>
> >                 wrote:
> >
> >                     Thanks a lot, John. The entire project gets data
> >                     from postgresql and finally emits to and updates
> >                     cassandra tables. With the help of Robert in this
> >                     group, I think I have some resources on
> >                     storm-cassandra integration. However, there really
> >                     are not many tutorials on postgres with storm;
> >                     'storm-rdbms' is the only example I can find of
> >                     db->storm. It would be great if someone could
> >                     contribute more example code on postgres-storm.
> >                     Sorry for the shameless request from a new storm
> >                     user.
> >
> >
> >                     thanks
> >
> >                     Alec
> >                     On Jun 27, 2014, at 5:53 AM, John Welcher <
> >                     jpwelcher@gmail.com> wrote:
> >
> >
> >                         Hi
> >
> >                         We use Postgres notifications. The spout open
> >                         method registers for database notifications (add,
> update,
> >                         delete). Each time the spout next method is
> called we
> >                         check for pending notifications and process
> >                         accordingly.
> >
> >                         Good Luck
> >
> >                         John
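John's open()/nextTuple() split can be sketched as below. This is a rough illustration, not code from the thread: the channel name `row_changes` is made up, and retrieving pending notifications is driver-specific (noted in a comment), so check the pgjdbc documentation before relying on it.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

// Sketch of the spout lifecycle John describes: open() issues LISTEN to
// subscribe, and nextTuple() later checks for pending notifications.
public class NotificationSpoutCore {

    // LISTEN takes an identifier, not a bind parameter, so validate the
    // channel name before splicing it into the statement.
    static String listenSql(String channel) {
        if (!channel.matches("[a-z_][a-z0-9_]*")) {
            throw new IllegalArgumentException("bad channel name: " + channel);
        }
        return "LISTEN " + channel;
    }

    // Called from the spout's open(): register for notifications.
    public static void subscribe(Connection conn, String channel) throws SQLException {
        try (Statement st = conn.createStatement()) {
            st.execute(listenSql(channel));
        }
        // In nextTuple(), pending notifications are driver-specific: with
        // pgjdbc you would unwrap the connection to org.postgresql.PGConnection
        // and call getNotifications(), emitting one tuple per notification.
    }
}
```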
> >
> >
> >                         On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <
> >                         sa.in.vanc@gmail.com> wrote:
> >
> >                             Dear all
> >
> >                             I am doing an implementation of a spout;
> >                             the stream is coming from a postgresql
> >                             ingress API (in-house project).  All I know
> >                             for now is to get the spout connected to
> >                             postgresql, retrieve the data periodically,
> >                             store the data in a queue, and then emit it
> >                             to the topology.  Has anyone ever done a
> >                             similar job? I hope to get some instructions
> >                             and details from you.
> >
> >
> >                             thanks
> >
> >                             Alec
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
>

Re: postgresql -> spout

Posted by Marc Vaillant <va...@animetrics.com>.
I know that this is very late to jump in, but if you are integrating into
a system that is already using postgres as a message broker/queue, why
not poll your postgres queue directly from nextTuple instead of pushing
everything onto another queue?  Just use JDBC to connect to postgres
from nextTuple.

Best,
Marc
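Marc's suggestion boils down to a small JDBC pop inside nextTuple. A minimal sketch of just that polling core (the table name `message_queue` and its `id`/`payload` columns are hypothetical, and the Storm spout boilerplate is omitted):

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Polling core for a nextTuple() that reads straight from a
// Postgres-backed queue table over plain JDBC.
public class PgQueuePoller {

    // DELETE ... RETURNING pops the oldest row in a single round trip.
    static String popSql(String table) {
        return "DELETE FROM " + table
             + " WHERE id = (SELECT id FROM " + table + " ORDER BY id LIMIT 1)"
             + " RETURNING payload";
    }

    private final Connection conn;

    public PgQueuePoller(Connection conn) { this.conn = conn; }

    // Returns the next payload, or null when the queue is empty; a spout's
    // nextTuple() would emit a tuple when the payload is non-null.
    public String poll() throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery(popSql("message_queue"))) {
            return rs.next() ? rs.getString(1) : null;
        }
    }
}
```

With several spout tasks you would also want row locking (e.g. SELECT ... FOR UPDATE) so that two workers cannot pop the same row.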

On Thu, Jul 17, 2014 at 02:18:33PM -0700, Sa Li wrote:
> Hello, Robert
> This is the follow-up message from the last thread; I am just coming back to
> this topic after other work. As you suggested, I have written the topology.
> Basically I will get the json object from the kafka producer; the object
> looks like this:
> 
> {"messagetype":"PageView","time":1402437708,"totaltime":9,
>  "pagename":"user.aspx","profileid":69781139,"userid":76888177}
> 
> And I want to be able to retrieve the time and userid fields for future use.
> Here is the code I wrote:
> 
> public class ingestTopology {
> 
>     protected ingestTopology() {
>         throw new UnsupportedOperationException();
>     }
> 
>     public static class JsonObjectParse extends BaseFunction {
>         @Override
>         public final void execute(
>             final TridentTuple tuple,
>             final TridentCollector collector
>         ) {
>             byte[] bytes = tuple.getBinary(0);
>             try {
>                 String decoded = new String(bytes);
>                 JSONObject json = new JSONObject(decoded);
>                 // "time" and "userid" are numbers in the payload, so use
>                 // getLong; getString would throw a JSONException here.
>                 collector.emit(new Values(
>                       json.getLong("time")
>                     , json.getLong("userid")
>                 ));
>             } catch (JSONException e) {
>                 System.err.println("Caught JSONException: " + e.getMessage());
>             }
>         }
>     }
> 
>     public static StormTopology buildTopology() {
>         try {
>             TridentTopology topology = new TridentTopology();
>             BrokerHosts zk = new ZkHosts("localhost");
>             TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>             spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>             OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
> 
>             topology.newStream("spout1", spout)
>                 .each(new Fields("str"), new JsonObjectParse(),
>                     new Fields(
>                         "time",
>                         "userid"
>                 ));
> 
>             return topology.build();
>         } catch (IllegalArgumentException e) {
>             System.err.println("Caught IllegalArgumentException: " + e.getMessage());
>         }
>         return null;
>     }
> 
>     public static void main(String[] args) throws Exception {
> 
>         Config conf = new Config();
>         conf.setDebug(true);
> 
>         if (args != null && args.length > 0) {
>             conf.setNumWorkers(3);
>             StormSubmitter.submitTopology(args[0], conf, buildTopology());
>         } else {
>             conf.setMaxSpoutPending(1);
>             conf.setMaxTaskParallelism(3);
>             LocalCluster cluster = new LocalCluster();
>             cluster.submitTopology("kafka", conf, buildTopology());
> 
>             Thread.sleep(100);
>             cluster.shutdown();
>         }
>     }
> }
> 
> However, I got tons of errors:
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/
> slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar!/org/slf4j/impl/
> StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=
> 192.168.128.10
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=
> 1.6.0_65
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=
> Apple Inc.
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/
> System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path
> =/workspace/tools/stormprj/kafka-producers/target/test-classes:/workspace/tools
> /stormprj/kafka-producers/target/classes:/Users/sali/.m2/repository/javax/
> servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/sali/.m2/repository/org/
> twitter4j/twitter4j-stream/3.0.5/twitter4j-stream-3.0.5.jar:/Users/sali/.m2/
> repository/org/twitter4j/twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/
> sali/.m2/repository/net/sf/opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/
> repository/org/json/json/20140107/json-20140107.jar:/Users/sali/.m2/repository/
> org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/
> org/slf4j/slf4j-api/1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/
> log4j/1.2.17/log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/
> scala-library/2.9.2/scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/
> apache/zookeeper/zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository
> /org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/
> repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit
> /junit/4.10/junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/
> 3.2.2.Final/netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/
> kafka_2.9.2/0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/
> jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/
> scala-lang/scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/
> repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/
> org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/
> repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/
> sali/.m2/repository/com/yammer/metrics/metrics-annotation/2.2.0/
> metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/
> storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/
> repository/org/apache/curator/curator-framework/2.4.0/
> curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/
> curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/
> google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/
> testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/
> hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/
> beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/
> jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/
> snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/
> mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/
> easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/
> repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/
> repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org
> /hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/
> repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/
> storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/
> sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali
> /.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/
> repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/
> repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali
> /.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2
> /repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/
> repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/
> Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/
> httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/
> 1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/
> commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/
> clj-time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/
> joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/
> curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/
> com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali
> /.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/
> com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/
> repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/
> repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/
> sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/
> sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/
> repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/
> commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users
> /sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/
> repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/
> repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/
> sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/
> ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/
> 0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/
> jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/
> repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali
> /.m2/repository/org/clojure/math.numeric-tower/0.0.1/
> math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/
> carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/
> 2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/
> reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow2/
> asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/
> minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/
> 1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/
> tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/
> disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/
> 0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/
> qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/
> repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/
> sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/
> sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/
> sali/.m2/repository/commons-collections/commons-collections/3.2.1/
> commons-collections-3.2.1.jar
> [main] INFO org.apache.zookeeper.ZooKeeper - Client
> environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/
> Extensions:/usr/lib/java
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=
> /var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=
> <NA>
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS
> X
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=
> 10.9.4
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=sali
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/
> Users/sali
> [main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/
> workspace/tools/stormprj/kafka-producers
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:host.name=192.168.128.10
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.version=1.6.0_65
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.vendor=Apple Inc.
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/
> Contents/Home
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.class.path=/workspace/tools/stormprj/kafka-producers/target/
> test-classes:/workspace/tools/stormprj/kafka-producers/target/classes:/Users/
> sali/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/
> sali/.m2/repository/org/twitter4j/twitter4j-stream/3.0.5/
> twitter4j-stream-3.0.5.jar:/Users/sali/.m2/repository/org/twitter4j/
> twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/sali/.m2/repository/net/sf
> /opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/repository/org/json/json/
> 20140107/json-20140107.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-simple/
> 1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-api/
> 1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/log4j/1.2.17/
> log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/scala-library/2.9.2/
> scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/apache/zookeeper/
> zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository/org/slf4j/
> slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/repository/jline/
> jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit/junit/4.10/
> junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/3.2.2.Final/
> netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/kafka_2.9.2/
> 0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/jopt-simple/
> jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/scala-lang/
> scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/repository/com/
> 101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/org/xerial/
> snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/repository/
> com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/sali/.m2/
> repository/com/yammer/metrics/metrics-annotation/2.2.0/
> metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/
> storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/
> repository/org/apache/curator/curator-framework/2.4.0/
> curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/
> curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/
> google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/
> testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/
> hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/
> beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/
> jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/
> snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/
> mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/
> easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/
> repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/
> repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org
> /hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/
> repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/
> storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/
> sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali
> /.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/
> repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/
> repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali
> /.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2
> /repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/
> repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/
> Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/
> httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/
> 1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/
> commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/
> clj-time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/
> joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/
> curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/
> com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali
> /.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/
> com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/
> repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/
> repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/
> sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/
> sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/
> repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/
> commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users
> /sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/
> repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/
> repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/
> sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/
> ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/
> 0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/
> jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/
> jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/
> repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali
> /.m2/repository/org/clojure/math.numeric-tower/0.0.1/
> math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/
> carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/
> 2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/
> reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow2/
> asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/
> minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/
> 1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/
> tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/
> disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/
> 0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/
> logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/
> qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/
> repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/
> sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/
> sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/
> sali/.m2/repository/commons-collections/commons-collections/3.2.1/
> commons-collections-3.2.1.jar
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/
> Extensions:/usr/lib/java
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.io.tmpdir=/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:java.compiler=<NA>
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.name=Mac OS X
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.arch=x86_64
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:os.version=10.9.4
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.name=sali
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.home=/Users/sali
> [main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server
> environment:user.dir=/workspace/tools/stormprj/kafka-producers
> Exception in thread "main" java.lang.ExceptionInInitializerError
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.cluster$loading__4784__auto__.invoke(cluster.clj:1)
> at backtype.storm.cluster__init.load(Unknown Source)
> at backtype.storm.cluster__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.daemon.nimbus__init.load(Unknown Source)
> at backtype.storm.daemon.nimbus__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$require.doInvoke(core.clj:5381)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.testing$loading__4784__auto__.invoke(testing.clj:1)
> at backtype.storm.testing__init.load(Unknown Source)
> at backtype.storm.testing__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.core$load_one.invoke(core.clj:5227)
> at clojure.core$load_lib.doInvoke(core.clj:5264)
> at clojure.lang.RestFn.applyTo(RestFn.java:142)
> at clojure.core$apply.invoke(core.clj:603)
> at clojure.core$load_libs.doInvoke(core.clj:5302)
> at clojure.lang.RestFn.applyTo(RestFn.java:137)
> at clojure.core$apply.invoke(core.clj:605)
> at clojure.core$use.doInvoke(core.clj:5392)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at backtype.storm.LocalCluster$loading__4784__auto__.invoke(LocalCluster.clj:1)
> at backtype.storm.LocalCluster__init.load(Unknown Source)
> at backtype.storm.LocalCluster__init.<clinit>(Unknown Source)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:249)
> at clojure.lang.RT.loadClassForName(RT.java:2056)
> at clojure.lang.RT.load(RT.java:419)
> at clojure.lang.RT.load(RT.java:400)
> at clojure.core$load$fn__4890.invoke(core.clj:5415)
> at clojure.core$load.doInvoke(core.clj:5414)
> at clojure.lang.RestFn.invoke(RestFn.java:408)
> at clojure.lang.Var.invoke(Var.java:415)
> at backtype.storm.LocalCluster.<clinit>(Unknown Source)
> at storm.ingestTopology.main(ingestTopology.java:126)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.zookeeper.server.NIOServerCnxn$Factory
> at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:171)
> at backtype.storm.zookeeper$loading__4784__auto__.invoke(zookeeper.clj:1)
> at backtype.storm.zookeeper__init.load(Unknown Source)
> at backtype.storm.zookeeper__init.<clinit>(Unknown Source)
> ... 90 more
> 
> I poked around this error (java.lang.ClassNotFoundException:
> org.apache.zookeeper.server.NIOServerCnxn$Factory), making changes to the pom,
> but there seems to be no solid solution. Someone suggested downgrading
> ZooKeeper, but I'd rather not do that. Is there any other solution?
> 
> 
> thanks
> 
> 
> Alec
> 
> 
> On Jul 7, 2014, at 5:04 PM, Sa Li <sa...@gmail.com> wrote:
> 
> 
>     great! Robert, will start from there.
> 
> 
>     On Jul 7, 2014, at 11:14 AM, Robert Lee <le...@gmail.com> wrote:
> 
> 
>         Alec,
> 
>         Check out the very nicely compiled storm-kafka module within storm that
>         has been developed by wurstmeister (https://github.com/apache/
>         incubator-storm/tree/master/external/storm-kafka). For a quick start
>         add the following to your pom file:
>         <dependency>
>                 <artifactId>storm-kafka</artifactId>
>                 <groupId>org.apache.storm</groupId>
>                <version>0.9.2-incubating</version>
>         </dependency>
> 
>         And within your main code set up a kafka spout with the following code:
> 
> 
>         TridentTopology topology = new TridentTopology();
>         BrokerHosts zk = new ZkHosts("localhost");
>         TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>         spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>         OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
> 
> 
>         
>         topology.newStream("kafka", spout).shuffle()
>         
>         .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(), new Fields("yourField"))
> 
>         ......
>         
> 
>         Your first function on the topology after creating the spout stream
>         will take the message and create whatever data field you want such that
>         you can operate on the postgresql data.
> 
>         Hope that gives you a quick start,
>         Robert
> 
> 
>         On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <sa...@gmail.com> wrote:
> 
>             Hello, Robert
> 
            As you mentioned in the last thread, I downloaded your kafka
            stuff, which was very useful. I have already implemented a kafka
            producer to get data from postgresql and send it to the brokers.
            By checking
> 
>             bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
>             ingest_test --from-beginning
> 
> 
            I know the consumer can receive the data. Now I'd like to
            integrate the kafka producer with storm. Trying to understand the
            logic: the storm spout is supposed to function as a consumer, and
            _collector will get the data sent by the kafka producer, is that
            right?
> 
>             Hope there are some sample codes available to use. 
> 
>             Thanks
> 
>             Alec 
> 
> 
>             On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com>
            wrote:
> 
> 
                I always like to simplify things. If I were you, I would use
                the well-known and widely used kafka spout to ingest data into
                your storm cluster. Simply write a Kafka Producer that uses
                the postgres java driver to pull out your required data and
                send it as a message. You'll find it is pretty easy to write
                kafka producers. Check out my project of creating some simple
                producers and just mirror that to produce your postgres
                producer:
> 
>                 https://github.com/leerobert/kafka-producers
> 
> 
>                 On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com>
>                 wrote:
> 
                    Thanks a lot, John. The entire project is getting data
                    from postgresql and finally emitting to and updating
                    cassandra tables. With the help of Robert in this group, I
                    think I have some resources on storm-cassandra
                    integration. However, there really aren't many tutorials
                    on postgres with storm; 'storm-rdbms' is the only example
                    I can find of db->storm. It would be great if someone
                    could contribute more example code on postgres-storm.
                    Sorry for the shameless request from a new storm user.
> 
> 
                    thanks
> 
>                     Alec
>                     On Jun 27, 2014, at 5:53 AM, John Welcher <
>                     jpwelcher@gmail.com> wrote:
> 
> 
>                         Hi
> 
>                         We use Postgres notifications. The spout open method 
>                         registers for database notifications (add, update,
>                         delete). Each time the spout next method is called we
>                         check for pending notifications and process
>                         accordingly.
> 
>                         Good Luck
> 
>                         John
> 
> 
>                         On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <
>                         sa.in.vanc@gmail.com> wrote:
> 
>                             Dear all
> 
                            I am doing an implementation of a spout; the
                            stream of data is coming from a postgresql ingress
                            API (an in-house project).  All I know for now is
                            to get the spout connected to postgresql, retrieve
                            the data periodically, store it in a queue, and
                            then emit it to the topology.  Has anyone done a
                            similar job? I hope to get some instructions and
                            details from you.
> 
> 
>                             thanks
> 
>                             Alec
>                            
>                            
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
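The Postgres notification approach John describes in the quoted reply above (the spout's open() issues LISTEN over JDBC, and nextTuple() polls org.postgresql.PGConnection.getNotifications() for pending notifications) needs a NOTIFY sender on the database side. A hedged sketch of that setup, where the events table and the storm_events channel name are hypothetical:

```sql
-- Hypothetical setup: push each new row to listeners as JSON.
-- row_to_json requires PostgreSQL 9.2+; NOTIFY payloads are limited
-- to roughly 8000 bytes.
CREATE OR REPLACE FUNCTION notify_storm() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('storm_events', row_to_json(NEW)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER events_notify
  AFTER INSERT ON events
  FOR EACH ROW EXECUTE PROCEDURE notify_storm();
```

The spout would then run LISTEN storm_events once on its JDBC connection and treat each notification payload as a tuple to emit.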

Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
Hello, Robert
This is the follow-up message from the last thread; I'm just getting back to this topic after other work. As you suggested, I have written the topology to run. Basically I get a JSON object from the kafka producer; the object looks like this:

"{"messagetype":"PageView","time":1402437708,"totaltime":9,"pagename":"user.aspx","profileid":69781139,"userid":76888177}"

I want to be able to retrieve the time and userid fields for future use. Here is the code I wrote:

package storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import org.json.JSONException;
import org.json.JSONObject;

import storm.kafka.BrokerHosts;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.trident.OpaqueTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.trident.TridentTopology;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class ingestTopology {
	
	protected ingestTopology() {
        throw new UnsupportedOperationException();
    }
	
    public static class JsonObjectParse extends BaseFunction {
        @Override
        public final void execute(
            final TridentTuple tuple,
            final TridentCollector collector
        ) {
            byte[] bytes = tuple.getBinary(0);
            try {
                String decoded = new String(bytes);
                JSONObject json = new JSONObject(decoded);
                // time and userid are numeric in the JSON above, so read
                // them as longs rather than strings
                collector.emit(new Values(
                      json.getLong("time")
                    , json.getLong("userid")
                ));
            } catch (JSONException e) {
                System.err.println("Caught JSONException: " + e.getMessage());
            }
        }
    }
    
    public static StormTopology buildTopology() {
        try {
        	TridentTopology topology = new TridentTopology();
    		BrokerHosts zk = new ZkHosts("localhost");
    		TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
    		spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
    		OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
        	          
            topology.newStream("spout1", spout)
                .each(new Fields("str"), new JsonObjectParse(),
                    new Fields(
                        "time",
                        "userid"
                ));

            return topology.build();
        } catch (IllegalArgumentException e) {
            System.err.println("Caught IllegalArgumentException: " + e.getMessage());
        }
        return null;
    }

	public static void main(String[] args) throws Exception {
	 
	    Config conf = new Config();
	    conf.setDebug(true);
        
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, buildTopology());
          }
          else {
        	  conf.setMaxSpoutPending(1);
       	      conf.setMaxTaskParallelism(3);
        	  LocalCluster cluster = new LocalCluster();
      	      cluster.submitTopology("kafka", conf, buildTopology());	   
      	   
      	      Thread.sleep(10000); // give the local topology time to run before shutdown
              cluster.shutdown();
          }
	    
	}
}

However, I got tons of errors:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/Users/sali/.m2/repository/ch/qos/logback/logback-classic/1.0.6/logback-classic-1.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.SimpleLoggerFactory]
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:host.name=192.168.128.10
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.version=1.6.0_65
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.vendor=Apple Inc.
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.class.path=/workspace/tools/stormprj/kafka-producers/target/test-classes:/workspace/tools/stormprj/kafka-producers/target/classes:/Users/sali/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/sali/.m2/repository/org/twitter4j/twitter4j-stream/3.0.5/twitter4j-stream-3.0.5.jar:/Users/sali/.m2/repository/org/twitter4j/twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/sali/.m2/repository/net/sf/opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/repository/org/json/json/20140107/json-20140107.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-api/1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/scala-library/2.9.2/scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/apache/zookeeper/zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit/junit/4.10/junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/kafka_2.9.2/0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/scala-lang/scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/sali/.m2/repository/com/yammer/metrics/metrics-annotation/2.2.0/metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/repository/org/a
pache/curator/curator-framework/2.4.0/curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org/hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali/.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali/.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2/repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/clj-time/0.4.1/cl
j-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali/.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users/sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali/.m2/repository/org/clojure/math.numeric-tower/0.0.1/math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repository/org/ow
2/asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/sali/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.io.tmpdir=/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:java.compiler=<NA>
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.name=Mac OS X
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.arch=x86_64
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:os.version=10.9.4
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.name=sali
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.home=/Users/sali
[main] INFO org.apache.zookeeper.ZooKeeper - Client environment:user.dir=/workspace/tools/stormprj/kafka-producers
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:host.name=192.168.128.10
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.version=1.6.0_65
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.vendor=Apple Inc.
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.home=/System/Library/Java/JavaVirtualMachines/1.6.0.jdk/Contents/Home
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.class.path=/workspace/tools/stormprj/kafka-producers/target/test-classes:/workspace/tools/stormprj/kafka-producers/target/classes:/Users/sali/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/sali/.m2/repository/org/twitter4j/twitter4j-stream/3.0.5/twitter4j-stream-3.0.5.jar:/Users/sali/.m2/repository/org/twitter4j/twitter4j-core/3.0.5/twitter4j-core-3.0.5.jar:/Users/sali/.m2/repository/net/sf/opencsv/opencsv/2.1/opencsv-2.1.jar:/Users/sali/.m2/repository/org/json/json/20140107/json-20140107.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-simple/1.7.2/slf4j-simple-1.7.2.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-api/1.7.2/slf4j-api-1.7.2.jar:/Users/sali/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/sali/.m2/repository/org/scala-lang/scala-library/2.9.2/scala-library-2.9.2.jar:/Users/sali/.m2/repository/org/apache/zookeeper/zookeeper/3.4.5/zookeeper-3.4.5.jar:/Users/sali/.m2/repository/org/slf4j/slf4j-log4j12/1.6.1/slf4j-log4j12-1.6.1.jar:/Users/sali/.m2/repository/jline/jline/0.9.94/jline-0.9.94.jar:/Users/sali/.m2/repository/junit/junit/4.10/junit-4.10.jar:/Users/sali/.m2/repository/org/jboss/netty/netty/3.2.2.Final/netty-3.2.2.Final.jar:/Users/sali/.m2/repository/org/apache/kafka/kafka_2.9.2/0.8.0/kafka_2.9.2-0.8.0.jar:/Users/sali/.m2/repository/net/sf/jopt-simple/jopt-simple/3.2/jopt-simple-3.2.jar:/Users/sali/.m2/repository/org/scala-lang/scala-compiler/2.9.2/scala-compiler-2.9.2.jar:/Users/sali/.m2/repository/com/101tec/zkclient/0.3/zkclient-0.3.jar:/Users/sali/.m2/repository/org/xerial/snappy/snappy-java/1.0.4.1/snappy-java-1.0.4.1.jar:/Users/sali/.m2/repository/com/yammer/metrics/metrics-core/2.2.0/metrics-core-2.2.0.jar:/Users/sali/.m2/repository/com/yammer/metrics/metrics-annotation/2.2.0/metrics-annotation-2.2.0.jar:/Users/sali/.m2/repository/org/apache/storm/storm-kafka/0.9.2-incubating/storm-kafka-0.9.2-incubating.jar:/Users/sali/.m2/rep
ository/org/apache/curator/curator-framework/2.4.0/curator-framework-2.4.0.jar:/Users/sali/.m2/repository/org/apache/curator/curator-client/2.4.0/curator-client-2.4.0.jar:/Users/sali/.m2/repository/com/google/guava/guava/15.0/guava-15.0.jar:/Users/sali/.m2/repository/org/testng/testng/6.8.5/testng-6.8.5.jar:/Users/sali/.m2/repository/org/hamcrest/hamcrest-core/1.1/hamcrest-core-1.1.jar:/Users/sali/.m2/repository/org/beanshell/bsh/2.0b4/bsh-2.0b4.jar:/Users/sali/.m2/repository/com/beust/jcommander/1.27/jcommander-1.27.jar:/Users/sali/.m2/repository/org/yaml/snakeyaml/1.6/snakeyaml-1.6.jar:/Users/sali/.m2/repository/org/mockito/mockito-all/1.9.0/mockito-all-1.9.0.jar:/Users/sali/.m2/repository/org/easytesting/fest-assert-core/2.0M8/fest-assert-core-2.0M8.jar:/Users/sali/.m2/repository/org/easytesting/fest-util/1.2.3/fest-util-1.2.3.jar:/Users/sali/.m2/repository/org/jmock/jmock/2.6.0/jmock-2.6.0.jar:/Users/sali/.m2/repository/org/hamcrest/hamcrest-library/1.1/hamcrest-library-1.1.jar:/Users/sali/.m2/repository/storm/storm/0.9.0.1/storm-0.9.0.1.jar:/Users/sali/.m2/repository/storm/storm-console-logging/0.9.0.1/storm-console-logging-0.9.0.1.jar:/Users/sali/.m2/repository/storm/storm-core/0.9.0.1/storm-core-0.9.0.1.jar:/Users/sali/.m2/repository/org/clojure/clojure/1.4.0/clojure-1.4.0.jar:/Users/sali/.m2/repository/commons-io/commons-io/1.4/commons-io-1.4.jar:/Users/sali/.m2/repository/org/apache/commons/commons-exec/1.1/commons-exec-1.1.jar:/Users/sali/.m2/repository/storm/libthrift7/0.7.0-2/libthrift7-0.7.0-2.jar:/Users/sali/.m2/repository/commons-lang/commons-lang/2.5/commons-lang-2.5.jar:/Users/sali/.m2/repository/org/apache/httpcomponents/httpclient/4.1.1/httpclient-4.1.1.jar:/Users/sali/.m2/repository/org/apache/httpcomponents/httpcore/4.1/httpcore-4.1.jar:/Users/sali/.m2/repository/commons-logging/commons-logging/1.1.1/commons-logging-1.1.1.jar:/Users/sali/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/sali/.m2/repository/clj-time/clj-
time/0.4.1/clj-time-0.4.1.jar:/Users/sali/.m2/repository/joda-time/joda-time/2.0/joda-time-2.0.jar:/Users/sali/.m2/repository/com/netflix/curator/curator-framework/1.0.1/curator-framework-1.0.1.jar:/Users/sali/.m2/repository/com/netflix/curator/curator-client/1.0.1/curator-client-1.0.1.jar:/Users/sali/.m2/repository/backtype/jzmq/2.1.0/jzmq-2.1.0.jar:/Users/sali/.m2/repository/com/googlecode/json-simple/json-simple/1.1/json-simple-1.1.jar:/Users/sali/.m2/repository/compojure/compojure/1.1.3/compojure-1.1.3.jar:/Users/sali/.m2/repository/org/clojure/core.incubator/0.1.0/core.incubator-0.1.0.jar:/Users/sali/.m2/repository/org/clojure/tools.macro/0.1.0/tools.macro-0.1.0.jar:/Users/sali/.m2/repository/clout/clout/1.0.1/clout-1.0.1.jar:/Users/sali/.m2/repository/ring/ring-core/1.1.5/ring-core-1.1.5.jar:/Users/sali/.m2/repository/commons-fileupload/commons-fileupload/1.2.1/commons-fileupload-1.2.1.jar:/Users/sali/.m2/repository/hiccup/hiccup/0.3.6/hiccup-0.3.6.jar:/Users/sali/.m2/repository/ring/ring-devel/0.3.11/ring-devel-0.3.11.jar:/Users/sali/.m2/repository/clj-stacktrace/clj-stacktrace/0.2.2/clj-stacktrace-0.2.2.jar:/Users/sali/.m2/repository/ring/ring-jetty-adapter/0.3.11/ring-jetty-adapter-0.3.11.jar:/Users/sali/.m2/repository/ring/ring-servlet/0.3.11/ring-servlet-0.3.11.jar:/Users/sali/.m2/repository/org/mortbay/jetty/jetty/6.1.26/jetty-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-util-6.1.26.jar:/Users/sali/.m2/repository/org/mortbay/jetty/servlet-api/2.5-20081211/servlet-api-2.5-20081211.jar:/Users/sali/.m2/repository/org/clojure/tools.logging/0.2.3/tools.logging-0.2.3.jar:/Users/sali/.m2/repository/org/clojure/math.numeric-tower/0.0.1/math.numeric-tower-0.0.1.jar:/Users/sali/.m2/repository/storm/carbonite/1.5.0/carbonite-1.5.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/kryo/kryo/2.17/kryo-2.17.jar:/Users/sali/.m2/repository/com/esotericsoftware/reflectasm/reflectasm/1.07/reflectasm-1.07-shaded.jar:/Users/sali/.m2/repo
sitory/org/ow2/asm/asm/4.0/asm-4.0.jar:/Users/sali/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/sali/.m2/repository/org/objenesis/objenesis/1.2/objenesis-1.2.jar:/Users/sali/.m2/repository/storm/tools.cli/0.2.2/tools.cli-0.2.2.jar:/Users/sali/.m2/repository/com/googlecode/disruptor/disruptor/2.10.1/disruptor-2.10.1.jar:/Users/sali/.m2/repository/storm/jgrapht/0.8.3/jgrapht-0.8.3.jar:/Users/sali/.m2/repository/ch/qos/logback/logback-classic/1.0.6/logback-classic-1.0.6.jar:/Users/sali/.m2/repository/ch/qos/logback/logback-core/1.0.6/logback-core-1.0.6.jar:/Users/sali/.m2/repository/org/slf4j/log4j-over-slf4j/1.6.6/log4j-over-slf4j-1.6.6.jar:/Users/sali/.m2/repository/storm/storm-netty/0.9.0.1/storm-netty-0.9.0.1.jar:/Users/sali/.m2/repository/io/netty/netty/3.6.3.Final/netty-3.6.3.Final.jar:/Users/sali/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.library.path=.:/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.io.tmpdir=/var/folders/91/j38ypw6105z8j5xrhq2t_ng00000gn/T/
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:java.compiler=<NA>
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:os.name=Mac OS X
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:os.arch=x86_64
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:os.version=10.9.4
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:user.name=sali
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:user.home=/Users/sali
[main] INFO org.apache.zookeeper.server.ZooKeeperServer - Server environment:user.dir=/workspace/tools/stormprj/kafka-producers
Exception in thread "main" java.lang.ExceptionInInitializerError
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at clojure.lang.RT.loadClassForName(RT.java:2056)
	at clojure.lang.RT.load(RT.java:419)
	at clojure.lang.RT.load(RT.java:400)
	at clojure.core$load$fn__4890.invoke(core.clj:5415)
	at clojure.core$load.doInvoke(core.clj:5414)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at clojure.core$load_one.invoke(core.clj:5227)
	at clojure.core$load_lib.doInvoke(core.clj:5264)
	at clojure.lang.RestFn.applyTo(RestFn.java:142)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$load_libs.doInvoke(core.clj:5302)
	at clojure.lang.RestFn.applyTo(RestFn.java:137)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$require.doInvoke(core.clj:5381)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at backtype.storm.cluster$loading__4784__auto__.invoke(cluster.clj:1)
	at backtype.storm.cluster__init.load(Unknown Source)
	at backtype.storm.cluster__init.<clinit>(Unknown Source)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at clojure.lang.RT.loadClassForName(RT.java:2056)
	at clojure.lang.RT.load(RT.java:419)
	at clojure.lang.RT.load(RT.java:400)
	at clojure.core$load$fn__4890.invoke(core.clj:5415)
	at clojure.core$load.doInvoke(core.clj:5414)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at clojure.core$load_one.invoke(core.clj:5227)
	at clojure.core$load_lib.doInvoke(core.clj:5264)
	at clojure.lang.RestFn.applyTo(RestFn.java:142)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$load_libs.doInvoke(core.clj:5302)
	at clojure.lang.RestFn.applyTo(RestFn.java:137)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$require.doInvoke(core.clj:5381)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at backtype.storm.daemon.nimbus__init.load(Unknown Source)
	at backtype.storm.daemon.nimbus__init.<clinit>(Unknown Source)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at clojure.lang.RT.loadClassForName(RT.java:2056)
	at clojure.lang.RT.load(RT.java:419)
	at clojure.lang.RT.load(RT.java:400)
	at clojure.core$load$fn__4890.invoke(core.clj:5415)
	at clojure.core$load.doInvoke(core.clj:5414)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at clojure.core$load_one.invoke(core.clj:5227)
	at clojure.core$load_lib.doInvoke(core.clj:5264)
	at clojure.lang.RestFn.applyTo(RestFn.java:142)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$load_libs.doInvoke(core.clj:5302)
	at clojure.lang.RestFn.applyTo(RestFn.java:137)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$require.doInvoke(core.clj:5381)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at backtype.storm.testing$loading__4784__auto__.invoke(testing.clj:1)
	at backtype.storm.testing__init.load(Unknown Source)
	at backtype.storm.testing__init.<clinit>(Unknown Source)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at clojure.lang.RT.loadClassForName(RT.java:2056)
	at clojure.lang.RT.load(RT.java:419)
	at clojure.lang.RT.load(RT.java:400)
	at clojure.core$load$fn__4890.invoke(core.clj:5415)
	at clojure.core$load.doInvoke(core.clj:5414)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at clojure.core$load_one.invoke(core.clj:5227)
	at clojure.core$load_lib.doInvoke(core.clj:5264)
	at clojure.lang.RestFn.applyTo(RestFn.java:142)
	at clojure.core$apply.invoke(core.clj:603)
	at clojure.core$load_libs.doInvoke(core.clj:5302)
	at clojure.lang.RestFn.applyTo(RestFn.java:137)
	at clojure.core$apply.invoke(core.clj:605)
	at clojure.core$use.doInvoke(core.clj:5392)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at backtype.storm.LocalCluster$loading__4784__auto__.invoke(LocalCluster.clj:1)
	at backtype.storm.LocalCluster__init.load(Unknown Source)
	at backtype.storm.LocalCluster__init.<clinit>(Unknown Source)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:249)
	at clojure.lang.RT.loadClassForName(RT.java:2056)
	at clojure.lang.RT.load(RT.java:419)
	at clojure.lang.RT.load(RT.java:400)
	at clojure.core$load$fn__4890.invoke(core.clj:5415)
	at clojure.core$load.doInvoke(core.clj:5414)
	at clojure.lang.RestFn.invoke(RestFn.java:408)
	at clojure.lang.Var.invoke(Var.java:415)
	at backtype.storm.LocalCluster.<clinit>(Unknown Source)
	at storm.ingestTopology.main(ingestTopology.java:126)
Caused by: java.lang.ClassNotFoundException: org.apache.zookeeper.server.NIOServerCnxn$Factory
	at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:306)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:171)
	at backtype.storm.zookeeper$loading__4784__auto__.invoke(zookeeper.clj:1)
	at backtype.storm.zookeeper__init.load(Unknown Source)
	at backtype.storm.zookeeper__init.<clinit>(Unknown Source)
	... 90 more

I poked around this error, java.lang.ClassNotFoundException: org.apache.zookeeper.server.NIOServerCnxn$Factory, and made changes to the pom, but there seems to be no solid solution. Someone suggested downgrading zookeeper, but I would rather not do that. Is there any other solution?
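A hedged note on one commonly suggested fix (version numbers below are illustrative): NIOServerCnxn$Factory existed in ZooKeeper 3.3.x and was renamed NIOServerCnxnFactory in 3.4, so this exception usually means the storm jar and the zookeeper jar on the classpath come from different eras. Excluding the transitive zookeeper and pinning one that matches your storm artifact can align them:

```xml
<!-- Illustrative only: align the zookeeper version with the one your
     storm artifact was compiled against. -->
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>0.9.2-incubating</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.5</version>
</dependency>
```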


thanks


Alec


On Jul 7, 2014, at 5:04 PM, Sa Li <sa...@gmail.com> wrote:

> great! Robert, will start from there.
> 
> 
> On Jul 7, 2014, at 11:14 AM, Robert Lee <le...@gmail.com> wrote:
> 
>> Alec,
>> 
>> Check out the very nicely compiled storm-kafka module within storm that has been developed by wurstmeister (https://github.com/apache/incubator-storm/tree/master/external/storm-kafka). For a quick start add the following to your pom file:
>> <dependency>
>>         <artifactId>storm-kafka</artifactId>
>>         <groupId>org.apache.storm</groupId>
>>        <version>0.9.2-incubating</version>
>> </dependency>
>> 
>> And within your main code set up a kafka spout with the following code:
>> 
>> TridentTopology topology = new TridentTopology();
>> BrokerHosts zk = new ZkHosts("localhost");
>> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
>> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
>> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
>> 
>> 
>> topology.newStream("kafka", spout).shuffle()
>> .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(), new Fields("yourField"))
>> 
>> ......
>> 
>> Your first function on the topology after creating the spout stream will take the message and create whatever data field you want such that you can operate on the postgresql data.
>> 
>> Hope that gives you a quick start,
>> Robert
>> 
>> 
>> On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <sa...@gmail.com> wrote:
>> Hello, Robert
>> 
>> As you mentioned in the last thread, I downloaded your kafka stuff, which was very useful. I have already implemented a kafka producer to get data from postgresql and send it to the brokers. By checking
>> 
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ingest_test --from-beginning
>> 
>> 
>> I know the consumer can receive the data. Now I'd like to integrate the kafka producer into storm. I'm trying to understand the logic: the storm spout is supposed to function as a consumer, and _collector will get the data sent by the kafka producer, is that right?
>> 
>> Hope there are some sample codes available to use. 
>> 
>> Thanks
>> 
>> Alec 
>> 
>> 
>> On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com> wrote:
>> 
>>> I always like to simplify things. If I were you, I would use the well-known and widely used kafka spout to ingest data into your storm cluster. Simply write a Kafka Producer that uses the postgres java driver to pull out your required data and send it as a message. You'll find it is pretty easy to write kafka producers. Check out my project with some simple producers and just mirror one to build your postgres producer:
>>> 
>>> https://github.com/leerobert/kafka-producers 
>>> 
>>> 
>>> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:
>>> Thanks a lot, John. The entire project is getting data from postgresql and finally emitting to and updating cassandra tables. With Robert's help in this group, I think I have some resources on storm-cassandra integration. However, there are really not many tutorials on postgres with storm; 'storm-rdbms' is the only db->storm example I can find. It would be great if someone could contribute more example code on postgres-storm. Sorry for the shameless request from a new storm user.
>>> 
>>> 
>>> thanks
>>> 
>>> Alec
>>> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
>>> 
>>>> Hi 
>>>> 
>>>> We use Postgres notifications. The spout open method  registers for database notifications (add, update, delete). Each time the spout next method is called we check for pending notifications and process accordingly.
>>>> 
>>>> Good Luck
>>>> 
>>>> John 
>>>> 
>>>> 
>>>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>>>> Dear all
>>>> 
>>>> I am implementing a spout whose stream comes from a postgresql ingress API (an in-house project).  All I know for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology.  Has anyone done a similar job? I hope to get some instructions and details from you.
>>>> 
>>>> 
>>>> thanks
>>>> 
>>>> Alec
>>>> 
>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 


Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
great! Robert, will start from there.


On Jul 7, 2014, at 11:14 AM, Robert Lee <le...@gmail.com> wrote:

> Alec,
> 
> Check out the very nicely compiled storm-kafka module within storm that has been developed by wurstmeister (https://github.com/apache/incubator-storm/tree/master/external/storm-kafka). For a quick start add the following to your pom file:
> <dependency>
>         <artifactId>storm-kafka</artifactId>
>         <groupId>org.apache.storm</groupId>
>        <version>0.9.2-incubating</version>
> </dependency>
> 
> And within your main code set up a kafka spout with the following code:
> 
> TridentTopology topology = new TridentTopology();
> BrokerHosts zk = new ZkHosts("localhost");
> TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
> spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
> OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
> 
> 
> topology.newStream("kafka", spout).shuffle()
> .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(), new Fields("yourField"))
> 
> ......
> 
> Your first function on the topology after creating the spout stream will take the message and create whatever data field you want such that you can operate on the postgresql data.
> 
> Hope that gives you a quick start,
> Robert
> 
> 
> On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <sa...@gmail.com> wrote:
> Hello, Robert
> 
> As you mentioned in the last thread, I downloaded your kafka stuff, which was very useful. I have already implemented a kafka producer to get data from postgresql and send it to the brokers. By checking
> 
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ingest_test --from-beginning
> 
> 
> I know the consumer can receive the data. Now I'd like to integrate the kafka producer into storm. I'm trying to understand the logic: the storm spout is supposed to function as a consumer, and _collector will get the data sent by the kafka producer, is that right?
> 
> Hope there are some sample codes available to use. 
> 
> Thanks
> 
> Alec 
> 
> 
> On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com> wrote:
> 
>> I always like to simplify things. If I were you, I would use the well-known and widely used kafka spout to ingest data into your storm cluster. Simply write a Kafka Producer that uses the postgres java driver to pull out your required data and send it as a message. You'll find it is pretty easy to write kafka producers. Check out my project with some simple producers and just mirror one to build your postgres producer:
>> 
>> https://github.com/leerobert/kafka-producers 
>> 
>> 
>> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:
>> Thanks a lot, John. The entire project is getting data from postgresql and finally emitting to and updating cassandra tables. With Robert's help in this group, I think I have some resources on storm-cassandra integration. However, there are really not many tutorials on postgres with storm; 'storm-rdbms' is the only db->storm example I can find. It would be great if someone could contribute more example code on postgres-storm. Sorry for the shameless request from a new storm user.
>> 
>> 
>> thanks
>> 
>> Alec
>> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
>> 
>>> Hi 
>>> 
>>> We use Postgres notifications. The spout open method  registers for database notifications (add, update, delete). Each time the spout next method is called we check for pending notifications and process accordingly.
>>> 
>>> Good Luck
>>> 
>>> John 
>>> 
>>> 
>>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>>> Dear all
>>> 
>>> I am implementing a spout whose stream comes from a postgresql ingress API (an in-house project).  All I know for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology.  Has anyone done a similar job? I hope to get some instructions and details from you.
>>> 
>>> 
>>> thanks
>>> 
>>> Alec
>>> 
>>> 
>>> 
>> 
>> 
> 
> 


Re: postgresql -> spout

Posted by Robert Lee <le...@gmail.com>.
Alec,

Check out the very nicely compiled storm-kafka module within storm that has
been developed by wurstmeister (
https://github.com/apache/incubator-storm/tree/master/external/storm-kafka).
For a quick start add the following to your pom file:
<dependency>
    <artifactId>storm-kafka</artifactId>
    <groupId>org.apache.storm</groupId>
    <version>0.9.2-incubating</version>
</dependency>

And within your main code set up a kafka spout with the following code:

TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "ingest_test");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

topology.newStream("kafka", spout).shuffle()
    .each(new Fields("str"), new FunctionThatWorksOnKafkaOutputMessage(), new Fields("yourField"))
    ......


Your first function on the topology after creating the spout stream will
take the message and create whatever data field you want such that you can
operate on the postgresql data.
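As a hedged illustration of that first function (the class name FunctionThatWorksOnKafkaOutputMessage above is a placeholder), its core job is just parsing the raw Kafka string into the values downstream operations work on. The parsing step itself needs nothing from storm; a minimal stdlib sketch, assuming a hypothetical pipe-delimited message format:

```java
// Hypothetical helper: split the Kafka "str" field into the values a
// Trident Function would emit as new tuple fields. The pipe delimiter
// is an assumption; match it to whatever your producer writes.
public class MessageParser {
    public static String[] parse(String message) {
        // -1 keeps trailing empty columns instead of dropping them
        return message.split("\\|", -1);
    }
}
```

Inside the real Function, each parsed value would be emitted via collector.emit(...) so it becomes the "yourField" tuple field.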

Hope that gives you a quick start,
Robert


On Mon, Jul 7, 2014 at 1:40 PM, Sa Li <sa...@gmail.com> wrote:

> Hello, Robert
>
> As you mentioned in the last thread, I downloaded your kafka stuff, which
> was very useful. I have already implemented a kafka producer to get data
> from postgresql and send it to the brokers. By checking
>
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
> ingest_test --from-beginning
>
>
> I know the consumer can receive the data. Now I'd like to integrate the
> kafka producer into storm. I'm trying to understand the logic: the storm
> spout is supposed to function as a consumer, and _collector will get the
> data sent by the kafka producer, is that right?
>
> Hope there are some sample codes available to use.
>
> Thanks
>
> Alec
>
>
> On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com>
> wrote:,
>
> I always like to simplify things. If I were you, I would use the
> well-known and widely used kafka spout to ingest data into your storm
> cluster. Simply write a Kafka Producer that uses the postgres java driver
> to pull out your required data and send it as a message. You'll find it is
> pretty easy to write kafka producers. Check out my project with some
> simple producers and just mirror one to build your postgres producer:
>
> https://github.com/leerobert/kafka-producers
>
>
> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:
>
>> Thanks a lot, John. The entire project is getting data from postgresql
>> and finally emitting to and updating cassandra tables. With Robert's help
>> in this group, I think I have some resources on storm-cassandra
>> integration. However, there are really not many tutorials on postgres
>> with storm; *storm-rdbms* is the only db->storm example I can find. It
>> would be great if someone could contribute more example code on
>> postgres-storm. Sorry for the shameless request from a new storm user.
>>
>>
>> thanks
>>
>> Alec
>> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
>>
>> Hi
>>
>> We use Postgres notifications. The spout open method  registers for
>> database notifications (add, update, delete). Each time the spout next
>> method is called we check for pending notifications and process accordingly.
>>
>> Good Luck
>>
>> John
>>
>>
>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>>
>>> Dear all
>>>
>>> I am implementing a spout whose stream comes from a postgresql ingress
>>> API (an in-house project).  All I know for now is to get the spout
>>> connected to postgresql, retrieve the data periodically, store it in a
>>> queue, and then emit it to the topology.  Has anyone done a similar job?
>>> I hope to get some instructions and details from you.
>>>
>>>
>>> thanks
>>>
>>> Alec
>>>
>>>
>>>
>>
>>
>
>

Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
Hello, Robert

As you mentioned in the last thread, I downloaded your kafka stuff, which was very useful. I have already implemented a kafka producer to get data from postgresql and send it to the brokers. By checking

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ingest_test --from-beginning


I know the consumer can receive the data. Now I'd like to integrate the kafka producer into storm. I'm trying to understand the logic: the storm spout is supposed to function as a consumer, and _collector will get the data sent by the kafka producer, is that right?

Hope there are some sample codes available to use. 

Thanks

Alec 


On Jun 27, 2014, at 11:58 AM, Robert Lee <le...@gmail.com> wrote:

> I always like to simplify things. If I were you, I would use the well-known and widely used kafka spout to ingest data into your storm cluster. Simply write a Kafka Producer that uses the postgres java driver to pull out your required data and send it as a message. You'll find it is pretty easy to write kafka producers. Check out my project with some simple producers and just mirror one to build your postgres producer:
> 
> https://github.com/leerobert/kafka-producers 
> 
> 
> On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:
> Thanks a lot, John. The entire project is getting data from postgresql and finally emitting to and updating cassandra tables. With Robert's help in this group, I think I have some resources on storm-cassandra integration. However, there are really not many tutorials on postgres with storm; 'storm-rdbms' is the only db->storm example I can find. It would be great if someone could contribute more example code on postgres-storm. Sorry for the shameless request from a new storm user.
> 
> 
> thanks
> 
> Alec
> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
> 
>> Hi 
>> 
>> We use Postgres notifications. The spout open method  registers for database notifications (add, update, delete). Each time the spout next method is called we check for pending notifications and process accordingly.
>> 
>> Good Luck
>> 
>> John 
>> 
>> 
>> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>> Dear all
>> 
>> I am implementing a spout whose stream comes from a postgresql ingress API (an in-house project).  All I know for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology.  Has anyone done a similar job? I hope to get some instructions and details from you.
>> 
>> 
>> thanks
>> 
>> Alec
>> 
>> 
>> 
> 
> 


Re: postgresql -> spout

Posted by Robert Lee <le...@gmail.com>.
I always like to simplify things. If I were you, I would use the well-known
and widely used kafka spout to ingest data into your storm cluster. Simply
write a Kafka Producer that uses the postgres java driver to pull out your
required data and send it as a message. You'll find it is pretty easy to
write kafka producers. Check out my project with some simple producers and
just mirror one to build your postgres producer:

https://github.com/leerobert/kafka-producers
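The JDBC-query-and-send loop itself depends on the postgres driver and the kafka client, but the row-to-message step in between can be sketched with the stdlib alone. Everything here is hypothetical (the class name and the key=value|... format; a real producer might prefer JSON):

```java
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical: turn one result-set row (column -> value) into a single
// Kafka message payload. In a real producer this string would go into a
// ProducerRecord and be sent to the broker.
public class RowSerializer {
    public static String toMessage(Map<String, Object> row) {
        return row.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("|"));
    }
}
```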


On Fri, Jun 27, 2014 at 2:49 PM, Sa Li <sa...@gmail.com> wrote:

> Thanks a lot, John. The entire project is getting data from postgresql and
> finally emitting to and updating cassandra tables. With Robert's help in
> this group, I think I have some resources on storm-cassandra integration.
> However, there are really not many tutorials on postgres with storm;
> *storm-rdbms* is the only db->storm example I can find. It would be great
> if someone could contribute more example code on postgres-storm. Sorry for
> the shameless request from a new storm user.
>
>
> thanks
>
> Alec
> On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:
>
> Hi
>
> We use Postgres notifications. The spout open method  registers for
> database notifications (add, update, delete). Each time the spout next
> method is called we check for pending notifications and process accordingly.
>
> Good Luck
>
> John
>
>
> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
>
>> Dear all
>>
>> I am implementing a spout whose stream comes from a postgresql ingress
>> API (an in-house project).  All I know for now is to get the spout
>> connected to postgresql, retrieve the data periodically, store it in a
>> queue, and then emit it to the topology.  Has anyone done a similar job?
>> I hope to get some instructions and details from you.
>>
>>
>> thanks
>>
>> Alec
>>
>>
>>
>
>

Re: postgresql -> spout

Posted by Sa Li <sa...@gmail.com>.
Thanks a lot, John. The entire project is getting data from postgresql and finally emitting to and updating cassandra tables. With Robert's help in this group, I think I have some resources on storm-cassandra integration. However, there are really not many tutorials on postgres with storm; 'storm-rdbms' is the only db->storm example I can find. It would be great if someone could contribute more example code on postgres-storm. Sorry for the shameless request from a new storm user.


thanks

Alec
On Jun 27, 2014, at 5:53 AM, John Welcher <jp...@gmail.com> wrote:

> Hi 
> 
> We use Postgres notifications. The spout open method  registers for database notifications (add, update, delete). Each time the spout next method is called we check for pending notifications and process accordingly.
> 
> Good Luck
> 
> John 
> 
> 
> On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:
> Dear all
> 
> I am implementing a spout whose stream comes from a postgresql ingress API (an in-house project).  All I know for now is to get the spout connected to postgresql, retrieve the data periodically, store it in a queue, and then emit it to the topology.  Has anyone done a similar job? I hope to get some instructions and details from you.
> 
> 
> thanks
> 
> Alec
> 
> 
> 


Re: postgresql -> spout

Posted by John Welcher <jp...@gmail.com>.
Hi

We use Postgres notifications. The spout's open method registers for
database notifications (add, update, delete). Each time the spout's
nextTuple method is called, we check for pending notifications and process
them accordingly.
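The shape of that spout can be sketched without the postgres driver at all; below, a plain queue stands in for the driver's pending-notification buffer (in the real thing, open would issue LISTEN and a driver poll would fill the queue). All names are hypothetical:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch of the poll-on-nextTuple pattern: notifications
// accumulate between calls, and each nextTuple call drains and emits them.
public class NotificationPoller {
    private final Queue<String> pending = new ArrayDeque<>();

    // Stand-in for the driver delivering a NOTIFY payload.
    public void onNotification(String payload) {
        pending.add(payload);
    }

    // Called from the spout's nextTuple: drain everything that arrived
    // since the last call, in arrival order, for emission as tuples.
    public List<String> drain() {
        List<String> batch = new ArrayList<>();
        String p;
        while ((p = pending.poll()) != null) {
            batch.add(p);
        }
        return batch;
    }
}
```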

Good Luck

John


On Fri, Jun 27, 2014 at 12:07 AM, Sa Li <sa...@gmail.com> wrote:

> Dear all
>
> I am implementing a spout whose stream comes from a postgresql ingress
> API (an in-house project).  All I know for now is to get the spout
> connected to postgresql, retrieve the data periodically, store it in a
> queue, and then emit it to the topology.  Has anyone done a similar job?
> I hope to get some instructions and details from you.
>
>
> thanks
>
> Alec
>
>
>