You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Robert Metzger (JIRA)" <ji...@apache.org> on 2015/09/08 18:39:46 UTC

[jira] [Comment Edited] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

    [ https://issues.apache.org/jira/browse/FLINK-2617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14735098#comment-14735098 ] 

Robert Metzger edited comment on FLINK-2617 at 9/8/15 4:39 PM:
---------------------------------------------------------------

Which version of HCatalog are you using?
(I'm asking because the argument list of HCatInputFormat.setInput() in my version differs from yours.)


was (Author: rmetzger):
Which version of HCatalog are you using?

> ConcurrentModificationException when using HCatRecordReader to access a hive table
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-2617
>                 URL: https://issues.apache.org/jira/browse/FLINK-2617
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>            Reporter: Arnaud Linz
>            Priority: Critical
>
> I don't know if it's an HCatalog or a Flink problem, but when reading a Hive table in a cluster with many slots (20 threads per container), I systematically run into a {{ConcurrentModificationException}} in a copy method of a {{Configuration}} object that changes during the copy.
> From what I understand, this object comes from {{TaskAttemptContext.getConfiguration()}} created by {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());}} 
> Maybe the {{job.Configuration}} object passed to the constructor of {{HadoopInputFormatBase}} should be cloned somewhere?
> The stack trace is:
> {code}
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> org.apache.flink.client.program.Client.run(Client.java:413)
> org.apache.flink.client.program.Client.run(Client.java:356)
> org.apache.flink.client.program.Client.run(Client.java:349)
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
> com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
> com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
> com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
> com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
> com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:606)
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
> org.apache.flink.client.program.Client.run(Client.java:315)
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
> org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
> scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
> akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> akka.actor.ActorCell.invoke(ActorCell.scala:487)
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> akka.dispatch.Mailbox.run(Mailbox.scala:221)
> akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by:  java.util.ConcurrentModificationException
> java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
> java.util.HashMap$KeyIterator.next(HashMap.java:960)
> java.util.AbstractCollection.addAll(AbstractCollection.java:341)
> java.util.HashSet.<init>(HashSet.java:117)
> org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
> org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
> org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
> org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
> org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:744)
> {code}
> Flink "user" code looks like:
> {code}
> import java.io.IOException;
> import java.io.Serializable;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.io.FileOutputFormat;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.core.fs.FileSystem.WriteMode;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.sink.SinkFunction;
> import org.apache.flink.util.Collector;
> import org.apache.hadoop.io.NullWritable;
> import org.apache.hadoop.io.compress.CompressionCodec;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hive.hcatalog.data.DefaultHCatRecord;
> import org.apache.hive.hcatalog.data.schema.HCatSchema;
> import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
> (...) 
>         final Job job = Job.getInstance();
>         @SuppressWarnings({ "unchecked", "rawtypes" })
>         final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, 
>         DefaultHCatRecord>(
>             (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
>             NullWritable.class, //
>             DefaultHCatRecord.class, //
>             job);
>         final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
>         @SuppressWarnings("serial")
>         final DataSet<T> dataSet = cluster
>             .createInput(inputFormat)
>             .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>                 @Override
>                 public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception { // NOPMD
>                     final T record = createBean(value.f1, inputSchema);
>                     out.collect(record);
>                 }
>             }).returns(beanClass);
> (...)            
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)