You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Arnaud Linz (JIRA)" <ji...@apache.org> on 2015/09/03 15:11:46 UTC

[jira] [Created] (FLINK-2617) ConcurrentModificationException when using HCatRecordReader to access a hive table

Arnaud Linz created FLINK-2617:
----------------------------------

             Summary: ConcurrentModificationException when using HCatRecordReader to access a hive table
                 Key: FLINK-2617
                 URL: https://issues.apache.org/jira/browse/FLINK-2617
             Project: Flink
          Issue Type: Bug
          Components: Hadoop Compatibility
            Reporter: Arnaud Linz


I don't know if it's an HCat or a Flink problem, but when reading a Hive table in a cluster with many slots (20 threads per container), I systematically run into a {{ConcurrentModificationException}} in a copy method of a {{Configuration}} object that changes during the copy.

From what I understand, this object comes from {{TaskAttemptContext.getConfiguration()}} created by {{HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID());}}

Maybe the {{job.Configuration}} object passed to the constructor of {{HadoopInputFormatBase}} should be cloned somewhere?

The stack trace is:
{code}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
org.apache.flink.client.program.Client.run(Client.java:413)
org.apache.flink.client.program.Client.run(Client.java:356)
org.apache.flink.client.program.Client.run(Client.java:349)
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
com.bouygtel.kuberasdk.main.ApplicationBatch.execCluster(ApplicationBatch.java:73)
com.bouygtel.kubera.main.segment.ApplicationGeoSegment.batchExec(ApplicationGeoSegment.java:69)
com.bouygtel.kuberasdk.main.ApplicationBatch.exec(ApplicationBatch.java:50)
com.bouygtel.kuberasdk.main.Application.mainMethod(Application.java:88)
com.bouygtel.kubera.main.segment.MainGeoSmooth.main(MainGeoSmooth.java:44)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:606)
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
org.apache.flink.client.program.Client.run(Client.java:315)
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
akka.actor.Actor$class.aroundReceive(Actor.scala:465)
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
akka.actor.ActorCell.invoke(ActorCell.scala:487)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
akka.dispatch.Mailbox.run(Mailbox.scala:221)
akka.dispatch.Mailbox.exec(Mailbox.scala:231)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by:  java.util.ConcurrentModificationException
java.util.HashMap$HashIterator.nextEntry(HashMap.java:926)
java.util.HashMap$KeyIterator.next(HashMap.java:960)
java.util.AbstractCollection.addAll(AbstractCollection.java:341)
java.util.HashSet.<init>(HashSet.java:117)
org.apache.hadoop.conf.Configuration.<init>(Configuration.java:554)
org.apache.hadoop.mapred.JobConf.<init>(JobConf.java:439)
org.apache.hive.hcatalog.common.HCatUtil.getJobConfFromContext(HCatUtil.java:637)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.createBaseRecordReader(HCatRecordReader.java:112)
org.apache.hive.hcatalog.mapreduce.HCatRecordReader.initialize(HCatRecordReader.java:91)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:182)
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.open(HadoopInputFormatBase.java:56)
org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
java.lang.Thread.run(Thread.java:744)
{code}

Flink "user" code looks like:
{code}

import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
import org.apache.hive.hcatalog.data.schema.HCatSchema;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;


(...) 
        final Job job = Job.getInstance();
        @SuppressWarnings({ "unchecked", "rawtypes" })
        final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat = new HadoopInputFormat<NullWritable, 
        DefaultHCatRecord>(
            (InputFormat) HCatInputFormat.setInput(job, dbName, tableName, filter), //
            NullWritable.class, //
            DefaultHCatRecord.class, //
            job);

        final HCatSchema inputSchema = HCatInputFormat.getTableSchema(job.getConfiguration());
        @SuppressWarnings("serial")
        final DataSet<T> dataSet = cluster
            .createInput(inputFormat)
            .flatMap(new FlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
                @Override
                public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value, Collector<T> out) throws Exception { // NOPMD
                    final T record = createBean(value.f1, inputSchema);
                    out.collect(record);
                }
            }).returns(beanClass);
(...)            
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)