Posted to dev@eagle.apache.org by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com> on 2015/12/29 03:26:55 UTC

[BUG]UnionForAlert test case fail

I found that TestStormRunner::UnionForAlert fails with a type incompatibility. I think that is because Hao has refactored the processing layer, but we should fix those unit test cases.


object UnionForAlert extends App{
  val config : Config = ConfigFactory.load;
  val env = StormExecutionEnvironment(config)
  val tail1 = env.fromSpout(TestSpout()).flatMap(WordPrependForAlertExecutor("test")).map2(a => ("key1",a))
  val tail2 = env.fromSpout(TestSpout()).flatMap(WordAppendForAlertExecutor("test")).map2(a => ("key2",a))
  tail1.streamUnion(List(tail2)).alert(Seq("s1","s2"), "alert1", consume = false)
  env.execute()
}

2015-12-28 18:19:17,940 ERROR [Thread-8-MapperProducer_9] storm.AbstractStreamBolt[98]: Got exception when processing source: WordPrependForAlertExecutor(test)_2:6, stream: default, id: {}, [key1, {word=test_abc}]
java.lang.ClassCastException: scala.Tuple2 cannot be cast to org.apache.eagle.datastream.Tuple2
at org.apache.eagle.datastream.UnionForAlert$$anonfun$1.apply(TestStormRunner.scala:34)
at org.apache.eagle.datastream.core.StreamAlertExpansion$$anonfun$1.apply(StreamAlertExpansion.scala:159)
at org.apache.eagle.datastream.storm.MapBoltWrapper.onValues(MapBoltWrapper.scala:56)
at org.apache.eagle.datastream.storm.AbstractStreamBolt.execute(AbstractStreamBolt.scala:93)
at backtype.storm.daemon.executor$fn__3441$tuple_action_fn__3443.invoke(executor.clj:633)
at backtype.storm.daemon.executor$mk_task_receiver$fn__3364.invoke(executor.clj:401)
at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58)
at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
at clojure.lang.AFn.run(AFn.java:24)
at java.lang.Thread.run(Thread.java:745)
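
For reference, the failure mode in the trace can be reproduced in isolation. The sketch below is only an illustration under assumptions: EagleTuple2 is a hypothetical stand-in for org.apache.eagle.datastream.Tuple2 (its real definition is not shown here), and castToEagle is a made-up helper that mimics a framework casting an untyped element to its own tuple type. Two classes that merely look alike are unrelated to the JVM, so the cast is rejected at runtime.

```scala
import scala.util.Try

// Hypothetical stand-in for org.apache.eagle.datastream.Tuple2.
// The real class is not reproduced here; only the shape matters.
final case class EagleTuple2[A, B](f0: A, f1: B)

object TupleCastSketch {
  // Mimics framework code that receives an untyped element and casts it
  // to the framework's own tuple type before calling the user function.
  def castToEagle(value: AnyRef): EagleTuple2[String, AnyRef] =
    value.asInstanceOf[EagleTuple2[String, AnyRef]]

  def main(args: Array[String]): Unit = {
    // What a lambda like `a => ("key1", a)` produces: a scala.Tuple2.
    val scalaTuple: AnyRef = ("key1", "test_abc")
    // What the cast expects: an instance of the framework's tuple class.
    val eagleTuple: AnyRef = EagleTuple2[String, AnyRef]("key1", "test_abc")

    // The first cast is expected to fail with ClassCastException,
    // the second one to succeed.
    println(Try(castToEagle(scalaTuple)))
    println(Try(castToEagle(eagleTuple)))
  }
}
```

If this matches what happens in the test, then either the test lambdas need to build the framework's tuple type, or the framework needs to accept scala.Tuple2 directly. Which of the two is intended is a design question for the DSL.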

thanks
Edward

Re: [BUG]UnionForAlert test case fail

Posted by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>.
Hao, is this because of your new code in the DSL? Why do we keep eagle.Tuple2
if we already use scala.Tuple2?

Thanks
Edward
