Posted to dev@flink.apache.org by "Ryan Brideau (JIRA)" <ji...@apache.org> on 2017/12/13 20:15:00 UTC

[jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Ryan Brideau created FLINK-8256:
-----------------------------------

             Summary: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
                 Key: FLINK-8256
                 URL: https://issues.apache.org/jira/browse/FLINK-8256
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.4.0
         Environment: macOS, Local Flink v1.4.0, Scala 2.11
            Reporter: Ryan Brideau


I built the newest release locally today, but when I try to filter a stream using an anonymous or named function, I get an error. Here's a simple example:


{code:java}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._

object TestFunction {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val params = ParameterTool.fromArgs(args)
    env.getConfig.setGlobalJobParameters(params)

    val someArray = Array(1,2,3)
    val stream = env.fromCollection(someArray).filter(_ => true)
    stream.print().setParallelism(1)
    env.execute("Testing Function")
  }
}
{code}

This results in:


{code:java}
Job execution switched to status FAILING.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
        ... 6 more
12/13/2017 15:10:01     Job execution switched to status FAILED.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
        at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
        at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
        at org.peopleinmotion.TestFunction.main(TestFunction.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
        at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)

{code}

However, replacing the function with this results in everything working as expected:

{code:java}
import org.apache.flink.api.common.functions.FilterFunction

val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
  override def filter(t: Int): Boolean = true
})
{code}

Perhaps something changed in this build compared to the previous release, as this was working without issue before?
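For context on where the reported trace fails: the exception is thrown while Java-deserializing the closure field of the filter operator on the task manager. A minimal, Flink-free sketch of that serialize/deserialize round trip (a hypothetical helper, not Flink code; it succeeds here because both sides use the same classloader):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureRoundTrip {
  // Serialize and deserialize an object the same way Flink ships user
  // functions to task managers (plain java.io object streams).
  def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    // Compiles to an anonymous, serializable Function1 class,
    // like TestFunction$$anonfun$1 in the stack trace.
    val filterFun: Int => Boolean = _ => true
    val restored = roundTrip(filterFun)
    println(restored(42)) // same classloader on both sides, so the cast succeeds
  }
}
```

The ClassCastException in the report appears when readObject resolves the anonymous function class through a different classloader than the one that loaded scala.Function1, which is what the classloading discussion in the replies is about.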



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Stephan Ewen <se...@apache.org>.
@Shivam I think Flink 1.4 should not expose any ProtoBuf dependency any
more. We shade it in Mesos and use a ProtoBuf-free Akka version now.

On Thu, Dec 14, 2017 at 8:41 PM, Shivam Sharma <28...@gmail.com>
wrote:

> Hi Stephan,
>
> Thanks for your help. Reverting the classloading to parent-first *resolved
> this issue*. I have one question, though:
>
> I am building a fat jar without any dependencies marked as Provided. In my
> case I am using proto-java version 3.4.0, but I think Flink uses a much
> older version (2.5.0, I believe). When I submit my jar, which version will
> it pick, 2.5.0 or 3.4.0, under parent-first classloading?
>
> Thanks
>
> On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > @Shivam and @Ryan:
> >
> > My first feeling would be the following: You have the Scala library in
> > your user code, and thus through the reversed class loading, the Scala
> > function types get duplicated.
> >
> > The right way to fix that is to make sure you build a proper jar file
> > without any provided dependencies. Make sure you set "-Pbuild-jar" when
> > packaging your program.
> >
> >   - You could also set "classloader.resolve-order: parent-first" in your
> >     configuration to restore the old class loading style.
> >
> >   - We should add "scala" to the default value for
> >     "classloader.parent-first-patterns". You can add it yourself in the
> >     configuration (make sure you keep all existing parent-first-patterns
> >     as well).
> >
> >
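In flink-conf.yaml terms, the two suggestions quoted above would look roughly like this (a sketch; the base pattern list shown is an assumption and should be checked against the defaults shipped with Flink 1.4.0):

```yaml
# Option 1: restore the pre-1.4 class loading behavior entirely.
classloader.resolve-order: parent-first

# Option 2: keep child-first resolution, but force scala.* classes to load
# from the parent classloader. Keep the existing default patterns and
# append "scala." to them.
classloader.parent-first-patterns: "java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback"
```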
> >
> > On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <tr...@apache.org>
> > wrote:
> >
> > > Hi,
> > >
> > > I tried to reproduce the problem given your description, Ryan. I
> > > submitted the test job to a vanilla Flink 1.4.0 cluster (Hadoop-free
> > > version downloaded from flink.apache.org, Hadoop 2.7 version downloaded
> > > from flink.apache.org, and a cluster built from sources). However, I
> > > was not able to reproduce the problem. Therefore I suspect that it has
> > > something to do with your or my setup.
> > >
> > > In order to further diagnose the problem, it would be tremendously
> > > helpful if you could share the logs contained in the logs directory
> > > with us.
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <se...@apache.org>
> wrote:
> > >
> > > > Hi!
> > > >
> > > > This may be due to the changed classloading semantics.
> > > >
> > > > Just to verify this, can you check if it gets solved by setting the
> > > > following in the Flink configuration: "classloader.resolve-order:
> > > > parent-first"
> > > >
> > > > By default, Flink 1.4 now uses inverted classloading to allow users
> > > > to use their own copies of dependencies, irrespective of what the
> > > > underlying classpath is spoiled with. You can, for example, use a
> > > > different Avro version than the one Hadoop pulls in, even without
> > > > shading, or even different Akka / Jackson / etc. versions.
> > > >
> > > > That is a nice improvement, but it may have some impact on tools that
> > > > were built before. When you see class cast exceptions (like "X cannot
> > > > be cast to X"), that is probably caused by the fact that the
> > > > classloader duplicates a dependency from the JVM classpath in user
> > > > space, and objects/classes move between the domains.
> > > >
> > > > Stephan
> > > >
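The duplicated-class failure described in the quoted message can be sketched without Flink: load the same class file through a second, non-delegating classloader and the two resulting Class objects share a name but are mutually unassignable. (ChildFirstLoader and Duplicated are hypothetical names for this illustration; it assumes Duplicated.class is discoverable as a classpath resource.)

```scala
import java.io.ByteArrayOutputStream

class Duplicated // stand-in for a user class such as the anonymous filter function

// A loader that defines "Duplicated" itself instead of delegating to its
// parent, mimicking child-first classloading duplicating a user class.
class ChildFirstLoader(parent: ClassLoader) extends ClassLoader(parent) {
  override def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name == "Duplicated") {
      val in = parent.getResourceAsStream("Duplicated.class")
      require(in != null, "Duplicated.class not found on the classpath")
      val buf = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var n = in.read(chunk)
      while (n != -1) { buf.write(chunk, 0, n); n = in.read(chunk) }
      in.close()
      val bytes = buf.toByteArray
      defineClass(name, bytes, 0, bytes.length)
    } else super.loadClass(name, resolve)
}

object TwoLoaders {
  def main(args: Array[String]): Unit = {
    val childFirst = new ChildFirstLoader(classOf[Duplicated].getClassLoader)
    val dupClass = childFirst.loadClass("Duplicated")
    // Same name, different Class objects: instances of one cannot be
    // assigned to fields typed with the other, hence "X cannot be cast to X".
    println(dupClass.getName == classOf[Duplicated].getName)
    println(dupClass == classOf[Duplicated])
    println(classOf[Duplicated].isAssignableFrom(dupClass))
  }
}
```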
> > > > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <
> > 28shivamsharma@gmail.com
> > > >
> > > > wrote:
> > > >
> > > > > I am facing the same issue:
> > > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > > > >
> > > > > Can anyone explain the exception?
> > > > >
> > > > > Thanks
> > > > >
> > > > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA)
> > > > > <jira@apache.org> wrote:
> > > > >
> > > > > > [original issue description and stack trace quoted in full above]

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Shivam Sharma <28...@gmail.com>.
Hi Stephan,

Thanks for your help. Reverting the classloading to parent-first *resolved
this issue*. I have one question, though:

I am building a fat jar without any dependencies marked as Provided. In my
case I am using proto-java version 3.4.0, but I think Flink uses a much
older version (2.5.0, I believe). When I submit my jar, which version will
it pick, 2.5.0 or 3.4.0, under parent-first classloading?

Thanks

On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <se...@apache.org> wrote:

> [earlier messages in this thread quoted in full above]
> > > > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6
> > of
> > > > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > > > api.scala.DataStream$$anon$7
> > > > >         at java.io.ObjectStreamClass$FieldReflector.
> > setObjFieldValues(
> > > > > ObjectStreamClass.java:2233)
> > > > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > > > ObjectStreamClass.java:1405)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > > ObjectInputStream.java:2288)
> > > > >         at java.io.ObjectInputStream.readSerialData(
> > > > > ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > > ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.
> readObject0(ObjectInputStream.
> > > > > java:1568)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > > ObjectInputStream.java:2282)
> > > > >         at java.io.ObjectInputStream.readSerialData(
> > > > > ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > > ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.
> readObject0(ObjectInputStream.
> > > > > java:1568)
> > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > > > java:428)
> > > > >         at org.apache.flink.util.InstantiationUtil.
> > deserializeObject(
> > > > > InstantiationUtil.java:290)
> > > > >         at org.apache.flink.util.InstantiationUtil.
> > > readObjectFromConfig(
> > > > > InstantiationUtil.java:248)
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > > getStreamOperator(StreamConfig.java:220)
> > > > >         ... 6 more
> > > > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > > > >
> > > > > ------------------------------------------------------------
> > > > >  The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: The
> > > program
> > > > > execution failed: Job execution failed.
> > > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > > ClusterClient.java:492)
> > > > >         at org.apache.flink.client.program.
> StandaloneClusterClient.
> > > > > submitJob(StandaloneClusterClient.java:105)
> > > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > > ClusterClient.java:456)
> > > > >         at org.apache.flink.streaming.api.environment.
> > > > > StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > > > >         at org.apache.flink.streaming.api.scala.
> > > > > StreamExecutionEnvironment.execute(StreamExecutionEnvironment.
> > > scala:638)
> > > > >         at org.peopleinmotion.TestFunction$.main(
> > > TestFunction.scala:20)
> > > > >         at org.peopleinmotion.TestFunction.main(
> TestFunction.scala)
> > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> > Method)
> > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(
> > > > > NativeMethodAccessorImpl.java:62)
> > > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > > > DelegatingMethodAccessorImpl.java:43)
> > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > >         at org.apache.flink.client.program.PackagedProgram.
> > > callMainMeth
> > > > od(
> > > > > PackagedProgram.java:525)
> > > > >         at org.apache.flink.client.program.PackagedProgram.
> > > > > invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > > > >         at org.apache.flink.client.program.ClusterClient.run(
> > > > > ClusterClient.java:396)
> > > > >         at org.apache.flink.client.CliFrontend.executeProgram(
> > > > > CliFrontend.java:802)
> > > > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.
> > > java:282)
> > > > >         at org.apache.flink.client.CliFrontend.parseParameters(
> > > > > CliFrontend.java:1054)
> > > > >         at org.apache.flink.client.CliFrontend$1.call(
> > > > > CliFrontend.java:1101)
> > > > >         at org.apache.flink.client.CliFrontend$1.call(
> > > > > CliFrontend.java:1098)
> > > > >         at java.security.AccessController.doPrivileged(Native
> > Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(
> > > > > UserGroupInformation.java:1556)
> > > > >         at org.apache.flink.runtime.security.
> HadoopSecurityContext.
> > > > > runSecured(HadoopSecurityContext.java:41)
> > > > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.
> > java:
> > > > 1098)
> > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException:
> > Job
> > > > > execution failed.
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$
> > > > > mcV$sp(JobManager.scala:897)
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > > > ger.scala:840)
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > > > ger.scala:840)
> > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> > > > > liftedTree1$1(Future.scala:24)
> > > > >         at scala.concurrent.impl.Future$
> > PromiseCompletingRunnable.run(
> > > > > Future.scala:24)
> > > > >         at akka.dispatch.TaskInvocation.
> > run(AbstractDispatcher.scala:
> > > 39)
> > > > >         at akka.dispatch.ForkJoinExecutorConfigurator$
> > > > > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > > > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> > > > > ForkJoinTask.java:260)
> > > > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > > > > runTask(ForkJoinPool.java:1339)
> > > > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > > > > ForkJoinPool.java:1979)
> > > > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > > > > ForkJoinWorkerThread.java:107)
> > > > > Caused by: org.apache.flink.streaming.runtime.tasks.
> > > StreamTaskException:
> > > > > Cannot instantiate user function.
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > > getStreamOperator(StreamConfig.java:235)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > > createChainedOperator(OperatorChain.java:355)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > > > createOutputCollector(OperatorChain.java:282)
> > > > >         at org.apache.flink.streaming.
> runtime.tasks.OperatorChain.<
> > > > > init>(OperatorChain.java:126)
> > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > > > > invoke(StreamTask.java:231)
> > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.
> > > java:718)
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.ClassCastException: cannot assign instance of
> > > > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6
> > of
> > > > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > > > api.scala.DataStream$$anon$7
> > > > >         at java.io.ObjectStreamClass$FieldReflector.
> > setObjFieldValues(
> > > > > ObjectStreamClass.java:2233)
> > > > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > > > ObjectStreamClass.java:1405)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > > ObjectInputStream.java:2288)
> > > > >         at java.io.ObjectInputStream.readSerialData(
> > > > > ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > > ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.
> readObject0(ObjectInputStream.
> > > > > java:1568)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(
> > > > > ObjectInputStream.java:2282)
> > > > >         at java.io.ObjectInputStream.readSerialData(
> > > > > ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > > > ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.
> readObject0(ObjectInputStream.
> > > > > java:1568)
> > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > > > java:428)
> > > > >         at org.apache.flink.util.InstantiationUtil.
> > deserializeObject(
> > > > > InstantiationUtil.java:290)
> > > > >         at org.apache.flink.util.InstantiationUtil.
> > > readObjectFromConfig(
> > > > > InstantiationUtil.java:248)
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > > > getStreamOperator(StreamConfig.java:220)
> > > > >
> > > > > {code}
> > > > >
> > > > > However, replacing the function with this results in everything
> > working
> > > > as
> > > > > expected:
> > > > >
> > > > > {code:java}
> > > > > val stream = env.fromCollection(someArray).filter(new
> > > > FilterFunction[Int]
> > > > > {
> > > > >       override def filter(t: Int): Boolean = true
> > > > >     })
> > > > > {code}
> > > > >
> > > > > Perhaps something changed in the new build compared to the
> previous,
> > as
> > > > > this was working without issue before?
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > This message was sent by Atlassian JIRA
> > > > > (v6.4.14#64029)
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > Shivam Sharma
> > > > Data Engineer @ Goibibo
> > > > Indian Institute Of Information Technology, Design and Manufacturing
> > > > Jabalpur
> > > > Mobile No- (+91) 8882114744
> > > > Email:- 28shivamsharma@gmail.com
> > > > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> > > > <https://www.linkedin.com/in/28shivamsharma>*
> > > >
> > >
> >
>



-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email:- 28shivamsharma@gmail.com
LinkedIn:- https://www.linkedin.com/in/28shivamsharma

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Stephan Ewen <se...@apache.org>.
@Shivam and @Ryan:

My first guess is the following: you have the Scala library bundled in
your user code jar, and through the reversed (child-first) class loading
the Scala function types get duplicated (loaded once from the parent
classpath and once from your jar).

The right way to fix that is to make sure you build a proper jar file
without any provided dependencies. Make sure you set "-Pbuild-jar" when
packaging your program.
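The "X cannot be cast to X" symptom can be reproduced in isolation. Below is a minimal, self-contained Java sketch (illustrative only; the class names are made up and this is not Flink's actual child-first classloader) that loads the same class through two loaders and shows that instances of one copy are not instances of the other:

```java
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class ClassLoaderDemo {

    // Any plain class works as the "user code" type that gets duplicated.
    public static class Payload {}

    // A loader that defines Payload itself instead of delegating to its
    // parent, mimicking child-first (inverted) class loading.
    static class ChildFirstLoader extends ClassLoader {
        ChildFirstLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(Payload.class.getName())) {
                try (InputStream in = getParent()
                        .getResourceAsStream(name.replace('.', '/') + ".class")) {
                    // Read the class bytes from the parent's classpath,
                    // then define a second copy of the class in this loader.
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    byte[] buf = new byte[4096];
                    int n;
                    while ((n = in.read(buf)) > 0) {
                        out.write(buf, 0, n);
                    }
                    byte[] bytes = out.toByteArray();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader appLoader = ClassLoaderDemo.class.getClassLoader();
        Class<?> duplicate = new ChildFirstLoader(appLoader).loadClass(Payload.class.getName());
        Object obj = duplicate.getDeclaredConstructor().newInstance();

        // Same fully qualified name, but a different Class object:
        System.out.println(duplicate.getName().equals(Payload.class.getName())); // true
        System.out.println(duplicate == Payload.class);                          // false
        System.out.println(obj instanceof Payload);                              // false
    }
}
```

Casting `obj` to `Payload` here would throw exactly the kind of ClassCastException quoted above, with the same class name on both sides, which is why bundling the Scala library in the user jar can break deserialization of the cleaned closure.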

  - You could also set "classloader.resolve-order: parent-first" in your
configuration to restore the old class loading style.

  - We should add "scala" to the default value for
"classloader.parent-first-patterns". You can add it yourself in the
configuration (make sure you keep all existing parent-first-patterns as
well).
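In flink-conf.yaml, the two options above would look roughly like this (a sketch; the parent-first pattern list below is illustrative, so keep whatever defaults your Flink version ships with and only append to them):

```yaml
# Option 1: restore the pre-1.4 (parent-first) class loading order.
classloader.resolve-order: parent-first

# Option 2: keep child-first loading, but always resolve Scala classes
# from the parent classloader. Preserve the existing default patterns
# and append "scala."; this list is only an example.
classloader.parent-first-patterns: "java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j"
```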



On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <tr...@apache.org>
wrote:

> Hi,
>
> I tried to reproduce the problem given your description Ryan. I submitted
> the test job to a vanilla Flink 1.4.0 cluster (Hadoop-free version
> downloaded from flink.apache.org, Hadoop 2.7 version donwloaded from
> flink.apache.org and a cluster built from sources). However, I was not
> able
> to reproduce the problem. Therefore I suspect that it has something to do
> with your or my setup.
>
> In order to further diagnose the problem, it would be tremendously helpful
> if you could share the logs contained in the logs directory with us.
>
> Cheers,
> Till
>

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Till Rohrmann <tr...@apache.org>.
Hi,

I tried to reproduce the problem given your description, Ryan. I submitted
the test job to a vanilla Flink 1.4.0 cluster (the Hadoop-free version
downloaded from flink.apache.org, the Hadoop 2.7 version downloaded from
flink.apache.org, and a cluster built from source). However, I was not
able to reproduce the problem. I therefore suspect that it has something
to do with your setup or mine.

In order to further diagnose the problem, it would be tremendously helpful
if you could share the logs contained in the logs directory with us.

Cheers,
Till

On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> This may be due to the changed classloading semantics.
>
> Just to verify this, can you check if it gets solved by setting the
> following in the Flink configuration: "classloader.resolve-order: parent-
> first"
>
> By default, Flink 1.4 now uses inverted class loading to allow users to
> use their own copies of dependencies, irrespective of what the underlying
> classpath is polluted with. You can, for example, use different Avro
> versions than the ones Hadoop pulls in, even without shading, or
> different Akka / Jackson / etc. versions.
>
> That is a nice improvement, but it may have some impact on tools that
> were built before. When you see class cast exceptions (like "X cannot
> be cast to X"), that is probably caused by the classloader duplicating
> a dependency from the JVM classpath in user space, while objects/classes
> move between the two domains.
>
> Stephan
>
> On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28...@gmail.com>
> wrote:
>
> > I am facing the same issue:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> >
> > Can anyone explain the exception?
> >
> > Thanks
> >
> > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <ji...@apache.org>
> > wrote:
> >
> > > Ryan Brideau created FLINK-8256:
> > > -----------------------------------
> > >
> > >              Summary: Cannot use Scala functions to filter in 1.4 -
> > > java.lang.ClassCastException
> > >                  Key: FLINK-8256
> > >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> > >              Project: Flink
> > >           Issue Type: Bug
> > >           Components: DataStream API
> > >     Affects Versions: 1.4.0
> > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > >             Reporter: Ryan Brideau
> > >
> > >
> > > I built the newest release locally today, but when I try to filter a
> > > stream using an anonymous or named function, I get an error. Here's a
> > > simple example:
> > >
> > >
> > > {code:java}
> > > import org.apache.flink.api.java.utils.ParameterTool
> > > import org.apache.flink.streaming.api.scala._
> > >
> > > object TestFunction {
> > >
> > >   def main(args: Array[String]): Unit = {
> > >
> > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > >     val params = ParameterTool.fromArgs(args)
> > >     env.getConfig.setGlobalJobParameters(params)
> > >
> > >     val someArray = Array(1,2,3)
> > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > >     stream.print().setParallelism(1)
> > >     env.execute("Testing Function")
> > >   }
> > > }
> > > {code}
> > >
> > > This results in:
> > >
> > >
> > > {code:java}
> > > Job execution switched to status FAILING.
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > >         ... 6 more
> > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > >
> > > ------------------------------------------------------------
> > >  The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> > >         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> > >         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > >         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> > >         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> > >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> > >         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> > >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> > >         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> > >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > >
> > > {code}
> > >
> > > However, replacing the function with this results in everything working as
> > > expected:
> > >
> > > {code:java}
> > > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> > >   override def filter(t: Int): Boolean = true
> > > })
> > > {code}
> > >
> > > Perhaps something changed in the new build compared to the previous, as
> > > this was working without issue before?
> > >
> > >
> > >
> > > --
> > > This message was sent by Atlassian JIRA
> > > (v6.4.14#64029)
> > >
> >
> >
> >
> > --
> > Shivam Sharma
> > Data Engineer @ Goibibo
> > Indian Institute Of Information Technology, Design and Manufacturing
> > Jabalpur
> > Mobile No- (+91) 8882114744
> > Email:- 28shivamsharma@gmail.com
> > LinkedIn:-*https://www.linkedin.com/in/28shivamsharma
> > <https://www.linkedin.com/in/28shivamsharma>*
> >
>

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Stephan Ewen <se...@apache.org>.
Hi!

This may be due to the changed class loading semantics.

To verify this, can you check whether the problem goes away after setting the
following in the Flink configuration: "classloader.resolve-order: parent-first"?
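
For reference, this is a one-line change in the Flink configuration file (a sketch; the file is conf/flink-conf.yaml in a standard distribution):

```yaml
# conf/flink-conf.yaml
# Revert to the pre-1.4 behavior: resolve classes against the parent
# (framework) classpath before looking into the user-code jar.
classloader.resolve-order: parent-first
```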

By default, Flink 1.4 now uses inverted (child-first) class loading to let users
run their own copies of dependencies, irrespective of what the underlying
classpath is polluted with. For example, you can use a different Avro version
than the one Hadoop pulls in, even without shading, or different Akka /
Jackson / etc. versions.

That is a nice improvement, but it can affect programs built before this
change. When you see ClassCastExceptions of the form "X cannot be cast to X",
the likely cause is that the user-code class loader duplicates a dependency
that is already on the JVM classpath, while objects/classes move between the
two domains.
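
This "X cannot be cast to X" effect can be reproduced outside Flink. The sketch below (hypothetical names, plain JDK APIs only, Java 9+) defines the same class file through a second, child-first class loader, which is essentially what happens when a user-code class loader duplicates a dependency the framework also loaded:

```java
import java.io.IOException;
import java.io.InputStream;

public class ChildFirstDemo {

    /** The class we will load twice. */
    public static class Marker { }

    /** A loader that defines Marker itself instead of delegating to its parent. */
    public static class ChildFirstLoader extends ClassLoader {
        public ChildFirstLoader(ClassLoader parent) {
            super(parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(Marker.class.getName())) {
                return super.loadClass(name, resolve); // everything else stays parent-first
            }
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                String path = name.replace('.', '/') + ".class";
                try (InputStream in = getParent().getResourceAsStream(path)) {
                    if (in == null) {
                        throw new ClassNotFoundException(name);
                    }
                    byte[] bytes = in.readAllBytes();
                    c = defineClass(name, bytes, 0, bytes.length); // duplicate definition
                } catch (IOException e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader app = ChildFirstDemo.class.getClassLoader();
        Class<?> childCopy = new ChildFirstLoader(app).loadClass(Marker.class.getName());
        Object instance = childCopy.getDeclaredConstructor().newInstance();

        System.out.println(childCopy.getName().equals(Marker.class.getName())); // true: same name
        System.out.println(childCopy == Marker.class);   // false: two distinct classes
        System.out.println(instance instanceof Marker);  // false: a cast would throw ClassCastException
    }
}
```

The object was created from the child loader's copy of the class, so assigning it to a field typed with the parent loader's copy fails, even though both classes have the same name, which is exactly the shape of the exception in the report above.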

Stephan

On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28...@gmail.com>
wrote:

> Same Issue I am facing :-
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
>
> Can anyone explain the exception
>
> Thanks
>

Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException

Posted by Shivam Sharma <28...@gmail.com>.
I am facing the same issue:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html

Can anyone explain this exception?

Thanks


-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No- (+91) 8882114744
Email: 28shivamsharma@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma