Posted to user@storm.apache.org by Sa Li <sa...@gmail.com> on 2014/11/07 00:38:33 UTC
how to consume data from beginning by using TridentKafkaConfig
Hi all,
I am using TridentKafkaConfig to consume data. When I do not set
spoutConf.forceFromStart = true;
the spout consumes data starting from the latest offset (I assume), and that works.
However, if I set spoutConf.forceFromStart = true; the spout consumes nothing. I thought
it should consume the data from the beginning of the Kafka stream.
Here is the code:
BrokerHosts zk = new ZkHosts("10.100.70.128:2181");
// TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "PofApiTest");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "topictest");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());

spoutConf.fetchSizeBytes = 50*1024*1024;
spoutConf.bufferSizeBytes = 50*1024*1024;
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
// spoutConf.socketTimeoutMs = 1000;
// spoutConf.fetchMaxWait = 1000;
spoutConf.forceFromStart = true;
// spoutConf.maxOffsetBehind = Long.MAX_VALUE;
// spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
// spoutConf.metricsTimeBucketSizeInSecs = 600;

OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(spoutConf);
// TransactionalTridentKafkaSpout kafkaSpout = new TransactionalTridentKafkaSpout(spoutConf);

TridentTopology topology = new TridentTopology();

topology.newStream("topictestspout", kafkaSpout)
// topology.newStream("test", new RandomTupleSpout())
// this test tells the kafkaSpout has the overhead to cause the latency
    .parallelismHint(4)
    // .shuffle()
    // .each(new Fields("batchid","word"),
    .each(new Fields("str"),
          new JsonObjectParse(),
          new Fields("userid","event"))
    .groupBy(new Fields("userid"))
    .persistentAggregate(PostgresqlState.newFactory(config),
                         new Fields("userid","event"),
                         new EventUpdater(),
                         new Fields("eventword"));
    // .parallelismHint(6);
Can anyone tell me why I can't consume the data from the beginning?
Thanks,
Alec
Re: how to consume data from beginning by using TridentKafkaConfig
Posted by Andrew Neilson <ar...@gmail.com>.
You also need to set the starting offset time. To do what you want, you
need to uncomment this line:
// spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
I also suggest looking at the storm-kafka source code to see how
forceFromStart is used.
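
To illustrate why both settings matter, here is a minimal, self-contained sketch. It is a simplified model and not the storm-kafka source: the class StartOffsetModel, its methods, and the example offsets are all hypothetical. It assumes, as the reply above implies, that forceFromStart only tells the spout to ignore its stored position, while startOffsetTime decides where it starts instead. The only facts taken from Kafka are the sentinel values behind OffsetRequest.EarliestTime() (-2) and OffsetRequest.LatestTime() (-1).

```java
import java.util.OptionalLong;

// Hypothetical model for illustration only; not part of storm-kafka.
class StartOffsetModel {
    // Sentinel values returned by kafka.api.OffsetRequest.EarliestTime()/LatestTime().
    public static final long EARLIEST_TIME = -2L;
    public static final long LATEST_TIME = -1L;

    // Translate a sentinel into a concrete offset within [logStart, logEnd].
    public static long resolve(long startOffsetTime, long logStart, long logEnd) {
        if (startOffsetTime == EARLIEST_TIME) {
            return logStart;
        }
        if (startOffsetTime == LATEST_TIME) {
            return logEnd;
        }
        // Real timestamps are not modelled in this sketch.
        throw new IllegalArgumentException("unsupported startOffsetTime: " + startOffsetTime);
    }

    // Assumption: forceFromStart discards the stored offset and defers to
    // startOffsetTime; otherwise the stored offset wins, falling back to the
    // end of the log when nothing has been stored yet.
    public static long startingOffset(boolean forceFromStart, long startOffsetTime,
                                      OptionalLong storedOffset, long logStart, long logEnd) {
        if (forceFromStart) {
            return resolve(startOffsetTime, logStart, logEnd);
        }
        return storedOffset.orElse(logEnd);
    }

    public static void main(String[] args) {
        OptionalLong stored = OptionalLong.of(640L); // made-up stored position
        long logStart = 0L;                          // made-up earliest offset
        long logEnd = 1000L;                         // made-up latest offset

        // forceFromStart with a "latest" start time: begins at the end of the log,
        // so nothing is consumed until new messages arrive.
        System.out.println(startingOffset(true, LATEST_TIME, stored, logStart, logEnd));

        // forceFromStart with an "earliest" start time: begins at the start of the log.
        System.out.println(startingOffset(true, EARLIEST_TIME, stored, logStart, logEnd));
    }
}
```

Under these assumptions, setting forceFromStart = true without also setting startOffsetTime to the earliest time can leave the spout at the end of the log. That would match the "consumes nothing" symptom described in the question. The actual behaviour depends on the storm-kafka version in use, so the source code remains the authority.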