Posted to issues@hive.apache.org by "Marta Kuczora (JIRA)" <ji...@apache.org> on 2017/12/11 09:17:00 UTC
[jira] [Commented] (HIVE-18262) ConcurrentModificationException in
QueryPlan.extractCounters
[ https://issues.apache.org/jira/browse/HIVE-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285672#comment-16285672 ]
Marta Kuczora commented on HIVE-18262:
--------------------------------------
Here is my guess about what could be happening.
The exception comes from the QueryPlan.extractCounters method:
{code}
private void extractCounters() throws IOException {
  Queue<Task<? extends Serializable>> tasksToVisit =
      new LinkedList<Task<? extends Serializable>>();
  Set<Task<? extends Serializable>> tasksVisited =
      new HashSet<Task<? extends Serializable>>();
  tasksToVisit.addAll(rootTasks);
  while (tasksToVisit.peek() != null) {
    Task<? extends Serializable> task = tasksToVisit.remove();
    tasksVisited.add(task);
    // add children to tasksToVisit
    if (task.getChildTasks() != null) {
      for (Task<? extends Serializable> childTask : task.getChildTasks()) {
        if (!tasksVisited.contains(childTask)) {
          tasksToVisit.add(childTask);
        }
      }
    }
    if (task.getId() == null) {
      continue;
    }
    if (started.contains(task.getId()) && done.contains(task.getId())) {
      continue;
    }
    // get the counters for the task
    counters.put(task.getId(), task.getCounters());
    // check if task is started
    if (task.started()) {
      started.add(task.getId());
    }
    if (task.done()) {
      done.add(task.getId());
    }
    if (task instanceof ExecDriver) {
      ExecDriver mrTask = (ExecDriver) task;
      if (mrTask.mapStarted()) {
        started.add(task.getId() + "_MAP");
      }
      if (mrTask.mapDone()) {
        done.add(task.getId() + "_MAP");
      }
      if (mrTask.hasReduce()) {
        if (mrTask.reduceStarted()) {
          started.add(task.getId() + "_REDUCE");
        }
        if (mrTask.reduceDone()) {
          done.add(task.getId() + "_REDUCE");
        }
      }
    } else if (task instanceof ConditionalTask) {
      ConditionalTask cTask = (ConditionalTask) task;
      for (Task<? extends Serializable> listTask : cTask.getListTasks()) {
        if (!tasksVisited.contains(listTask)) {
          tasksToVisit.add(listTask);
        }
      }
    }
  }
}
{code}
This is the line where the exception occurs:
{code}
List<Task<? extends Serializable>> childTasks = task.getChildTasks();
{code}
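To show the failure mode in isolation: ArrayList iterators are fail-fast, so any structural change to the list between two next() calls makes the iterator throw, which matches the ArrayList$Itr.next -> checkForComodification frames in the stack trace. The following is a minimal single-threaded sketch, not Hive code. The class name FailFastDemo and the stage IDs are made up for illustration, and the add inside the loop only stands in for a modification that would really come from another thread.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

class FailFastDemo {
    // Returns true if modifying the list while iterating over it
    // triggers a ConcurrentModificationException.
    static boolean iterationFails() {
        // Hypothetical stand-in for a task's child list.
        List<String> childTasks = new ArrayList<>();
        childTasks.add("Stage-1");
        childTasks.add("Stage-2");
        try {
            for (String child : childTasks) {
                // Stands in for another thread adding a dependent task
                // while this loop is still running.
                childTasks.add(child + "-dependent");
            }
            return false;
        } catch (ConcurrentModificationException e) {
            // Thrown by the iterator's next() on the second pass.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + iterationFails());
    }
}
```

In the real case the modification comes from a different thread, so whether the exception is thrown depends on timing, which would explain why the failure is intermittent.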
The extractCounters method always traverses the task graph starting from rootTasks, which is an instance variable of the QueryPlan class. It is called from the Driver.launchTask method through this call chain:
Driver.launchTask -> Utilities.setWorkflowAdjacencies -> plan.getQueryPlan -> extractCounters
In the Driver class, a separate task runner is started for each task. (The number of parallel task runners depends on the hive.exec.parallel.thread.number config.)
{code}
while ((task = driverCxt.getRunnable(maxthreads)) != null) {
  TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
  if (!runner.isRunning()) {
    break;
  }
}
{code}
Each of these launchTask calls also calls the extractCounters method, which is where the exception happens.
If a running task is a ConditionalTask, the list of child tasks can be modified in the part I marked in the next code snippet:
{code}
private void resolveTask(DriverContext driverContext) throws HiveException {
  for (Task<? extends Serializable> tsk : getListTasks()) {
    if (!resTasks.contains(tsk)) {
      driverContext.remove(tsk);
      console.printInfo(tsk.getId() + " is filtered out by condition resolver.");
      if (tsk.isMapRedTask()) {
        driverContext.incCurJobNo(1);
      }
      //recursively remove this task from its children's parent task
      tsk.removeFromChildrenTasks();
    } else {
      // >>>>>>>>>> THIS PART CAN MODIFY THE TASK'S CHILD LIST
      if (getParentTasks() != null) {
        // This makes it so that we can go back up the tree later
        for (Task<? extends Serializable> task : getParentTasks()) {
          task.addDependentTask(tsk);
        }
      }
      // <<<<<<<<<< END OF THE MARKED PART
      // resolved task
      if (driverContext.addToRunnable(tsk)) {
        console.printInfo(tsk.getId() + " is selected by condition resolver.");
      }
    }
  }
}
{code}
As none of these methods or collections are thread-safe, I think what happens is that a running ConditionalTask modifies a task's child list at the very moment another thread is iterating over it in extractCounters.
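One possible way to avoid this kind of race, shown only as a sketch and not as the fix applied in Hive, is to guard the child list with a lock and let readers iterate over a private copy. The Node class and its addChild and childrenSnapshot methods below are hypothetical stand-ins and are not part of Hive's Task API.

```java
import java.util.ArrayList;
import java.util.List;

class SnapshotIterationSketch {
    // Hypothetical stand-in for a task; not Hive's Task class.
    static class Node {
        private final String id;
        private final List<Node> children = new ArrayList<>();

        Node(String id) {
            this.id = id;
        }

        // Writers hold the node's lock while mutating the child list.
        synchronized void addChild(Node child) {
            if (!children.contains(child)) {
                children.add(child);
            }
        }

        // Readers get a copy made under the same lock, so later
        // additions cannot invalidate their iterator.
        synchronized List<Node> childrenSnapshot() {
            return new ArrayList<>(children);
        }
    }

    // Iterates over a snapshot while the underlying list is modified
    // and returns how many children were visited.
    static int visitWhileModifying() {
        Node parent = new Node("Stage-1");
        parent.addChild(new Node("Stage-2"));
        parent.addChild(new Node("Stage-3"));
        int visited = 0;
        for (Node child : parent.childrenSnapshot()) {
            // Stands in for a modification made by another thread.
            parent.addChild(new Node(child.id + "-extra"));
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("visited " + visitWhileModifying() + " children");
    }
}
```

The trade-off is that a reader may work with a slightly stale view of the children, which seems acceptable for collecting counters but would need to be checked against how the rest of the Driver uses the task graph.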
> ConcurrentModificationException in QueryPlan.extractCounters
> ------------------------------------------------------------
>
> Key: HIVE-18262
> URL: https://issues.apache.org/jira/browse/HIVE-18262
> Project: Hive
> Issue Type: Bug
> Components: HiveServer2
> Affects Versions: 2.3.2
> Reporter: Marta Kuczora
>
> Intermittently experiencing an issue where the query fails with the following error:
> {noformat}
> 2017-10-27 03:05:48,194 ERROR org.apache.hadoop.hive.ql.Driver: [HiveServer2-Handler-Pool: Thread-10925531]: FAILED: Hive Internal Error: java.util.ConcurrentModificationException(null)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at org.apache.hadoop.hive.ql.QueryPlan.extractCounters(QueryPlan.java:381)
> at org.apache.hadoop.hive.ql.QueryPlan.getQueryPlan(QueryPlan.java:436)
> at org.apache.hadoop.hive.ql.exec.Utilities.setWorkflowAdjacencies(Utilities.java:471)
> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1950)
> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1684)
> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1421)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1205)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1200)
> at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:187)
> at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:217)
> at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
> at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:425)
> at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:396)
> at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
> at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
> at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1912)
> at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
> at com.sun.proxy.$Proxy19.executeStatement(Unknown Source)
> at org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:247)
> at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:500)
> at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
> at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
> at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
> at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
> at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
> at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}