Posted to user@flink.apache.org by Garrett Barton <ga...@gmail.com> on 2017/09/18 16:42:43 UTC

Classpath/ClassLoader issues

Hey all,

 I am trying out a POC with flink on yarn.  My simple goal is to read from
a Hive ORC table, process some data and write to a new Hive ORC table.

Currently I can get Flink to read the source table fine, both by using
the HCatalog input format directly and by using the flink-hcatalog
wrapper.  Processing the data also works fine. Dumping to console or a text
file also works fine.

I'm now stuck trying to write the data out; I'm getting
ClassNotFoundExceptions:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)

Since I read from an ORC table, I know I have that class in my classpath.
So I'm wondering if each stage/step in a Flink process has some kind of
special classloader that I am not aware of?  (Also, it's odd that it wants
the inputformat and not the outputformat; not sure why yet.)
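
One way to narrow down a question like this is to ask each classloader directly whether it can see the class. The following is a minimal diagnostic sketch, not part of the original job: the class name ClassVisibilityCheck and the helper isVisible are made up for illustration, and only standard JDK calls are used. Run from the same place as the failing code, it would show whether the system classloader, the thread's context classloader, and the loader of your own code can each resolve the ORC class.

```java
final class ClassVisibilityCheck {

    /** Returns true if the given loader (or one of its parents) can resolve the named class. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only look the class up, do not run its static initializers
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class name taken from the stack trace above; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat";

        ClassLoader system = ClassLoader.getSystemClassLoader();
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        ClassLoader own = ClassVisibilityCheck.class.getClassLoader();

        System.out.println("visible to system classloader:  " + isVisible(name, system));
        System.out.println("visible to context classloader: " + isVisible(name, context));
        System.out.println("visible to this class's loader: " + isVisible(name, own));
    }
}
```

If the three answers differ, the class is present but only reachable through some of the loaders, which is a different problem from a jar that is simply missing.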

My output code looks like this:


Job job = Job.getInstance(conf);

HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema", "table", null));
HCatSchema outSchema = HCatOutputFormat.getTableSchema(job.getConfiguration());
HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);

HCatOutputFormat outputFormat = new HCatOutputFormat();

HadoopOutputFormat<NullWritable, DefaultHCatRecord> out =
    new HadoopOutputFormat(outputFormat, job);

// from previous processing step
hcat.output(out);
env.execute("run");



One other thing to note: I had to put flink-hadoop-compatibility_2.11-1.3.2.jar
into the lib folder of the Flink distro.  Building my code in a shaded jar
with that dependency did not work for me.  However, when I put the hive/hcat
jars in the lib folder it caused lots of other errors.  Since the shading
didn't work for the hadoop-compatibility jar, it makes me think there is
some funky class loader stuff going on.  I don't understand why this doesn't
work.  The ORC code is shaded and verified in my jar, the classes are
present, plus I successfully read from an ORC table.

Any help or explanation of how the classpath/classloading works would be
wonderful!

Re: Classpath/ClassLoader issues

Posted by Fabian Hueske <fh...@gmail.com>.
Thanks for the feedback Garrett!
Good to know that this fixes the problem.

The patch will be included in the next releases.

Best, Fabian

2017-10-06 20:31 GMT+02:00 Garrett Barton <ga...@gmail.com>:

> Fabian,
>
>  Just to follow up on this, I took the patch, compiled that class and
> stuck it into the existing 1.3.2 jar and all is well. (I couldn't get all
> of flink to build correctly)
>
> Thank you!
>
> On Wed, Sep 20, 2017 at 3:53 PM, Garrett Barton <ga...@gmail.com>
> wrote:
>
>> Fabian,
>>  Awesome!  After your initial email I got things to work by deploying my
>> fat jar into the flink/lib folder, and voilà! It worked. :)  I will grab
>> your pull request and give it a go tomorrow.
>>
>> On Wed, Sep 20, 2017 at 1:03 PM, Fabian Hueske <fh...@gmail.com> wrote:
>>
>>> Here's the pull request that hopefully fixes your issue:
>>> https://github.com/apache/flink/pull/4690
>>>
>>> Best, Fabian
>>>
>>> 2017-09-20 16:15 GMT+02:00 Fabian Hueske <fh...@gmail.com>:
>>>
>>>> Hi Garrett,
>>>>
>>>> I think I identified the problem.
>>>> You said you put the Hive/HCat dependencies into your user fat Jar,
>>>> correct? In this case, they are loaded with Flink's userClassLoader (as
>>>> described before).
>>>>
>>>> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
>>>> loads the user classes with the user class loader.
>>>> However, when the HCatOutputFormat.getOutputCommitter() method is
>>>> called, Hive tries to load additional classes with the current thread's
>>>> context class loader (see
>>>> org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
>>>> This behavior is actually OK, because we usually set the context
>>>> classloader to be the user classloader before calling user code. However,
>>>> this has not been done here.
>>>> So, this is in fact a bug.
>>>>
>>>> I created this JIRA issue:
>>>> https://issues.apache.org/jira/browse/FLINK-7656 and will open a PR for that.
>>>>
>>>> Thanks for helping to diagnose the issue,
>>>> Fabian
>>>>
>>>> 2017-09-19 22:05 GMT+02:00 Garrett Barton <ga...@gmail.com>:
>>>>
>>>>> Fabian,
>>>>>
>>>>>  It looks like Hive instantiates both input and output formats when
>>>>> doing either. I use Hive 1.2.1, and you can see in
>>>>> HCatUtil.getStorageHandler where it tries to load both.  It looks like it's
>>>>> happening after the writes complete and Flink is in the finish/finalize
>>>>> stage.  When I watch the counters in the Flink UI, I see all output tasks
>>>>> marked finished, with bytes sent and records sent being exactly what I
>>>>> expect them to be.  The first error also mentions the master; is this the
>>>>> Flink jobmanager process then?
>>>>>
>>>>> The expanded stacktrace is:
>>>>>
>>>>> Caused by: java.lang.Exception: Failed to finalize execution on master
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>>>>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>>>>> ... 8 more
>>>>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>>>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>>>>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>>>>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>>>>> ... 14 more
>>>>> Caused by: java.io.IOException: Failed to load foster storage handler
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>>>>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>>>>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>>>>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>>>>> ... 16 more
>>>>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> at java.lang.Class.forName0(Native Method)
>>>>> at java.lang.Class.forName(Class.java:348)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>>>>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
>>>>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
>>>>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>>>>>
>>>>>
>>>>> Thank you all for any help. :)
>>>>>
>>>>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske <fh...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Garrett,
>>>>>>
>>>>>> Flink distinguishes between two classloaders: 1) the system
>>>>>> classloader which is the main classloader of the process. This classloader
>>>>>> loads all jars in the ./lib folder and 2) the user classloader which loads
>>>>>> the job jar.
>>>>>> AFAIK, the different operators do not have distinct classloaders. So,
>>>>>> in principle all operators should use the same user classloader.
>>>>>>
>>>>>> According to the stacktrace you posted, the OrcInputFormat cannot be
>>>>>> found when you try to emit to an ORC file.
>>>>>> This looks suspicious because I would rather expect the
>>>>>> OrcOutputFormat to be the problem than the input format.
>>>>>> Can you post more of the stacktrace? This would help to identify the
>>>>>> spot in the Flink code where the exception is thrown.
>>>>>>
>>>>>> Thanks, Fabian

Re: Classpath/ClassLoader issues

Posted by Garrett Barton <ga...@gmail.com>.
Fabian,

 Just to follow up on this, I took the patch, compiled that class and stuck
it into the existing 1.3.2 jar and all is well. (I couldn't get all of
flink to build correctly)

Thank you!


Re: Classpath/ClassLoader issues

Posted by Garrett Barton <ga...@gmail.com>.
Fabian,
 Awesome!  After your initial email I got things to work by deploying my
fat jar into the flink/lib folder, and voilà! It worked. :)  I will grab
your pull request and give it a go tomorrow.
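
Why the lib-folder workaround helps can be illustrated with plain JDK classloaders. As described earlier in the thread, jars in ./lib belong to the system classloader and the job jar to a separate user classloader, and the stack trace shows the failing lookup going through the system (AppClassLoader) side. The sketch below is only an analogy under stated assumptions: LoaderDelegationSketch is a hypothetical name, a temporary directory stands in for the user jar, and a marker resource stands in for the ORC class, since resource lookup follows the same parent-first delegation as class lookup.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

final class LoaderDelegationSketch {

    /** Returns {visibleToParent, visibleToChild} for a resource that only the child loader owns. */
    static boolean[] demo() throws IOException {
        // Temporary directory standing in for the user (job) jar.
        Path dir = Files.createTempDirectory("user-jar-stand-in");
        String marker = "only-in-user-loader-" + System.nanoTime() + ".txt";
        Path file = Files.write(dir.resolve(marker), new byte[] {1});

        // Parent plays the role of the system classloader, child the user classloader.
        ClassLoader parent = ClassLoader.getSystemClassLoader();
        try (URLClassLoader child = new URLClassLoader(new URL[] {dir.toUri().toURL()}, parent)) {
            boolean viaParent = parent.getResource(marker) != null; // parent never asks its children
            boolean viaChild = child.getResource(marker) != null;   // child checks parent, then itself
            return new boolean[] {viaParent, viaChild};
        } finally {
            Files.deleteIfExists(file);
            Files.deleteIfExists(dir);
        }
    }

    public static void main(String[] args) throws IOException {
        boolean[] result = demo();
        System.out.println("visible via parent (system) loader: " + result[0]);
        System.out.println("visible via child (user) loader:    " + result[1]);
    }
}
```

Moving the fat jar into flink/lib corresponds to moving the marker onto the parent's search path, so a lookup that starts at the parent can succeed.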


Re: Classpath/ClassLoader issues

Posted by Fabian Hueske <fh...@gmail.com>.
Here's the pull request that hopefully fixes your issue:
https://github.com/apache/flink/pull/4690

Best, Fabian
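
The pattern described in the diagnosis quoted below (set the thread's context classloader to the user classloader before calling user code, and restore it afterwards) can be sketched in plain Java. This is an illustration only, not the code from the pull request: ContextClassLoaderSketch and callWithContextClassLoader are hypothetical names, and the empty URLClassLoader merely stands in for Flink's user classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

final class ContextClassLoaderSketch {

    /** Runs userCode with userLoader as the thread's context classloader, then restores the old one. */
    static <T> T callWithContextClassLoader(ClassLoader userLoader, Callable<T> userCode) throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(userLoader);
        try {
            // Libraries that resolve classes through the context classloader now search userLoader.
            return userCode.call();
        } finally {
            // Always restore, even if the user code throws.
            current.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ContextClassLoaderSketch.class.getClassLoader();
        // Stand-in for the user classloader; a real one would point at the job jar.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], parent)) {
            ClassLoader seen = callWithContextClassLoader(
                    userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("context loader inside the call is the user loader: " + (seen == userLoader));
        }
    }
}
```

The try/finally matters because the calling thread is typically reused for unrelated work, which should not inherit the user classloader.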


Re: Classpath/ClassLoader issues

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Garrett,

I think I identified the problem.
You said you put the Hive/HCat dependencies into your user fat jar,
correct? In this case, they are loaded with Flink's user classloader (as
described before).

In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly loads
the user classes with the user classloader.
However, when the HCatOutputFormat.getOutputCommitter() method is called,
Hive tries to load additional classes with the current thread's context
classloader (see
org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
This behavior is actually OK, because we usually set the context
classloader to be the user classloader before calling user code. However,
this has not been done here.
So, this is in fact a bug.
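
To illustrate what "setting the context classloader before calling user
code" means, here is a minimal sketch of the usual swap-and-restore pattern
in plain Java. It is not the actual Flink change; the class name
ContextClassLoaderDemo and the helper callWithContextClassLoader are made up
for this example, and an empty URLClassLoader stands in for the user
classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

public class ContextClassLoaderDemo {

    /**
     * Hypothetical helper: runs the action with the given loader installed as
     * the thread context classloader and restores the previous loader
     * afterwards, even if the action throws.
     */
    public static <T> T callWithContextClassLoader(ClassLoader loader, Callable<T> action)
            throws Exception {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the user classloader (assumption: no jars attached here).
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderDemo.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        // Code that resolves classes through the context classloader, as Hive
        // does in the stack trace above, would see userLoader inside the call.
        ClassLoader seen = callWithContextClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("inside call, context loader is user loader: " + (seen == userLoader));
        System.out.println("after call, previous loader restored: "
                + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

The try/finally matters because the thread is typically reused afterwards;
leaving the user classloader installed would leak it into unrelated code.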

I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
and will open a PR for that.

Thanks for helping to diagnose the issue,
Fabian


Re: Classpath/ClassLoader issues

Posted by Garrett Barton <ga...@gmail.com>.
Fabian,

It looks like Hive instantiates both the input and output formats when doing
either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler
where it tries to load both.  It looks like it's happening after the writes
complete and Flink is in the finish/finalize stage.  When I watch the
counters in the Flink UI, I see all output tasks marked finished, with
bytes sent and records sent being exactly what I expect them to be.  The
first error also mentions the master; is this the Flink JobManager process
then?

The expanded stacktrace is:

Caused by: java.lang.Exception: Failed to finalize execution on master
at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
... 8 more
Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
... 14 more
Caused by: java.io.IOException: Failed to load foster storage handler
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
... 16 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.68)
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)


Thank you all for any help. :)


Re: Classpath/ClassLoader issues

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Garrett,

Flink distinguishes between two classloaders: 1) the system classloader,
which is the main classloader of the process and loads all jars in the
./lib folder, and 2) the user classloader, which loads the job jar.
AFAIK, the different operators do not have distinct classloaders. So, in
principle all operators should use the same user classloader.
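
The visibility rule behind this split can be sketched with plain JDK
classloaders. This is only an illustration, not Flink code: a URLClassLoader
over a temporary directory stands in for the user classloader, a marker
resource stands in for a class that exists only in the job jar, and the names
LoaderVisibilityDemo, newUserLoader and only-in-user-jar.txt are invented for
the example. Resource lookup is used instead of class loading because it
follows the same parent/child delegation and needs no compiled jar.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class LoaderVisibilityDemo {

    /**
     * Hypothetical helper: creates a child loader whose only entry is a temp
     * directory containing one marker resource. The system classloader is
     * its parent, mirroring the "./lib vs. job jar" split described above.
     */
    public static URLClassLoader newUserLoader(String resourceName) throws Exception {
        Path dir = Files.createTempDirectory("user-jar");
        Files.write(dir.resolve(resourceName), "marker".getBytes(StandardCharsets.UTF_8));
        return new URLClassLoader(
                new URL[] {dir.toUri().toURL()}, ClassLoader.getSystemClassLoader());
    }

    public static void main(String[] args) throws Exception {
        String name = "only-in-user-jar.txt";
        ClassLoader system = ClassLoader.getSystemClassLoader();
        try (URLClassLoader user = newUserLoader(name)) {
            // The child can see its own entries and, via delegation, its parent's.
            System.out.println("user sees marker:   " + (user.getResource(name) != null));
            System.out.println("user sees String:   "
                    + (user.loadClass("java.lang.String") == String.class));
            // The parent never looks into the child, so the marker is invisible to it.
            System.out.println("system sees marker: " + (system.getResource(name) != null));
        }
    }
}
```

The asymmetry is the point: anything resolved through the system classloader
cannot find what lives only in the job jar, which matches a
ClassNotFoundException raised from AppClassLoader for a class that is present
in the fat jar.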

According to the stacktrace you posted, the OrcInputFormat cannot be found
when you try to emit to an ORC file.
This looks suspicious because I would rather expect the OrcOutputFormat to
be the problem than the input format.
Can you post more of the stacktrace? This would help to identify the spot
in the Flink code where the exception is thrown.

Thanks, Fabian
