Posted to issues@hive.apache.org by "Marta Kuczora (JIRA)" <ji...@apache.org> on 2017/12/11 09:17:00 UTC

[jira] [Commented] (HIVE-18262) ConcurrentModificationException in QueryPlan.extractCounters

    [ https://issues.apache.org/jira/browse/HIVE-18262?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16285672#comment-16285672 ] 

Marta Kuczora commented on HIVE-18262:
--------------------------------------

Here is my guess about what could happen:
The exception comes from the QueryPlan.extractCounters method:
{code}
  private void extractCounters() throws IOException {
    Queue<Task<? extends Serializable>> tasksToVisit =
      new LinkedList<Task<? extends Serializable>>();
    Set<Task<? extends Serializable>> tasksVisited =
      new HashSet<Task<? extends Serializable>>();
    tasksToVisit.addAll(rootTasks);
    while (tasksToVisit.peek() != null) {
      Task<? extends Serializable> task = tasksToVisit.remove();
      tasksVisited.add(task);
      // add children to tasksToVisit
      if (task.getChildTasks() != null) {
        for (Task<? extends Serializable> childTask : task.getChildTasks()) {
          if (!tasksVisited.contains(childTask)) {
            tasksToVisit.add(childTask);
          }
        }
      }
      if (task.getId() == null) {
        continue;
      }
      if (started.contains(task.getId()) && done.contains(task.getId())) {
        continue;
      }

      // get the counters for the task
      counters.put(task.getId(), task.getCounters());

      // check if task is started
      if (task.started()) {
        started.add(task.getId());
      }
      if (task.done()) {
        done.add(task.getId());
      }
      if (task instanceof ExecDriver) {
        ExecDriver mrTask = (ExecDriver) task;
        if (mrTask.mapStarted()) {
          started.add(task.getId() + "_MAP");
        }
        if (mrTask.mapDone()) {
          done.add(task.getId() + "_MAP");
        }
        if (mrTask.hasReduce()) {
          if (mrTask.reduceStarted()) {
            started.add(task.getId() + "_REDUCE");
          }
          if (mrTask.reduceDone()) {
            done.add(task.getId() + "_REDUCE");
          }
        }
      } else if (task instanceof ConditionalTask) {
        ConditionalTask cTask = (ConditionalTask) task;
        for (Task<? extends Serializable> listTask : cTask.getListTasks()) {
          if (!tasksVisited.contains(listTask)) {
            tasksToVisit.add(listTask);
          }
        }
      }
    }
  }
{code}

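This is the line where the exception occurs (the stack trace shows it is thrown from ArrayList$Itr.next, i.e. while iterating over the child list):
{code}
for (Task<? extends Serializable> childTask : task.getChildTasks())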
{code}
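The stack trace shows the exception surfacing in ArrayList$Itr.next. ArrayList iterators are fail-fast, so the failure comes from advancing an iterator over a list that was structurally modified after the iteration started, not from fetching the list itself. A minimal, single-threaded Java sketch of that behaviour (the task names are placeholder strings, not real Hive objects):

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;

public class FailFastDemo {
    // Returns true if adding to the list during a for-each loop made the
    // ArrayList iterator throw ConcurrentModificationException.
    static boolean modifyDuringIteration() {
        List<String> childTasks = new ArrayList<>();
        childTasks.add("Stage-1");
        childTasks.add("Stage-2");

        try {
            for (String child : childTasks) {
                if (child.equals("Stage-1")) {
                    // Structural modification while the iterator is still in use;
                    // the next call to next() detects the changed modCount.
                    childTasks.add("Stage-3");
                }
            }
        } catch (ConcurrentModificationException e) {
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + modifyDuringIteration()); // CME thrown: true
    }
}
```

In the Hive case the modification is presumably made by a different thread, but the iterator detects it the same way.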

This method always walks the task graph starting from rootTasks, which is an instance variable of the QueryPlan class. It is called from the Driver.launchTask method through this chain:
Driver.launchTask -> Utilities.setWorkflowAdjacencies -> plan.getQueryPlan -> extractCounters

In the Driver class, a separate task runner is started for each task (the number of parallel task runners is limited by the hive.exec.parallel.thread.number config):
{code}
        while ((task = driverCxt.getRunnable(maxthreads)) != null) {
          TaskRunner runner = launchTask(task, queryId, noName, jobname, jobs, driverCxt);
          if (!runner.isRunning()) {
            break;
          }
        }
{code}
Each of these launchTask calls also ends up in the extractCounters method, where the exception is thrown.
If one of the running tasks is a ConditionalTask, it can modify the child task lists of its parent tasks, in the part marked in the following snippet from ConditionalTask.resolveTask:
{code}
  private void resolveTask(DriverContext driverContext) throws HiveException {
    for (Task<? extends Serializable> tsk : getListTasks()) {
      if (!resTasks.contains(tsk)) {
        driverContext.remove(tsk);
        console.printInfo(tsk.getId() + " is filtered out by condition resolver.");
        if (tsk.isMapRedTask()) {
          driverContext.incCurJobNo(1);
        }
        //recursively remove this task from its children's parent task
        tsk.removeFromChildrenTasks();
      } else {
        // >>> THIS PART CAN MODIFY THE CHILD LISTS OF THE PARENT TASKS
        if (getParentTasks() != null) {
          // This makes it so that we can go back up the tree later
          for (Task<? extends Serializable> task : getParentTasks()) {
            task.addDependentTask(tsk);
          }
        }
        // <<< end of marked part
        // resolved task
        if (driverContext.addToRunnable(tsk)) {
          console.printInfo(tsk.getId() + " is selected by condition resolver.");
        }
      }
    }
  }
{code}
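To make the marked part concrete: addDependentTask appends the selected task to each parent's child list, which (judging by ArrayList$Itr in the stack trace) is the same ArrayList that extractCounters iterates. The sketch below uses a hypothetical, heavily simplified MiniTask class, modelled loosely on that parent/child bookkeeping; it is not Hive's Task class, and the method linkSelectedToParents is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Hive's Task: only the child/parent links are modelled.
class MiniTask {
    final String id;
    final List<MiniTask> childTasks = new ArrayList<>();
    final List<MiniTask> parentTasks = new ArrayList<>();

    MiniTask(String id) { this.id = id; }

    // Loosely modelled on Task.addDependentTask: link this -> dependent.
    boolean addDependentTask(MiniTask dependent) {
        if (childTasks.contains(dependent)) {
            return false;
        }
        childTasks.add(dependent); // structural change to THIS task's child list
        if (!dependent.parentTasks.contains(this)) {
            dependent.parentTasks.add(this);
        }
        return true;
    }
}

public class ResolveDemo {
    // Mirrors the marked loop: every parent of the conditional task gets the
    // selected task appended to its own child list.
    static void linkSelectedToParents(MiniTask conditional, MiniTask selected) {
        for (MiniTask p : conditional.parentTasks) {
            p.addDependentTask(selected);
        }
    }

    public static void main(String[] args) {
        MiniTask parent = new MiniTask("Stage-1");
        MiniTask conditional = new MiniTask("Stage-2");
        MiniTask selected = new MiniTask("Stage-3");
        parent.addDependentTask(conditional);

        linkSelectedToParents(conditional, selected);
        System.out.println(parent.childTasks.size()); // 2
    }
}
```

If another thread is iterating parent.childTasks at the moment linkSelectedToParents runs, that iteration is the one that fails.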

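Since none of these methods or collections are thread-safe, I think what happens is that a running ConditionalTask adds a task to a parent task's child list (via addDependentTask) while another thread's extractCounters call is iterating over that same list, so the iterator throws ConcurrentModificationException.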
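A minimal Java sketch of the suspected interleaving, under the assumption that the child list is a plain ArrayList shared between threads. Two CountDownLatch objects force the order deterministically: the "extractCounters" thread reads one child, the "resolveTask" thread then appends a child, and only then does the first thread advance its iterator. Strings stand in for tasks. The CopyOnWriteArrayList run is included only as one possible mitigation to compare against; it is an assumption, not necessarily the fix chosen for this issue.

```java
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

public class RaceDemo {
    // Returns true if the reading thread got a ConcurrentModificationException.
    static boolean raceThrows(List<String> childTasks) {
        childTasks.add("Stage-2");
        childTasks.add("Stage-3");

        CountDownLatch firstChildRead = new CountDownLatch(1);
        CountDownLatch childAdded = new CountDownLatch(1);
        boolean[] threw = {false};

        // Plays the role of extractCounters iterating over task.getChildTasks().
        Thread reader = new Thread(() -> {
            try {
                for (String child : childTasks) {
                    if (child.equals("Stage-2")) {
                        firstChildRead.countDown(); // let the writer run now
                        childAdded.await();         // resume only after the list changed
                    }
                }
            } catch (ConcurrentModificationException e) {
                threw[0] = true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Plays the role of resolveTask calling parent.addDependentTask(tsk).
        Thread writer = new Thread(() -> {
            try {
                firstChildRead.await();
                childTasks.add("Stage-4");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                childAdded.countDown();
            }
        });

        reader.start();
        writer.start();
        try {
            reader.join(); // join() makes threw[0] visible to this thread
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return threw[0];
    }

    public static void main(String[] args) {
        System.out.println("ArrayList: " + raceThrows(new ArrayList<>()));
        System.out.println("CopyOnWriteArrayList: " + raceThrows(new CopyOnWriteArrayList<>()));
    }
}
```

With the ArrayList the reader fails; the CopyOnWriteArrayList iterator works on a snapshot, so it does not throw, but it also does not see the task added during the walk. Copying on every write is cheap only because child lists are small and rarely modified; synchronizing access to the lists would be another option.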

> ConcurrentModificationException in QueryPlan.extractCounters
> ------------------------------------------------------------
>
>                 Key: HIVE-18262
>                 URL: https://issues.apache.org/jira/browse/HIVE-18262
>             Project: Hive
>          Issue Type: Bug
>          Components: HiveServer2
>    Affects Versions: 2.3.2
>            Reporter: Marta Kuczora
>
> Intermittently experiencing an issue where the query fails with the following error:
> {noformat}
> 2017-10-27 03:05:48,194 ERROR org.apache.hadoop.hive.ql.Driver: [HiveServer2-Handler-Pool: Thread-10925531]: FAILED: Hive Internal Error: java.util.ConcurrentModificationException(null)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:901)
> at java.util.ArrayList$Itr.next(ArrayList.java:851)
> at org.apache.hadoop.hive.ql.QueryPlan.extractCounters(QueryPlan.java:381)
> at org.apache.hadoop.hive.ql.QueryPlan.getQueryPlan(QueryPlan.java:436)
> at org.apache.hadoop.hive.ql.exec.Utilities.setWorkflowAdjacencies(Utilities.java:471)
> at org.apache.hadoop.hive.ql.Driver.launchTask(Driver.java:1950)
> at org.apache.hadoop.hive.ql.Driver.execute(Driver.java:1684)
> at org.apache.hadoop.hive.ql.Driver.runInternal(Driver.java:1421)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1205)
> at org.apache.hadoop.hive.ql.Driver.run(Driver.java:1200)
> at org.apache.hive.service.cli.operation.SQLOperation.runQuery(SQLOperation.java:187)
> at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:217)
> at org.apache.hive.service.cli.operation.Operation.run(Operation.java:337)
> at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:425)
> at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatement(HiveSessionImpl.java:396)
> at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
> at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
> at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1912)
> at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
> at com.sun.proxy.$Proxy19.executeStatement(Unknown Source)
> at org.apache.hive.service.cli.CLIService.executeStatement(CLIService.java:247)
> at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:500)
> at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1313)
> at org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1298)
> at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
> at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
> at org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:56)
> at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)