Posted to user@storm.apache.org by Nilesh Chhapru <ni...@ugamsolutions.com> on 2015/08/05 16:53:41 UTC

Storm Trident - persistentAggregate With HBase 0.98

Hi All,

I recently upgraded from HBase 0.94 / Hadoop1 to HBase 0.98 / Hadoop2, which resulted in compilation errors in my Storm Trident topology.

I was using the following code to aggregate and store the state in HBase (reference: https://github.com/jrkinley/storm-hbase/blob/master/src/main/java/backtype/storm/contrib/hbase/examples/HBaseTridentAggregateTopology.java)

TridentConfig config = new TridentConfig("shorturl", "shortid");
config.setBatch(false);
StateFactory state = HBaseAggregateState.transactional(config);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
        .each(new Fields("shortid", "date"), new DatePartitionFunction(), new Fields("cf", "cq"))
        .project(new Fields("shortid", "cf", "cq"))
        .groupBy(new Fields("shortid", "cf", "cq"))
        .persistentAggregate(state, new Count(), new Fields("count"));

After upgrading HBase, I realized that some of the methods have changed, so I moved to the following code:
TridentHBaseMapper tridentHBaseMapper = new SimpleTridentHBaseMapper()
                    .withColumnFamily("RKCSV")
                    .withColumnFields(new Fields("cq"))
                    .withCounterFields(new Fields("value1"))
                    .withRowKeyField("rowKey");

HBaseState.Options options = new HBaseState.Options()
                .withDurability(Durability.SYNC_WAL)
                .withMapper(tridentHBaseMapper)
                .withTableName("errSmryTbl_trident");

StateFactory factory = new HBaseStateFactory(options);
TridentTopology topology = new TridentTopology();
BrokerHosts zk = new ZkHosts(zkHost);
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test.topic");
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Stream kafkaStream = topology.newStream("kafka", spout).shuffle();
Stream boltStream1 = kafkaStream.each(new Fields("str"), new InitializerBolt("RKCSV"), new Fields("rowKey", "cf", "cq", "value1"));
GroupedStream groupedStream1 = boltStream1.groupBy(new Fields("rowKey", "cf", "cq"));
groupedStream1.persistentAggregate(factory, new Fields("rowKey", "cf", "cq", "value1"), new ErrorInitBolt(), new Fields("value"));

But now I am facing a class cast exception: java.lang.RuntimeException: java.lang.ClassCastException: org.apache.storm.hbase.trident.state.HBaseState cannot be cast to storm.trident.state.map.MapState

Is there anything I am missing here? The aggregator that I am using is a "CombinerAggregator<List<Map<String, String>>>".

Regards,
Nilesh Chhapru.

Re: Storm Trident - persistentAggregate With HBase 0.98

Posted by Nilesh Chhapru <ni...@ugamsolutions.com>.
Hi Jungtaek,

When I use "HBaseMapState" and "HBaseMapState.Options", I need to specify the "qualifier" field of the options, but in my case the qualifier is dynamic and comes from one of the fields of a bolt stream.

Is there a way to specify that field when I am using "HBaseMapState"?

Regards,
Nilesh Chhapru.
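
The thread does not answer this question. One possible direction, offered only as a sketch and not confirmed by anyone in this thread: since the options described above carry a single qualifier, the per-tuple value could be folded into the key that the stream is grouped by, while the configured qualifier stays fixed. The helper below is hypothetical (the class name, the delimiter, and the idea of a composite key are all assumptions, not part of storm-hbase). Note that this changes the row layout of the table, so it only fits if readers of the table can work with composite row keys.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper; not part of storm-hbase or Trident.
class CompositeKeySketch {
    // Assumed delimiter; it must never occur inside a row key or a qualifier.
    static final char DELIMITER = '|';

    // Builds one grouping key out of the row key and the dynamic qualifier value.
    static String compositeKey(String rowKey, String qualifier) {
        if (rowKey.indexOf(DELIMITER) >= 0 || qualifier.indexOf(DELIMITER) >= 0) {
            throw new IllegalArgumentException("key parts must not contain '" + DELIMITER + "'");
        }
        return rowKey + DELIMITER + qualifier;
    }

    // Encodes the key the way it would typically be handed to a byte-oriented store.
    static byte[] toBytes(String key) {
        return key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(compositeKey("row1", "2015-08-05"));
    }
}
```

In a topology, such a key would be emitted as a new field by a function ahead of groupBy, and the stream would then be grouped on that single field.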


Re: Storm Trident - persistentAggregate With HBase 0.98

Posted by 임정택 <ka...@gmail.com>.
I'm sorry, I misunderstood you.

persistentAggregate handles a grouped stream, so you need to use HBaseMapState and HBaseMapState.Options instead of HBaseState and HBaseState.Options.

You can refer to these two links for more details.
a) http://storm.apache.org/documentation/Trident-API-Overview.html - "Operations on grouped streams" section
b) http://storm.apache.org/documentation/Trident-state.html - "persistentAggregate" section

Regards,
Jungtaek Lim (HeartSaVioR)
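
To illustrate why the exception in the original post appears, here is a minimal self-contained sketch. It does not use the real Storm classes: State, MapState, PlainState and InMemoryMapState are hypothetical stand-ins, and updateGroupedCount only imitates, under that assumption, the step where a grouped aggregation treats its state as a map keyed by the group fields. A state that is not a map state fails at the cast, which matches the "cannot be cast to storm.trident.state.map.MapState" message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for Trident's State and MapState; not the real Storm types.
class CastSketch {
    interface State {}

    interface MapState<T> extends State {
        List<T> multiGet(List<List<Object>> keys);
        void multiPut(List<List<Object>> keys, List<T> vals);
    }

    // Plays the role of HBaseState here: a plain State without per-key get/put.
    static class PlainState implements State {}

    // Plays the role of HBaseMapState here: a State that is readable and writable per key.
    static class InMemoryMapState implements MapState<Long> {
        private final Map<List<Object>, Long> store = new HashMap<>();

        public List<Long> multiGet(List<List<Object>> keys) {
            List<Long> out = new ArrayList<>();
            for (List<Object> k : keys) out.add(store.get(k));
            return out;
        }

        public void multiPut(List<List<Object>> keys, List<Long> vals) {
            for (int i = 0; i < keys.size(); i++) store.put(keys.get(i), vals.get(i));
        }
    }

    // Imitates a grouped count update: cast the state to a map state, then read and write one key.
    @SuppressWarnings("unchecked")
    static String updateGroupedCount(State state, List<Object> groupKey) {
        try {
            MapState<Long> map = (MapState<Long>) state;
            List<List<Object>> keys = new ArrayList<>();
            keys.add(groupKey);
            Long current = map.multiGet(keys).get(0);
            List<Long> vals = new ArrayList<>();
            vals.add(current == null ? 1L : current + 1L);
            map.multiPut(keys, vals);
            return "ok:" + vals.get(0);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        List<Object> key = Arrays.<Object>asList("row1", "cf", "cq");
        System.out.println(updateGroupedCount(new PlainState(), key));       // prints "ClassCastException"
        System.out.println(updateGroupedCount(new InMemoryMapState(), key)); // prints "ok:1"
    }
}
```

The same reasoning suggests that swapping the state factory for one that produces a map state, as recommended above, removes the cast failure without changing the grouping or the aggregator.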



-- 
Name : 임 정택
Blog : http://www.heartsavior.net / http://dev.heartsavior.net
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Re: Storm Trident - persistentAggregate With HBase 0.98

Posted by Nilesh Chhapru <ni...@ugamsolutions.com>.
Hi ,

I have implemented the same "ptgoetz<https://github.com/ptgoetz>" code, and it is giving me the exception.

Have you or anyone else used persistentAggregate with Trident on the new version of HBase?

Regards,
Nilesh Chhapru.


Re: Storm Trident - persistentAggregate With HBase 0.98

Posted by 임정택 <ka...@gmail.com>.
Hi,

I didn't look into it, but it seems to use an old or unmanaged module.
The official external module for HBase is here: https://github.com/apache/storm/tree/master/external/storm-hbase.
It has been released with Storm 0.9.3 and onwards.

Please refer to its README.md in the GitHub repository, and to the link below to see all versions.
http://search.maven.org/#search%7Cgav%7C1%7Cg%3A%22org.apache.storm%22%20AND%20a%3A%22storm-hbase%22

Hope this helps.

Regards,
Jungtaek Lim (HeartSaVioR)

