Posted to user@flink.apache.org by "ani.desh1512" <an...@gmail.com> on 2017/01/25 16:10:02 UTC

Issues while restarting a job on HA cluster

We have an HA cluster of 2 masters and 3 slaves. We run a jar through the
Flink CLI, then cancel the running job. We then make some changes to the
jar's source code, repackage it, redeploy it, and run it again through the
CLI. The following error occurs:
               

java.lang.LinkageError: loader constraint violation: when resolving method
"com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(JJLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/ByteBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I"
the class loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader) of
the current class, com/mapr/fs/Inode, and the class loader (instance of
sun/misc/Launcher$ExtClassLoader) for the method's defining class,
com/mapr/fs/jni/MapRTableTools, have different Class objects for the type
com/mapr/fs/jni/MapRUpdateAndGet used in the signature
        at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
        at com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
        at com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTableImpl.java:1736)
        at com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
        at com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
        at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)

01/24/2017 19:15:50     Job execution switched to status FAILING.
java.lang.LinkageError: loader constraint violation: when resolving method
"com.mapr.fs.jni.MapRTableTools.CheckAndReplaceOrDeleteRPC(JJLjava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;[I[BLjava/nio/ByteBuffer;ZZLcom/mapr/fs/jni/MapRUpdateAndGet;)I"
the class loader (instance of
org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoader) of
the current class, com/mapr/fs/Inode, and the class loader (instance of
sun/misc/Launcher$ExtClassLoader) for the method's defining class,
com/mapr/fs/jni/MapRTableTools, have different Class objects for the type
com/mapr/fs/jni/MapRUpdateAndGet used in the signature
        at com.mapr.fs.Inode.checkAndReplaceOrDelete(Inode.java:1777)
        at com.mapr.fs.MapRHTable.checkAndReplaceOrDelete(MapRHTable.java:799)
        at com.mapr.db.impl.MapRDBTableImpl._checkAndReplace(MapRDBTableImpl.java:1736)
        at com.mapr.db.impl.MapRDBTableImpl._insert(MapRDBTableImpl.java:1366)
        at com.mapr.db.impl.MapRDBTableImpl.insert(MapRDBTableImpl.java:1332)
        at com.kabbage.utils.OJAIUtils.insert(OJAIUtils.java:93)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:31)
        at com.kabbage.flink.MaprdbSink.invoke(MaprdbSink.java:10)
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:39)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:373)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:358)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:346)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:329)
        at org.apache.flink.streaming.api.operators.StreamSource$NonTimestampContext.collect(StreamSource.java:161)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)

This error disappears when I restart the cluster.
The jar reads from mapr-streams and writes the messages into a maprdb sink.
Could this error be caused by temp files that are not cleared when I cancel
the job?
I checked this thread:
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Diff-between-stop-and-cancel-job-td6697.html>
It says that stop is a more graceful way of stopping jobs, but I guess it is
not yet supported for the Kafka source. I get the following error when I try
to stop my job:

java.lang.Exception: Stopping the job with ID 3bf393c79dc5597c1053fc934a0cfc44 failed.
        at org.apache.flink.client.CliFrontend.stop(CliFrontend.java:525)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1014)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1048)
Caused by: java.lang.IllegalStateException: Job with ID 3bf393c79dc5597c1053fc934a0cfc44 is not stoppable.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:580)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Hence, I had to resort to cancelling the job.
Will I always need to restart my Flink cluster to resolve this error, or am
I missing some vital configuration?
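
One way to narrow down an error like the one above is to check which class
loader actually defines a given class and where it was loaded from. The
following is a minimal Java sketch, not part of Flink or MapR; the class name
ClassLoaderProbe and its describe method are hypothetical names chosen for
illustration.

```java
import java.security.CodeSource;

class ClassLoaderProbe {

    /** Returns the code source and the class loader chain of the given class. */
    static String describe(Class<?> type) {
        StringBuilder sb = new StringBuilder(type.getName()).append('\n');

        // JDK runtime classes usually have no code source; jars and directories do.
        CodeSource source = type.getProtectionDomain().getCodeSource();
        sb.append("  loaded from: ")
          .append(source == null ? "(no code source)" : String.valueOf(source.getLocation()))
          .append('\n');

        // Walk from the defining loader up through its parents.
        for (ClassLoader cl = type.getClassLoader(); cl != null; cl = cl.getParent()) {
            sb.append("  loader: ").append(cl.getClass().getName()).append('\n');
        }
        // A null loader stands for the bootstrap loader at the top of the chain.
        sb.append("  loader: bootstrap\n");
        return sb.toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(describe(Class.forName(name)));
    }
}
```

Calling describe from inside the job (for example from the sink) with a class
named in the error message, such as com.mapr.fs.jni.MapRUpdateAndGet, would
show which loader supplied it in that context, assuming the class is visible
there.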



--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issues-while-restarting-a-job-on-HA-cluster-tp11294.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.

Re: Issues while restarting a job on HA cluster

Posted by "ani.desh1512" <an...@gmail.com>.
Hi Robert,
Thanks for the answer. 
My code does actually contain both mapr streams and maprdb jars. Here are
the steps I followed based on your suggestion:
1. I copied only the mapr-streams-*.jar and maprdb*.jar into lib.
2. Then I tried to run my jar, but I got a java.lang.NoClassDefFoundError
for some maprfs class.
3. I added maprfs*.jar to lib and tried submitting my jar again.
4. This time I got a java.lang.NoClassDefFoundError for some hadoopfs class.
5. At this point I just created a symlink in the lib folder pointing to the
mapr lib folder, which means that ALL the mapr-related jars are deployed
into the system classloader.
6. This last step did the trick and I was able to get my job running.
Also, I have not yet encountered the error I mentioned earlier after
cancelling and resubmitting the job.
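
When chasing NoClassDefFoundError one jar at a time, as in steps 2 to 4, it
can help to ask which jars in a folder actually contain the missing class.
The sketch below is one hedged way to do that in plain Java; JarClassFinder
and jarsContaining are hypothetical names, and the folder and class name are
whatever the error message and your installation provide.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarFile;

class JarClassFinder {

    /** Returns the jars directly inside libDir that contain the given class. */
    static List<Path> jarsContaining(Path libDir, String className) {
        // com.example.Foo is stored in a jar as com/example/Foo.class
        String entry = className.replace('.', '/') + ".class";
        List<Path> hits = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "*.jar")) {
            for (Path jar : jars) {
                try (JarFile jarFile = new JarFile(jar.toFile())) {
                    if (jarFile.getJarEntry(entry) != null) {
                        hits.add(jar);
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return hits;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: java JarClassFinder <jar-folder> <fully.qualified.ClassName>");
            return;
        }
        for (Path jar : jarsContaining(Paths.get(args[0]), args[1])) {
            System.out.println(jar);
        }
    }
}
```

If more than one jar is printed for the same class, that is also worth
noting, since duplicate copies of a class are one way to end up with class
loader conflicts.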

My only question is: is this the expected behavior and the normal solution?
Do we really need to add ALL the jars? I could cherry-pick which jars to
copy by using the dependency tree, but doing that for every job feels
cumbersome.
  




Re: Issues while restarting a job on HA cluster

Posted by Robert Metzger <rm...@apache.org>.
Hi Ani,

This error is independent of cancel vs. stop. It's an issue of loading the
MapR classes from the classloaders.

Do your user jars contain any MapR code (either mapr streams or maprdb)?

If so, I would recommend putting these MapR libraries into the "lib/"
folder of Flink. They'll then be deployed into the system classloader of
the Flink JVMs.

Regards,
Robert
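
The general JVM behavior behind "different Class objects for the type" can be
reproduced outside Flink. The sketch below is an illustration only and does
not use Flink's or MapR's code: it compiles a trivial, hypothetical class
named Payload into a temp folder (this needs a JDK, not just a JRE), then
loads it once through two unrelated loaders and once through two loaders that
share a parent owning the class. DuplicateClassDemo and its method names are
made up for this example.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

class DuplicateClassDemo {

    /** Compiles a trivial class named Payload into a fresh temp folder. */
    static URL[] compilePayload() throws Exception {
        Path dir = Files.createTempDirectory("payload");
        Path src = dir.resolve("Payload.java");
        Files.write(src, "public class Payload {}".getBytes(StandardCharsets.UTF_8));
        JavaCompiler javac = ToolProvider.getSystemJavaCompiler(); // null on a plain JRE
        if (javac == null
                || javac.run(null, null, null, "-d", dir.toString(), src.toString()) != 0) {
            throw new IllegalStateException("could not compile Payload (is this a JDK?)");
        }
        return new URL[] { dir.toUri().toURL() };
    }

    /** Two unrelated loaders each define their own Payload class. */
    static boolean siblingsShareClass() {
        try {
            URL[] cp = compilePayload();
            try (URLClassLoader a = new URLClassLoader(cp, null);
                 URLClassLoader b = new URLClassLoader(cp, null)) {
                return a.loadClass("Payload") == b.loadClass("Payload");
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    /** Both children delegate to one parent, so only the parent defines Payload. */
    static boolean childrenOfCommonParentShareClass() {
        try {
            URL[] cp = compilePayload();
            try (URLClassLoader parent = new URLClassLoader(cp, null);
                 URLClassLoader a = new URLClassLoader(new URL[0], parent);
                 URLClassLoader b = new URLClassLoader(new URL[0], parent)) {
                return a.loadClass("Payload") == b.loadClass("Payload");
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("unrelated loaders share class: " + siblingsShareClass());
        System.out.println("children of common parent share class: "
                + childrenOfCommonParentShareClass());
    }
}
```

The first line should print false and the second true. The second case is
analogous to placing a library in a loader that every job inherits from,
which is the idea behind the "lib/" recommendation; how Flink wires its own
loaders is not shown here.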

