Posted to dev@hive.apache.org by "Vaibhav Gumashta (JIRA)" <ji...@apache.org> on 2017/06/23 20:31:00 UTC

[jira] [Created] (HIVE-16951) ACID: Compactor doesn't close JobClient

Vaibhav Gumashta created HIVE-16951:
---------------------------------------

             Summary: ACID: Compactor doesn't close JobClient
                 Key: HIVE-16951
                 URL: https://issues.apache.org/jira/browse/HIVE-16951
             Project: Hive
          Issue Type: Bug
          Components: Transactions
    Affects Versions: 2.1.1, 1.2.2
            Reporter: Vaibhav Gumashta


When a compaction job is launched, we create a new JobClient every time we run the MR job:
{code}
  private void launchCompactionJob(JobConf job, Path baseDir, CompactionType compactionType,
                                   StringableList dirsToSearch,
                                   List<AcidUtils.ParsedDelta> parsedDeltas,
                                   int curDirNumber, int obsoleteDirNumber, HiveConf hiveConf,
                                   TxnStore txnHandler, long id, String jobName) throws IOException {
    job.setBoolean(IS_MAJOR, compactionType == CompactionType.MAJOR);
    if(dirsToSearch == null) {
      dirsToSearch = new StringableList();
    }
    StringableList deltaDirs = new StringableList();
    long minTxn = Long.MAX_VALUE;
    long maxTxn = Long.MIN_VALUE;
    for (AcidUtils.ParsedDelta delta : parsedDeltas) {
      LOG.debug("Adding delta " + delta.getPath() + " to directories to search");
      dirsToSearch.add(delta.getPath());
      deltaDirs.add(delta.getPath());
      minTxn = Math.min(minTxn, delta.getMinTransaction());
      maxTxn = Math.max(maxTxn, delta.getMaxTransaction());
    }

    if (baseDir != null) job.set(BASE_DIR, baseDir.toString());
    job.set(DELTA_DIRS, deltaDirs.toString());
    job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
    job.setLong(MIN_TXN, minTxn);
    job.setLong(MAX_TXN, maxTxn);

    if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
      mrJob = job;
    }

    LOG.info("Submitting " + compactionType + " compaction job '" +
      job.getJobName() + "' to " + job.getQueueName() + " queue.  " +
      "(current delta dirs count=" + curDirNumber +
      ", obsolete delta dirs count=" + obsoleteDirNumber + ". TxnIdRange[" + minTxn + "," + maxTxn + "]");
    // The JobClient created here is never closed
    RunningJob rj = new JobClient(job).submitJob(job);
    LOG.info("Submitted compaction job '" + job.getJobName() + "' with jobID=" + rj.getID() + " compaction ID=" + id);
    txnHandler.setHadoopJobId(rj.getID().toString(), id);
    rj.waitForCompletion();
    if (!rj.isSuccessful()) {
      throw new IOException((compactionType == CompactionType.MAJOR ? "Major" : "Minor") +
          " compactor job failed for " + jobName + "! Hadoop JobId: " + rj.getID());
    }
  }
  }
{code}

We should close the JobClient once the job completes, so that the resources it holds (e.g. cached FileSystem objects) are released.
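A minimal sketch of the shape such a fix could take (this is illustrative, not the actual Hive patch): hold the JobClient in a local variable and close it in a finally block, so it is released even if submission or the job itself fails. A stand-in Closeable is used below so the sketch is self-contained; in Compactor the real org.apache.hadoop.mapred.JobClient, which exposes close(), would be used instead.

```java
import java.io.IOException;

public class CompactorCloseSketch {

    // Stand-in for org.apache.hadoop.mapred.JobClient (hypothetical,
    // for illustration only); close() releases held resources.
    static class FakeJobClient implements java.io.Closeable {
        static int openCount = 0;
        FakeJobClient() { openCount++; }
        String submitJob() { return "job_0001"; }
        @Override public void close() { openCount--; }
    }

    static String launchJob() throws IOException {
        FakeJobClient jc = null;
        try {
            jc = new FakeJobClient();
            // was: new JobClient(job).submitJob(job) -- the client leaked
            return jc.submitJob();
        } finally {
            if (jc != null) {
                jc.close(); // releases cached FS objects etc.
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(launchJob());
        // All clients closed, even on the early return above
        System.out.println("open=" + FakeJobClient.openCount);
    }
}
```

With the null-guarded finally, the client is closed on every path: normal return, submission failure, or a failed job throwing out of the try block.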



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)