Posted to commits@samza.apache.org by "Jake Maes (JIRA)" <ji...@apache.org> on 2018/01/12 19:27:05 UTC

[jira] [Updated] (SAMZA-1199) ClassCastException in partitionBy if input type M does not equal msg.serde type

     [ https://issues.apache.org/jira/browse/SAMZA-1199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jake Maes updated SAMZA-1199:
-----------------------------
    Fix Version/s:     (was: 0.14.0)
                   0.15.0

> ClassCastException in partitionBy if input type M does not equal msg.serde type
> -------------------------------------------------------------------------------
>
>                 Key: SAMZA-1199
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1199
>             Project: Samza
>          Issue Type: Bug
>    Affects Versions: 0.13.0
>            Reporter: Prateek Maheshwari
>            Assignee: Prateek Maheshwari
>             Fix For: 0.15.0
>
>
> {code}
> 2017-04-09 19:05:53.269 [main] SamzaContainer [ERROR] Caught exception/error in process loop.
> org.apache.samza.SamzaException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.company.events.PageViewEvent
> 	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
> 	at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:667)
> 	at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:81)
> 	at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:125)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.linkedin.events.PageViewEvent
> 	at org.apache.samza.operators.spec.OperatorSpecs$1$1.<init>(OperatorSpecs.java:62)
> 	at org.apache.samza.operators.spec.OperatorSpecs$1.apply(OperatorSpecs.java:60)
> 	at org.apache.samza.operators.impl.StreamOperatorImpl.onNext(StreamOperatorImpl.java:47)
> 	at org.apache.samza.operators.impl.OperatorImpl.lambda$propagateResult$1(OperatorImpl.java:86)
> 	at java.lang.Iterable.forEach(Iterable.java:75)
> 	at org.apache.samza.operators.impl.OperatorImpl.propagateResult(OperatorImpl.java:86)
> 	at org.apache.samza.operators.impl.RootOperatorImpl.onNext(RootOperatorImpl.java:33)
> 	at org.apache.samza.task.StreamOperatorTask.process(StreamOperatorTask.java:122)
> 	at org.apache.samza.task.AsyncStreamTaskAdapter.process(AsyncStreamTaskAdapter.java:72)
> 	at org.apache.samza.task.AsyncStreamTaskAdapter.processAsync(AsyncStreamTaskAdapter.java:63)
> 	at org.apache.samza.container.TaskInstance$$anonfun$process$1.apply$mcV$sp(TaskInstance.scala:162)
> 	at org.apache.samza.container.TaskInstanceExceptionHandler.maybeHandle(TaskInstanceExceptionHandler.scala:54)
> 	at org.apache.samza.container.TaskInstance.process(TaskInstance.scala:160)
> 	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.process(AsyncRunLoop.java:428)
> 	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.run(AsyncRunLoop.java:373)
> 	at org.apache.samza.task.AsyncRunLoop$AsyncTaskWorker.access$300(AsyncRunLoop.java:314)
> 	at org.apache.samza.task.AsyncRunLoop.runTasks(AsyncRunLoop.java:228)
> 	at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:157)
> 	... 3 more
> {code}
> For example, code similar to the following will compile correctly but fail at runtime:
> {code}
>   public class TestStreamApp implements StreamApplication {
>     @Override
>     public void init(StreamGraph streamGraph, Config config) {
>       MessageStream<PageViewEvent> pageViewEvents =
>           streamGraph.getInputStream("PageViewEvent",
>             (k, m) -> AvroUtils.genericRecordToSpecificRecord(new PageViewEvent(), (GenericData.Record) m));
>
>       pageViewEvents
>           .partitionBy(this::getMemberId)
>           .map(this::setTrackingCode) // throws ClassCastException since incoming GenericData.Record cannot be cast to PageViewEvent
>           .sendTo(streamGraph.getOutputStream("OutputEvent", m -> String.valueOf(getMemberId(m)), m -> m));
>     }
>
>     private String getMemberId(PageViewEvent pve) {
>       return pve.memberId.toString();
>     }
>
>     private PageViewEvent setTrackingCode(PageViewEvent pve) {
>       pve.trackingCode = "1";
>       return pve;
>     }
>   }
> {code}
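
The reason such code compiles yet fails at runtime can be illustrated without Samza. The sketch below assumes the failure follows the usual type-erasure pattern: a message is handed back as the declared type M through an unchecked cast, and the mismatch only surfaces where a function typed against PageViewEvent receives the object. The classes GenericRecord and PageViewEvent and the method readBack are hypothetical stand-ins, not the real Avro, application, or Samza code.

```java
import java.util.function.Function;

class ErasureCastDemo {
    // Hypothetical stand-ins for the two runtime types in the stack trace;
    // these are not the real Avro or application classes.
    static class GenericRecord {
        final String memberId;
        GenericRecord(String memberId) { this.memberId = memberId; }
    }

    static class PageViewEvent {
        final String memberId;
        PageViewEvent(String memberId) { this.memberId = memberId; }
    }

    // Stands in for a stage that reads a message back after repartitioning.
    // The cast to M is unchecked: after erasure it performs no runtime check.
    @SuppressWarnings("unchecked")
    static <M> M readBack(Object deserialized) {
        return (M) deserialized;
    }

    // Applies a user function typed against PageViewEvent to whatever came back.
    static String memberIdOf(Object deserialized) {
        Function<PageViewEvent, String> keyFn = pve -> pve.memberId;
        // The compiler inserts the real cast at this assignment, so a type
        // mismatch surfaces here rather than inside readBack.
        PageViewEvent event = ErasureCastDemo.<PageViewEvent>readBack(deserialized);
        return keyFn.apply(event);
    }

    // Returns true if applying the function throws ClassCastException.
    static boolean failsWithClassCast(Object deserialized) {
        try {
            memberIdOf(deserialized);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A message that already has the declared type passes through.
        System.out.println(failsWithClassCast(new PageViewEvent("42")));
        // A message of the serde's type fails only when the function is applied.
        System.out.println(failsWithClassCast(new GenericRecord("42")));
    }
}
```

In this sketch the compiler accepts both calls because the generic type is erased. Only the second call throws, which is consistent with the "compiles correctly but fails at runtime" behavior described above.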


