Posted to issues@flink.apache.org by "Erik van Oosten (JIRA)" <ji...@apache.org> on 2017/06/22 04:24:00 UTC

[jira] [Commented] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

    [ https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16058706#comment-16058706 ] 

Erik van Oosten commented on FLINK-5633:
----------------------------------------

If you need throughput (as we do), the constructor caching is indispensable. In that case you can use the following {{SpecificData}} implementation. Instantiate it once and pass that singleton instance to every {{SpecificDatumReader}}; a usage sketch follows the code.

{code:scala|title=LocalCachingSpecificData.scala}
import java.lang.reflect.Constructor
import java.util.concurrent.ConcurrentHashMap

import org.apache.avro.Schema
import org.apache.avro.specific.SpecificData
import scala.collection.JavaConverters._

/**
  * This can be used instead of [[SpecificData]] in multi-classloader environments like Flink.
  * This variation removes the JVM singleton constructor cache and replaces it with a
  * cache that is local to the current class loader.
  *
  * If two Flink jobs use the same generated Avro code, they will still have separate instances of the classes because
  * they live in separate class loaders.
  * However, a JVM-wide singleton cache keeps a reference to the class from the first class loader that loaded it.
  * Any subsequent job will fail with a [[ClassCastException]] because it gets an incompatible class.
  */
class LocalCachingSpecificData extends SpecificData {
  private val NO_ARG: Array[Class[_]] = Array.empty
  private val SCHEMA_ARG: Array[Class[_]] = Array(classOf[Schema])
  private val CTOR_CACHE: scala.collection.concurrent.Map[Class[_], Constructor[_]] =
    new ConcurrentHashMap[Class[_], Constructor[_]]().asScala

  /** Create an instance of a class.
    * If the class implements [[org.apache.avro.specific.SpecificData.SchemaConstructable]], call a constructor with a
    * [[org.apache.avro.Schema]] parameter, otherwise use a no-arg constructor.
    */
  private def newInstance(c: Class[_], s: Schema): AnyRef = {
    val useSchema = classOf[SpecificData.SchemaConstructable].isAssignableFrom(c)
    val constructor = CTOR_CACHE.getOrElseUpdate(c, {
      val ctor = c.getDeclaredConstructor((if (useSchema) SCHEMA_ARG else NO_ARG): _*)
      ctor.setAccessible(true)
      ctor
    })
    if (useSchema) constructor.newInstance(s).asInstanceOf[AnyRef]
    else constructor.newInstance().asInstanceOf[AnyRef]
  }

  override def createFixed(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.createFixed(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }

  override def newRecord(old: AnyRef, schema: Schema): AnyRef = {
    val c = getClass(schema)
    if (c == null) super.newRecord(old, schema) // delegate to generic
    else if (c.isInstance(old)) old
    else newInstance(c, schema)
  }
}
{code}
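For illustration, wiring it up could look like the sketch below. It assumes Avro's {{SpecificDatumReader(Schema, Schema, SpecificData)}} constructor; the {{AvroReaders}} object and the {{newReader}} helper are made-up names.

{code:scala|title=Usage sketch}
import org.apache.avro.Schema
import org.apache.avro.specific.SpecificDatumReader

object AvroReaders {
  // A Scala object is a per-classloader singleton, which is exactly the
  // scope we want in Flink: one shared instance per job classloader.
  val specificData = new LocalCachingSpecificData

  // Pass the shared instance to every reader instead of relying on the
  // JVM-wide SpecificData.INSTANCE.
  def newReader[T](writerSchema: Schema, readerSchema: Schema): SpecificDatumReader[T] =
    new SpecificDatumReader[T](writerSchema, readerSchema, specificData)
}
{code}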

> ClassCastException: X cannot be cast to X when re-submitting a job.
> -------------------------------------------------------------------
>
>                 Key: FLINK-5633
>                 URL: https://issues.apache.org/jira/browse/FLINK-5633
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, YARN
>    Affects Versions: 1.1.4
>            Reporter: Giuliano Caliari
>            Priority: Minor
>
> I’m running a job on my local cluster. The first time I submit the job everything works, but whenever I cancel and re-submit the same job it fails with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
> 	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
> 	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> 	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
> 	at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
> 	at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
> 	at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> 	at scala.App$class.main(App.scala:76)
> 	at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
> 	at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
> 	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
> 	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
> 	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next operator
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
> 	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
> 	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
> 	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
> 	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> 	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
> 	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
> 	at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
> 	at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
> 	at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
> 	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
> 	... 14 more
> {quote}
> This happens on versions 1.1.4 and 1.2.
> Here's a great description of the problem, provided by Yury Ruchin:
> {quote}
> In a YARN setup there are several sources where classes are loaded from: the Flink lib directory, YARN lib directories, and user code. The first two sources are handled by the system classloader; the last one is loaded by FlinkUserCodeClassLoader.
> My streaming job parses Avro-encoded data using the SpecificRecord facility. In essence, the job looks like this: Source -> Avro parser (Map) -> Sink. Parallelism is 1. The job operates inside a long-lived YARN session. I have a subclass of SpecificRecord, say its name is MySpecificRecord. From a class loading perspective, Avro library classes, including SpecificRecord, are loaded by the system class loader from the YARN lib dir - such classes are shared across different Flink tasks within the task manager. On the other hand, MySpecificRecord is in the job fat jar, so it gets loaded by FlinkUserCodeClassLoader. Upon every job restart, the task gets a new FlinkUserCodeClassLoader instance, so classes from user code are confined to a task instance.
> Simply put, the parsing itself looks like this:
> val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)
> Now, the scenario:
> 1. I start my job. Parsing is initiated, so SpecificDatumReader and SpecificData get loaded by the system classloader. A new FlinkUserCodeClassLoader is instantiated; let's denote its instance as "A". MySpecificRecord then gets loaded by A.
> 2. SpecificData has a JVM-wide singleton, SpecificData.INSTANCE, that holds a cache mapping a string key derived from the Avro schema to the implementing class. So during parsing, MySpecificRecord (A) gets cached there.
> 3. I stop the job and re-submit it. The JVM process is the same, so all standard Avro classes, including SpecificData, remain loaded. A new task instance is created and gets a new FlinkUserCodeClassLoader instance; let's name it "B". A new incarnation of the MySpecificRecord class is loaded by B. From the JVM's standpoint, MySpecificRecord (B) is different from MySpecificRecord (A), even though their bytecode is identical.
> 4. The job starts parsing again. SpecificDatumReader consults SpecificData.INSTANCE's cache for any stashed classes and finds MySpecificRecord (A) there.
> 5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a bean for filling the parsed data in.
> 6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back to the job.
> 7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
> 8. ClassCastException :^(
> I fixed the issue by not using the SpecificData.INSTANCE singleton (even though this is considered a common and expected practice). I feed every parser a new instance of SpecificData. This way the class cache is confined to a parser instance and gets recycled along with it.
> {quote}
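> For reference, the per-parser fix described above could look like the sketch below. It assumes Avro's {{SpecificData(ClassLoader)}} and {{SpecificDatumReader(Schema, Schema, SpecificData)}} constructors; {{MySpecificRecord}} is the generated class from the scenario.
> {code:scala|title=Per-parser SpecificData sketch}
> val schema = MySpecificRecord.getClassSchema
> // A fresh SpecificData per parser: its class cache lives and dies with
> // the parser instead of leaking across FlinkUserCodeClassLoader instances.
> val reader = new SpecificDatumReader[MySpecificRecord](
>   schema,                                    // writer schema
>   schema,                                    // reader schema
>   new SpecificData(getClass.getClassLoader)  // non-shared cache
> )
> {code}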
> A discussion of the error can be found at:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)