Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/11/07 00:38:33 UTC

how to consume data from beginning by using TridentKafkaConfig

Hi, All

I am using TridentKafkaConfig to consume data. When I do not set

spoutConf.forceFromStart = true;

the spout consumes from the latest offset, I assume, and it works.
However, if I set spoutConf.forceFromStart = true; the spout consumes
nothing. I thought it should consume the data from the beginning of the
Kafka stream.

Here is the code:

    BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
    // TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "PofApiTest");
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

    spoutConf.fetchSizeBytes = 50*1024*1024;
    spoutConf.bufferSizeBytes = 50*1024*1024;
    // spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    // spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
    // spoutConf.socketTimeoutMs = 1000;
    // spoutConf.fetchMaxWait = 1000;
    spoutConf.forceFromStart = true;
    // spoutConf.maxOffsetBehind = Long.MAX_VALUE;
    // spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
    // spoutConf.metricsTimeBucketSizeInSecs = 600;

    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
    // TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

    TridentTopology topology = new TridentTopology();

    topology.newStream("topictestspout", kafkaSpout)
    // topology.newStream("test", new RandomTupleSpout())
    // this test tells the kafkaSpout has the overhead to cause the latency
            .parallelismHint(4)
    //      .shuffle()
    //      .each(new Fields("batchid","word"),
            .each(new Fields("str"),
                  new JsonObjectParse(),
                  new Fields("userid","event"))
            .groupBy(new Fields("userid"))
            .persistentAggregate(PostgresqlState.newFactory(config),
                                 new Fields("userid","event"),
                                 new EventUpdater(),
                                 new Fields("eventword"));
    //      .parallelismHint(6);


Can anyone tell me why I can't consume the data from the beginning?

Thanks


Alec

Re: how to consume data from beginning by using TridentKafkaConfig

Posted by Andrew Neilson <ar...@gmail.com>.
You also need to set the starting offset time. To do what you want, you
would need to uncomment this line:

// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

I would also suggest looking at the storm-kafka source code to see how
forceFromStart gets used.
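
To make the interaction between the two settings concrete, here is a small stand-alone sketch. It is not the storm-kafka source: the class StartOffsetSketch, the method resolve and the enum StartPoint are hypothetical names, and the decision rule is an assumption based on the advice above (forceFromStart only tells the spout to honour startOffsetTime and ignore any stored offset; startOffsetTime decides where reading begins). The only library facts used are the sentinel values -2 and -1 that kafka.api.OffsetRequest.EarliestTime() and LatestTime() return. The sketch makes no assumption about the library's default for startOffsetTime and always passes it explicitly.

```java
/** Simplified model for illustration only; all names here are hypothetical. */
final class StartOffsetSketch {
    // Values returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /** Where the modelled spout starts reading. */
    enum StartPoint { EARLIEST, LATEST, COMMITTED }

    /**
     * Assumed rule: with forceFromStart the spout ignores any stored offset
     * and starts wherever startOffsetTime points; without it the spout
     * resumes from the stored offset, or from the latest offset if none exists.
     */
    static StartPoint resolve(boolean forceFromStart,
                              long startOffsetTime,
                              boolean hasCommittedOffset) {
        if (forceFromStart) {
            if (startOffsetTime == EARLIEST_TIME) {
                return StartPoint.EARLIEST;
            }
            if (startOffsetTime == LATEST_TIME) {
                return StartPoint.LATEST;
            }
            // Real timestamps are outside the scope of this sketch.
            throw new IllegalArgumentException("sketch models only the two sentinel values");
        }
        return hasCommittedOffset ? StartPoint.COMMITTED : StartPoint.LATEST;
    }

    public static void main(String[] args) {
        // Flag set and start time pointing at the earliest offset.
        System.out.println(resolve(true, EARLIEST_TIME, true));
        // Flag set but start time pointing at the latest offset.
        System.out.println(resolve(true, LATEST_TIME, true));
        // Flag not set: the start time is ignored in this model.
        System.out.println(resolve(false, EARLIEST_TIME, true));
    }
}
```

Under this model, setting forceFromStart alone is not enough to replay a topic: the start time must also point at the earliest offset. Whether a given storm-kafka release behaves this way is best confirmed against its source, as suggested above.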
