Posted to commits@hive.apache.org by kr...@apache.org on 2022/06/29 05:08:29 UTC
[hive] branch master updated: HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b3f13b5a86 HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)
b3f13b5a86 is described below
commit b3f13b5a8635893f0b8eaf6e2e92a96ce375f9f0
Author: Krisztian Kasa <kk...@cloudera.com>
AuthorDate: Wed Jun 29 07:08:18 2022 +0200
HIVE-26319: Iceberg integration: Perform update split early (Krisztian Kasa, reviewed by Peter Vary)
---
.../mr/hive/HiveIcebergOutputCommitter.java | 196 +++++++++++++++------
.../iceberg/mr/hive/HiveIcebergStorageHandler.java | 81 +++++----
.../iceberg/mr/hive/writer/WriterRegistry.java | 15 +-
.../positive/update_iceberg_partitioned_avro.q.out | 12 +-
.../positive/update_iceberg_partitioned_orc.q.out | 12 +-
.../update_iceberg_partitioned_parquet.q.out | 12 +-
.../update_iceberg_unpartitioned_parquet.q.out | 12 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 13 +-
.../hive/ql/parse/RewriteSemanticAnalyzer.java | 3 +
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 90 +++++++---
.../hadoop/hive/ql/session/SessionStateUtil.java | 28 ++-
11 files changed, 331 insertions(+), 143 deletions(-)
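Summary of the change: the "split" of an UPDATE into its delete and insert halves now happens at compile time. UpdateDeleteSemanticAnalyzer rewrites the statement into a single multi-insert query with one branch for the new row versions and one for the delete records, instead of leaving the split to the Iceberg write path. The rest of the patch is plumbing for the consequences: a task attempt can now hold several writers per table (WriterRegistry values become List<HiveIcebergWriter>), a table can accumulate commit info from several jobs (SessionStateUtil keys CommitInfo by job id), and HiveIcebergOutputCommitter gains batch commitJobs/abortJobs entry points that the single-job OutputCommitter methods delegate to.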
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index 52507cf65c..02ec4a1a91 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -24,7 +24,10 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -67,6 +70,7 @@ import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
@@ -109,8 +113,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptID attemptID = context.getTaskAttemptID();
JobConf jobConf = context.getJobConf();
- Collection<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
- Map<String, HiveIcebergWriter> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
+ Set<String> outputs = HiveIcebergStorageHandler.outputTables(context.getJobConf());
+ Map<String, List<HiveIcebergWriter>> writers = Optional.ofNullable(WriterRegistry.writers(attemptID))
.orElseGet(() -> {
LOG.info("CommitTask found no writers for output tables: {}, attemptID: {}", outputs, attemptID);
return ImmutableMap.of();
@@ -127,11 +131,12 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.run(output -> {
Table table = HiveIcebergStorageHandler.table(context.getJobConf(), output);
if (table != null) {
- HiveIcebergWriter writer = writers.get(output);
String fileForCommitLocation = generateFileForCommitLocation(table.location(), jobConf,
attemptID.getJobID(), attemptID.getTaskID().getId());
- if (writer != null) {
- createFileForCommit(writer.files(), fileForCommitLocation, table.io());
+ if (writers.get(output) != null) {
+ for (HiveIcebergWriter writer : writers.get(output)) {
+ createFileForCommit(writer.files(), fileForCommitLocation, table.io());
+ }
} else {
LOG.info("CommitTask found no writer for specific table: {}, attemptID: {}", output, attemptID);
createFileForCommit(FilesForCommit.empty(), fileForCommitLocation, table.io());
@@ -162,35 +167,87 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
// Clean up writer data from the local store
- Map<String, HiveIcebergWriter> writers = WriterRegistry.removeWriters(context.getTaskAttemptID());
+ Map<String, List<HiveIcebergWriter>> writerMap = WriterRegistry.removeWriters(context.getTaskAttemptID());
// Remove files if it was not done already
- if (writers != null) {
- for (HiveIcebergWriter writer : writers.values()) {
- writer.close(true);
+ if (writerMap != null) {
+ for (List<HiveIcebergWriter> writerList : writerMap.values()) {
+ for (HiveIcebergWriter writer : writerList) {
+ writer.close(true);
+ }
}
}
}
+ @Override
+ public void commitJob(JobContext originalContext) throws IOException {
+ commitJobs(Collections.singletonList(originalContext));
+ }
+
+ /**
+ * Wrapper class for storing output {@link Table} and its context for committing changes:
+ * JobContext, CommitInfo.
+ */
+ private static class OutputTable {
+ private final String catalogName;
+ private final String tableName;
+ private final Table table;
+ private final JobContext jobContext;
+ private final SessionStateUtil.CommitInfo commitInfo;
+
+ private OutputTable(String catalogName, String tableName, Table table, JobContext jobContext,
+ SessionStateUtil.CommitInfo commitInfo) {
+ this.catalogName = catalogName;
+ this.tableName = tableName;
+ this.table = table;
+ this.jobContext = jobContext;
+ this.commitInfo = commitInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OutputTable output1 = (OutputTable) o;
+ return Objects.equals(tableName, output1.tableName) &&
+ Objects.equals(jobContext.getJobID(), output1.jobContext.getJobID());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(tableName, jobContext.getJobID());
+ }
+
+ public Optional<SessionStateUtil.CommitInfo> getCommitInfo() {
+ return Optional.ofNullable(commitInfo);
+ }
+ }
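Note that OutputTable identity is deliberately narrow: equals/hashCode use only (tableName, jobID), so the Set built by collectOutputs below keeps exactly one entry per table per job even if the same table name is reported by a job context more than once; catalogName, table and commitInfo ride along without affecting identity.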
+
/**
* Reads the commit files stored in the temp directories and collects the generated committed data files.
* Appends the data files to the tables. At the end removes the temporary directories.
- * @param originalContext The job context
+ * @param originalContextList The job context list
* @throws IOException if there is a failure accessing the files
*/
- @Override
- public void commitJob(JobContext originalContext) throws IOException {
- JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
- JobConf jobConf = jobContext.getJobConf();
-
+ public void commitJobs(List<JobContext> originalContextList) throws IOException {
+ List<JobContext> jobContextList = originalContextList.stream()
+ .map(TezUtil::enrichContextWithVertexId)
+ .collect(Collectors.toList());
+ Set<OutputTable> outputs = collectOutputs(jobContextList);
long startTime = System.currentTimeMillis();
- LOG.info("Committing job {} has started", jobContext.getJobID());
- Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+ String ids = jobContextList.stream()
+ .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+ LOG.info("Committing job(s) {} has started", ids);
+
Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
- ExecutorService fileExecutor = fileExecutor(jobConf);
- ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+ ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
try {
// Commits the changes for the output tables in parallel
Tasks.foreach(outputs)
@@ -198,17 +255,10 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.stopOnFailure()
.executeWith(tableExecutor)
.run(output -> {
- Table table = SessionStateUtil.getResource(jobConf, output)
- .filter(o -> o instanceof Table).map(o -> (Table) o)
- // fall back to getting the serialized table from the config
- .orElseGet(() -> HiveIcebergStorageHandler.table(jobConf, output));
- if (table != null) {
- String catalogName = HiveIcebergStorageHandler.catalogName(jobConf, output);
- jobLocations.add(generateJobLocation(table.location(), jobConf, jobContext.getJobID()));
- commitTable(table.io(), fileExecutor, jobContext, output, table.location(), catalogName);
- } else {
- LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.", output);
- }
+ JobConf jobConf = output.jobContext.getJobConf();
+ Table table = output.table;
+ jobLocations.add(generateJobLocation(table.location(), jobConf, output.jobContext.getJobID()));
+ commitTable(table.io(), fileExecutor, output);
});
} finally {
fileExecutor.shutdown();
@@ -217,9 +267,37 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
}
}
- LOG.info("Commit took {} ms for job {}", System.currentTimeMillis() - startTime, jobContext.getJobID());
+ LOG.info("Commit took {} ms for job(s) {}", System.currentTimeMillis() - startTime, ids);
- cleanup(jobContext, jobLocations);
+ for (JobContext jobContext : jobContextList) {
+ cleanup(jobContext, jobLocations);
+ }
+ }
+
+ private Set<OutputTable> collectOutputs(List<JobContext> jobContextList) {
+ Set<OutputTable> outputs = Sets.newHashSet();
+ for (JobContext jobContext : jobContextList) {
+ Set<String> outputNames = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
+ for (String output : outputNames) {
+ Table table = SessionStateUtil.getResource(jobContext.getJobConf(), output)
+ .filter(o -> o instanceof Table).map(o -> (Table) o)
+ // fall back to getting the serialized table from the config
+ .orElseGet(() -> HiveIcebergStorageHandler.table(jobContext.getJobConf(), output));
+ if (table == null) {
+ LOG.info("CommitJob found no table object in QueryState or conf for: {}. Skipping job commit.", output);
+ continue;
+ }
+
+ SessionStateUtil.CommitInfo commitInfo = null;
+ if (SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output).isPresent()) {
+ commitInfo = SessionStateUtil.getCommitInfo(jobContext.getJobConf(), output)
+ .get().get(jobContext.getJobID().toString());
+ }
+ String catalogName = HiveIcebergStorageHandler.catalogName(jobContext.getJobConf(), output);
+ outputs.add(new OutputTable(catalogName, output, table, jobContext, commitInfo));
+ }
+ }
+ return outputs;
}
/**
@@ -231,15 +309,23 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
*/
@Override
public void abortJob(JobContext originalContext, int status) throws IOException {
- JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
- JobConf jobConf = jobContext.getJobConf();
+ abortJobs(Collections.singletonList(originalContext));
+ }
+
+ public void abortJobs(List<JobContext> originalContextList) throws IOException {
+ List<JobContext> jobContextList = originalContextList.stream()
+ .map(TezUtil::enrichContextWithVertexId)
+ .collect(Collectors.toList());
+ Set<OutputTable> outputs = collectOutputs(jobContextList);
+
+ String ids = jobContextList.stream()
+ .map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(","));
+ LOG.info("Job(s) {} are aborted. Data file cleaning started", ids);
- LOG.info("Job {} is aborted. Data file cleaning started", jobContext.getJobID());
- Collection<String> outputs = HiveIcebergStorageHandler.outputTables(jobContext.getJobConf());
Collection<String> jobLocations = new ConcurrentLinkedQueue<>();
- ExecutorService fileExecutor = fileExecutor(jobConf);
- ExecutorService tableExecutor = tableExecutor(jobConf, outputs.size());
+ ExecutorService fileExecutor = fileExecutor(jobContextList.get(0).getJobConf());
+ ExecutorService tableExecutor = tableExecutor(jobContextList.get(0).getJobConf(), outputs.size());
try {
// Cleans up the changes for the output tables in parallel
Tasks.foreach(outputs)
@@ -247,9 +333,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
.executeWith(tableExecutor)
.onFailure((output, exc) -> LOG.warn("Failed cleanup table {} on abort job", output, exc))
.run(output -> {
+ JobContext jobContext = output.jobContext;
+ JobConf jobConf = jobContext.getJobConf();
LOG.info("Cleaning job for jobID: {}, table: {}", jobContext.getJobID(), output);
- Table table = HiveIcebergStorageHandler.table(jobConf, output);
+ Table table = output.table;
String jobLocation = generateJobLocation(table.location(), jobConf, jobContext.getJobID());
jobLocations.add(jobLocation);
// list jobLocation to get number of forCommit files
@@ -277,9 +365,11 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
}
}
- LOG.info("Job {} is aborted. Data file cleaning finished", jobContext.getJobID());
+ LOG.info("Job(s) {} are aborted. Data file cleaning finished", ids);
- cleanup(jobContext, jobLocations);
+ for (JobContext jobContext : jobContextList) {
+ cleanup(jobContext, jobLocations);
+ }
}
/**
@@ -305,27 +395,26 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
* Collects the additions to a single table and adds/commits the new files to the Iceberg table.
* @param io The io to read the forCommit files
* @param executor The executor used to read the forCommit files
- * @param jobContext The job context
- * @param name The name of the table used for loading from the catalog
- * @param location The location of the table used for loading from the catalog
- * @param catalogName The name of the catalog that contains the table
+ * @param outputTable The table used for loading from the catalog
*/
- private void commitTable(FileIO io, ExecutorService executor, JobContext jobContext, String name, String location,
- String catalogName) {
+ private void commitTable(FileIO io, ExecutorService executor, OutputTable outputTable) {
+ String name = outputTable.tableName;
+ JobContext jobContext = outputTable.jobContext;
JobConf conf = jobContext.getJobConf();
Properties catalogProperties = new Properties();
catalogProperties.put(Catalogs.NAME, name);
- catalogProperties.put(Catalogs.LOCATION, location);
- if (catalogName != null) {
- catalogProperties.put(InputFormatConfig.CATALOG_NAME, catalogName);
+ catalogProperties.put(Catalogs.LOCATION, outputTable.table.location());
+ if (outputTable.catalogName != null) {
+ catalogProperties.put(InputFormatConfig.CATALOG_NAME, outputTable.catalogName);
}
Table table = Catalogs.loadTable(conf, catalogProperties);
long startTime = System.currentTimeMillis();
LOG.info("Committing job has started for table: {}, using location: {}",
- table, generateJobLocation(location, conf, jobContext.getJobID()));
+ table, generateJobLocation(outputTable.table.location(), conf, jobContext.getJobID()));
- int numTasks = SessionStateUtil.getCommitInfo(conf, name).map(info -> info.getTaskNum()).orElseGet(() -> {
+ Optional<SessionStateUtil.CommitInfo> commitInfo = outputTable.getCommitInfo();
+ int numTasks = commitInfo.map(SessionStateUtil.CommitInfo::getTaskNum).orElseGet(() -> {
// Fallback logic, if the number of tasks is not available in the config
// If there are reducers, then every reducer will generate a result file.
// If this is a map only task, then every mapper will generate a result file.
@@ -334,7 +423,8 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
return conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
});
- FilesForCommit writeResults = collectResults(numTasks, executor, location, jobContext, io, true);
+ FilesForCommit writeResults = collectResults(
+ numTasks, executor, outputTable.table.location(), jobContext, io, true);
if (!conf.getBoolean(InputFormatConfig.IS_OVERWRITE, false)) {
if (writeResults.isEmpty()) {
LOG.info(
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index dcec3468ad..5f1c9158aa 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -24,11 +24,13 @@ import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -77,8 +79,6 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobContextImpl;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.FileFormat;
@@ -100,6 +100,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Throwables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SerializationUtil;
import org.slf4j.Logger;
@@ -411,24 +412,28 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
public void storageHandlerCommit(Properties commitProperties, boolean overwrite) throws HiveException {
String tableName = commitProperties.getProperty(Catalogs.NAME);
Configuration configuration = SessionState.getSessionConf();
- Optional<JobContext> jobContext = generateJobContext(configuration, tableName, overwrite);
- if (jobContext.isPresent()) {
- OutputCommitter committer = new HiveIcebergOutputCommitter();
+ List<JobContext> jobContextList = generateJobContext(configuration, tableName, overwrite);
+ if (jobContextList.isEmpty()) {
+ return;
+ }
+
+ HiveIcebergOutputCommitter committer = new HiveIcebergOutputCommitter();
+ try {
+ committer.commitJobs(jobContextList);
+ } catch (Throwable e) {
+ String ids = jobContextList
+ .stream().map(jobContext -> jobContext.getJobID().toString()).collect(Collectors.joining(", "));
+ // Aborting the job if the commit has failed
+ LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
+ ids, tableName, e);
try {
- committer.commitJob(jobContext.get());
- } catch (Throwable e) {
- // Aborting the job if the commit has failed
- LOG.error("Error while trying to commit job: {}, starting rollback changes for table: {}",
- jobContext.get().getJobID(), tableName, e);
- try {
- committer.abortJob(jobContext.get(), JobStatus.State.FAILED);
- } catch (IOException ioe) {
- LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
- // no throwing here because the original exception should be propagated
- }
- throw new HiveException(
- "Error committing job: " + jobContext.get().getJobID() + " for table: " + tableName, e);
+ committer.abortJobs(jobContextList);
+ } catch (IOException ioe) {
+ LOG.error("Error while trying to abort failed job. There might be uncleaned data files.", ioe);
+ // no throwing here because the original exception should be propagated
}
+ throw new HiveException(
+ "Error committing job: " + ids + " for table: " + tableName, e);
}
}
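With the batch variants in place, a failure while committing any of the table's jobs now rolls back all of them together via abortJobs, and the HiveException reports the whole job-id list rather than a single id.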
@@ -653,8 +658,8 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
* @param config The configuration used to get the data from
* @return The collection of the table names as returned by TableDesc.getTableName()
*/
- public static Collection<String> outputTables(Configuration config) {
- return TABLE_NAME_SPLITTER.splitToList(config.get(InputFormatConfig.OUTPUT_TABLES));
+ public static Set<String> outputTables(Configuration config) {
+ return Sets.newHashSet(TABLE_NAME_SPLITTER.split(config.get(InputFormatConfig.OUTPUT_TABLES)));
}
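Returning a Set instead of the previous List matters once one query writes the same table from two branches: the rewritten UPDATE registers its target once per branch, so the raw OUTPUT_TABLES value can name the same table twice (the doubled "Output: default@tbl_ice" entities in the q.out files below are the same effect one level up). A minimal sketch of the dedup, using plain Guava and an assumed raw value:

    import com.google.common.base.Splitter;
    import com.google.common.collect.Sets;
    import java.util.Set;

    public class OutputTablesDedup {
      public static void main(String[] args) {
        // Assumed value; the real one is read from InputFormatConfig.OUTPUT_TABLES.
        String raw = "default.tbl_ice,default.tbl_ice";
        Set<String> outputs = Sets.newHashSet(Splitter.on(',').split(raw));
        System.out.println(outputs); // [default.tbl_ice] -- visited once by commit/abort
      }
    }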
/**
@@ -859,29 +864,35 @@ public class HiveIcebergStorageHandler implements HiveStoragePredicateHandler, H
}
/**
- * Generates a JobContext for the OutputCommitter for the specific table.
+ * Generates {@link JobContext}s for the OutputCommitter for the specific table.
* @param configuration The configuration used for as a base of the JobConf
* @param tableName The name of the table we are planning to commit
* @param overwrite If we have to overwrite the existing table or just add the new data
- * @return The generated JobContext
+ * @return The generated JobContext list, or an empty list if no commit info is present.
*/
- private Optional<JobContext> generateJobContext(Configuration configuration, String tableName, boolean overwrite) {
+ private List<JobContext> generateJobContext(Configuration configuration, String tableName,
+ boolean overwrite) {
JobConf jobConf = new JobConf(configuration);
- Optional<SessionStateUtil.CommitInfo> commitInfo = SessionStateUtil.getCommitInfo(jobConf, tableName);
- if (commitInfo.isPresent()) {
- JobID jobID = JobID.forName(commitInfo.get().getJobIdStr());
- commitInfo.get().getProps().forEach(jobConf::set);
- jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
-
- // we should only commit this current table because
- // for multi-table inserts, this hook method will be called sequentially for each target table
- jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-
- return Optional.of(new JobContextImpl(jobConf, jobID, null));
+ Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+ SessionStateUtil.getCommitInfo(jobConf, tableName);
+ if (commitInfoMap.isPresent()) {
+ List<JobContext> jobContextList = Lists.newLinkedList();
+ for (SessionStateUtil.CommitInfo commitInfo : commitInfoMap.get().values()) {
+ JobID jobID = JobID.forName(commitInfo.getJobIdStr());
+ commitInfo.getProps().forEach(jobConf::set);
+ jobConf.setBoolean(InputFormatConfig.IS_OVERWRITE, overwrite);
+
+ // we should only commit this current table because
+ // for multi-table inserts, this hook method will be called sequentially for each target table
+ jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+
+ jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+ }
+ return jobContextList;
} else {
// most likely empty write scenario
LOG.debug("Unable to find commit information in query state for table: {}", tableName);
- return Optional.empty();
+ return Collections.emptyList();
}
}
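generateJobContext thus yields one JobContext per CommitInfo recorded for the table, all sharing the single JobConf created at the top of the method (each iteration layers that commit info's props onto it). OUTPUT_TABLES stays pinned to the one table being committed, since this hook runs once per target table.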
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
index dba4973e41..4d7708758f 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterRegistry.java
@@ -19,26 +19,33 @@
package org.apache.iceberg.mr.hive.writer;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
public class WriterRegistry {
- private static final Map<TaskAttemptID, Map<String, HiveIcebergWriter>> writers = Maps.newConcurrentMap();
+ private static final Map<TaskAttemptID, Map<String, List<HiveIcebergWriter>>> writers = Maps.newConcurrentMap();
private WriterRegistry() {
}
- public static Map<String, HiveIcebergWriter> removeWriters(TaskAttemptID taskAttemptID) {
+ public static Map<String, List<HiveIcebergWriter>> removeWriters(TaskAttemptID taskAttemptID) {
return writers.remove(taskAttemptID);
}
public static void registerWriter(TaskAttemptID taskAttemptID, String tableName, HiveIcebergWriter writer) {
writers.putIfAbsent(taskAttemptID, Maps.newConcurrentMap());
- writers.get(taskAttemptID).put(tableName, writer);
+
+ Map<String, List<HiveIcebergWriter>> writersOfTableMap = writers.get(taskAttemptID);
+ writersOfTableMap.putIfAbsent(tableName, Lists.newArrayList());
+
+ List<HiveIcebergWriter> writerList = writersOfTableMap.get(tableName);
+ writerList.add(writer);
}
- public static Map<String, HiveIcebergWriter> writers(TaskAttemptID taskAttemptID) {
+ public static Map<String, List<HiveIcebergWriter>> writers(TaskAttemptID taskAttemptID) {
return writers.get(taskAttemptID);
}
}
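The registry now tolerates several writers per (task attempt, table) pair, which is exactly what the split update produces: the insert branch and the delete branch of one rewritten statement each register a writer for the same table. A hedged usage sketch (the two writer parameters are stand-ins for whatever the branches create):

    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.mapred.TaskAttemptID;

    static void example(TaskAttemptID attempt,
                        HiveIcebergWriter insertWriter,   // stand-in: insert-branch writer
                        HiveIcebergWriter deleteWriter) { // stand-in: delete-branch writer
      WriterRegistry.registerWriter(attempt, "default.tbl_ice", insertWriter);
      WriterRegistry.registerWriter(attempt, "default.tbl_ice", deleteWriter);
      Map<String, List<HiveIcebergWriter>> writers = WriterRegistry.writers(attempt);
      assert writers.get("default.tbl_ice").size() == 2; // commitTask handles both
    }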
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
index 351011cb47..6782a0771a 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_avro.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a =
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changes' where b in ('one', 'four') or a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
index 4fbe5c0500..c669b7dba6 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_orc.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
index 46913856ed..317e2b51e2 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_partitioned_parquet.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[55][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[60][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
diff --git a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
index 1b0d9d5bcd..c1f4775c22 100644
--- a/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
+++ b/iceberg/iceberg-handler/src/test/results/positive/update_iceberg_unpartitioned_parquet.q.out
@@ -22,10 +22,12 @@ PREHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a =
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed' where b in ('one', 'four') or a = 22
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -49,16 +51,18 @@ POSTHOOK: query: insert into tbl_ice values (444, 'hola', 800), (555, 'schola',
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@tbl_ice
-Warning: Shuffle Join MERGEJOIN[51][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
-Warning: Shuffle Join MERGEJOIN[53][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
+Warning: Shuffle Join MERGEJOIN[56][tables = [$hdt$_0, $hdt$_1]] in Stage 'Reducer 2' is a cross product
+Warning: Shuffle Join MERGEJOIN[58][tables = [$hdt$_0, $hdt$_1, $hdt$_2, $hdt$_3]] in Stage 'Reducer 4' is a cross product
PREHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed again' where a in (select a from tbl_ice where a <= 5) or c in (select c from tbl_ice where c > 800)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -101,11 +105,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_ice_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='Changed forever' where a in (select t1.a from tbl_ice t1 join tbl_ice_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_ice_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
@@ -150,11 +156,13 @@ PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
PREHOOK: Input: default@tbl_standard_other
PREHOOK: Output: default@tbl_ice
+PREHOOK: Output: default@tbl_ice
POSTHOOK: query: update tbl_ice set b='The last one' where a in (select t1.a from tbl_ice t1 join tbl_standard_other t2 on t1.a = t2.a)
POSTHOOK: type: QUERY
POSTHOOK: Input: default@tbl_ice
POSTHOOK: Input: default@tbl_standard_other
POSTHOOK: Output: default@tbl_ice
+POSTHOOK: Output: default@tbl_ice
PREHOOK: query: select * from tbl_ice order by a, b, c
PREHOOK: type: QUERY
PREHOOK: Input: default@tbl_ice
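Two mechanical changes repeat across all four q.out files: each update now lists the target table twice as an output entity, once per rewrite branch, and the MERGEJOIN operator ids in the cross-product warnings shift, presumably because the rewritten plan contains additional operators.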
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 0d95b62e29..c56c37dd86 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -617,18 +617,13 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
initializeSpecPath();
fs = specPath.getFileSystem(hconf);
- setWriteOperation(hconf, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
- if (hconf instanceof JobConf) {
- jc = (JobConf) hconf;
- } else {
- // test code path
- jc = new JobConf(hconf);
- }
+ jc = new JobConf(hconf);
+ setWriteOperation(jc, getConf().getTableInfo().getTableName(), getConf().getWriteOperation());
try {
createHiveOutputFormat(jc);
} catch (HiveException ex) {
- logOutputFormatError(hconf, ex);
+ logOutputFormatError(jc, ex);
throw ex;
}
isCompressed = conf.getCompressed();
@@ -639,7 +634,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
}
statsFromRecordWriter = new boolean[numFiles];
AbstractSerDe serde = conf.getTableInfo().getSerDeClass().newInstance();
- serde.initialize(unsetNestedColumnPaths(hconf), conf.getTableInfo().getProperties(), null);
+ serde.initialize(unsetNestedColumnPaths(jc), conf.getTableInfo().getProperties(), null);
serializer = serde;
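The operator now always works on its own JobConf copy rather than mutating the incoming Configuration when it already happens to be a JobConf. That isolation plausibly matters here because a rewritten UPDATE runs two FileSinks against the same table, and setWriteOperation on a shared conf would let one branch overwrite the other's setting. A small sketch of the copy-isolates-mutation property (the key name is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;

    public class ConfIsolation {
      public static void main(String[] args) {
        Configuration shared = new Configuration(false);
        shared.set("write.operation.default.tbl_ice", "update"); // hypothetical key
        JobConf local = new JobConf(shared); // copy constructor snapshots the entries
        local.set("write.operation.default.tbl_ice", "delete");
        System.out.println(shared.get("write.operation.default.tbl_ice")); // update
        System.out.println(local.get("write.operation.default.tbl_ice"));  // delete
      }
    }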
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index cd7de19b12..f934990b13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -555,6 +555,9 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
}
protected void appendSortBy(StringBuilder rewrittenQueryStr, List<String> keys) {
+ if (keys.isEmpty()) {
+ return;
+ }
rewrittenQueryStr.append(INDENT).append("SORT BY ");
rewrittenQueryStr.append(StringUtils.join(keys, ","));
rewrittenQueryStr.append("\n");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index f83f36d71a..deca32eb42 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.parse;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -30,13 +29,13 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.session.SessionStateUtil;
+
+import static java.util.Collections.singletonList;
/**
* A subclass of the {@link org.apache.hadoop.hive.ql.parse.SemanticAnalyzer} that just handles
@@ -79,7 +78,7 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
operation = Context.Operation.UPDATE;
boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
- if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE) && !nonNativeAcid) {
+ if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE)) {
analyzeSplitUpdate(tree, mTable, tabNameNode);
} else {
reparseAndSuperAnalyze(tree, mTable, tabNameNode);
@@ -113,9 +112,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
private void reparseAndSuperAnalyze(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
List<? extends Node> children = tree.getChildren();
- // save the operation type into the query state
- SessionStateUtil.addResource(conf, Context.Operation.class.getSimpleName(), operation.name());
-
StringBuilder rewrittenQueryStr = new StringBuilder();
rewrittenQueryStr.append("insert into table ");
rewrittenQueryStr.append(getFullTableNameForSQL(tabNameNode));
@@ -257,6 +253,9 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
}
+ public static final String DELETE_PREFIX = "__d__";
+ public static final String SUB_QUERY_ALIAS = "s";
+
private void analyzeSplitUpdate(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
operation = Context.Operation.UPDATE;
@@ -282,11 +281,45 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
List<FieldSchema> nonPartCols = mTable.getCols();
Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(mTable);
- List<String> values = new ArrayList<>(mTable.getCols().size());
StringBuilder rewrittenQueryStr = createRewrittenQueryStrBuilder();
- rewrittenQueryStr.append("(SELECT ROW__ID");
+ rewrittenQueryStr.append("(SELECT ");
+
+ boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
+ int columnOffset;
+ List<String> deleteValues;
+ if (nonNativeAcid) {
+ List<FieldSchema> acidSelectColumns = mTable.getStorageHandler().acidSelectColumns(mTable, operation);
+ deleteValues = new ArrayList<>(acidSelectColumns.size());
+ for (FieldSchema fieldSchema : acidSelectColumns) {
+ String identifier = HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf);
+ rewrittenQueryStr.append(identifier).append(" AS ");
+ String prefixedIdentifier = HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf);
+ rewrittenQueryStr.append(prefixedIdentifier);
+ rewrittenQueryStr.append(",");
+ deleteValues.add(String.format("%s.%s", SUB_QUERY_ALIAS, prefixedIdentifier));
+ }
+
+ columnOffset = acidSelectColumns.size();
+ } else {
+ rewrittenQueryStr.append("ROW__ID,");
+ deleteValues = new ArrayList<>(1 + mTable.getPartCols().size());
+ deleteValues.add(SUB_QUERY_ALIAS + ".ROW__ID");
+ for (FieldSchema fieldSchema : mTable.getPartCols()) {
+ deleteValues.add(SUB_QUERY_ALIAS + "." + HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
+ }
+ columnOffset = 1;
+ }
+
+ List<String> insertValues = new ArrayList<>(mTable.getCols().size());
+ boolean first = true;
+
for (int i = 0; i < nonPartCols.size(); i++) {
- rewrittenQueryStr.append(',');
+ if (first) {
+ first = false;
+ } else {
+ rewrittenQueryStr.append(",");
+ }
+
String name = nonPartCols.get(i).getName();
ASTNode setCol = setCols.get(name);
String identifier = HiveUtils.unparseIdentifier(name, this.conf);
@@ -299,7 +332,7 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
rewrittenQueryStr.append(identifier);
// This is one of the columns we're setting, record its position so we can come back
// later and patch it up. The first columnOffset columns are ROW__ID or the acid select columns
- setColExprs.put(i + 1, setCol);
+ setColExprs.put(i + columnOffset, setCol);
}
} else {
rewrittenQueryStr.append(identifier);
@@ -307,16 +340,28 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
rewrittenQueryStr.append(" AS ");
rewrittenQueryStr.append(identifier);
- values.add("s." + identifier);
+ insertValues.add(SUB_QUERY_ALIAS + "." + identifier);
}
addPartitionColsToSelect(mTable.getPartCols(), rewrittenQueryStr);
- addPartitionColsAsValues(mTable.getPartCols(), "s", values);
- rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") s\n");
+ addPartitionColsAsValues(mTable.getPartCols(), SUB_QUERY_ALIAS, insertValues);
+ rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") ");
+ rewrittenQueryStr.append(SUB_QUERY_ALIAS).append("\n");
- appendInsertBranch(rewrittenQueryStr, null, values);
- appendDeleteBranch(rewrittenQueryStr, null, "s", Collections.singletonList("s.ROW__ID "));
+ appendInsertBranch(rewrittenQueryStr, null, insertValues);
+ appendInsertBranch(rewrittenQueryStr, null, deleteValues);
- appendSortBy(rewrittenQueryStr, Collections.singletonList("s.ROW__ID "));
+ List<String> sortKeys;
+ if (nonNativeAcid) {
+ sortKeys = mTable.getStorageHandler().acidSortColumns(mTable, Context.Operation.DELETE).stream()
+ .map(fieldSchema -> String.format(
+ "%s.%s",
+ SUB_QUERY_ALIAS,
+ HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf)))
+ .collect(Collectors.toList());
+ } else {
+ sortKeys = singletonList(SUB_QUERY_ALIAS + ".ROW__ID ");
+ }
+ appendSortBy(rewrittenQueryStr, sortKeys);
ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
Context rewrittenCtx = rr.rewrittenCtx;
@@ -335,14 +380,9 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
patchProjectionForUpdate(rewrittenInsert, setColExprs);
- try {
- useSuper = true;
- // Note: this will overwrite this.ctx with rewrittenCtx
- rewrittenCtx.setEnableUnparse(false);
- super.analyze(rewrittenTree, rewrittenCtx);
- } finally {
- useSuper = false;
- }
+ // Note: this will overwrite this.ctx with rewrittenCtx
+ rewrittenCtx.setEnableUnparse(false);
+ analyzeRewrittenTree(rewrittenTree, rewrittenCtx);
updateOutputs(mTable);
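Putting the analyzer pieces together: for a non-native ACID table, analyzeSplitUpdate emits a single multi-insert statement whose subquery s selects the storage handler's acidSelectColumns (aliased with the __d__ prefix) next to the updated column values; the first INSERT branch writes the new row versions, the second writes the delete records, and the result is sorted by the handler's delete sort columns. A hedged sketch of the generated text for UPDATE t SET b = 'x' on an Iceberg table t(a, b) -- the file__path/row__position names are invented stand-ins for whatever acidSelectColumns actually returns:

    // Illustrative shape only; exact whitespace and clauses come from
    // createRewrittenQueryStrBuilder/appendInsertBranch/appendSortBy.
    String rewritten =
        "FROM (SELECT `file__path` AS `__d__file__path`, `row__position` AS `__d__row__position`,\n" +
        "             `a`, 'x' AS `b` FROM t) s\n" +
        "INSERT INTO t SELECT s.`a`, s.`b`\n" +                                // new row versions
        "INSERT INTO t SELECT s.`__d__file__path`, s.`__d__row__position`\n" + // delete records
        "SORT BY s.`__d__file__path`, s.`__d__row__position`\n";               // acidSortColumns(DELETE)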
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
index 2cfe9d1e79..c400ace122 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionStateUtil.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.session;
+import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@@ -74,12 +75,10 @@ public class SessionStateUtil {
/**
* @param conf Configuration object used for getting the query state, should contain the query id
* @param tableName Name of the table for which the commit info should be retrieved
- * @return the CommitInfo, or empty Optional if not present
+ * @return the CommitInfo map (key: jobId, value: {@link CommitInfo}), or an empty Optional if not present
*/
- public static Optional<CommitInfo> getCommitInfo(Configuration conf, String tableName) {
- return getResource(conf, COMMIT_INFO_PREFIX + tableName)
- .filter(o -> o instanceof CommitInfo)
- .map(o -> (CommitInfo) o);
+ public static Optional<Map<String, CommitInfo>> getCommitInfo(Configuration conf, String tableName) {
+ return getResource(conf, COMMIT_INFO_PREFIX + tableName).map(o -> (Map<String, CommitInfo>)o);
}
/**
@@ -92,11 +91,22 @@ public class SessionStateUtil {
*/
public static boolean addCommitInfo(Configuration conf, String tableName, String jobId, int taskNum,
Map<String, String> additionalProps) {
+
CommitInfo commitInfo = new CommitInfo()
- .withJobID(jobId)
- .withTaskNum(taskNum)
- .withProps(additionalProps);
- return addResource(conf, COMMIT_INFO_PREFIX + tableName, commitInfo);
+ .withJobID(jobId)
+ .withTaskNum(taskNum)
+ .withProps(additionalProps);
+
+ Optional<Map<String, CommitInfo>> commitInfoMap = getCommitInfo(conf, tableName);
+ if (commitInfoMap.isPresent()) {
+ commitInfoMap.get().put(jobId, commitInfo);
+ return true;
+ }
+
+ Map<String, CommitInfo> newCommitInfoMap = new HashMap<>();
+ newCommitInfoMap.put(jobId, commitInfo);
+
+ return addResource(conf, COMMIT_INFO_PREFIX + tableName, newCommitInfoMap);
}
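Commit info for a table is now a jobId-keyed map rather than a single value, so several jobs writing one table within the same query each keep their own task count and props, and generateJobContext above turns each entry into its own JobContext. The keying logic, with the QueryState plumbing stripped out (props1/props2 are assumed maps):

    import java.util.HashMap;
    import java.util.Map;

    Map<String, SessionStateUtil.CommitInfo> infos = new HashMap<>();
    infos.put("job_1", new SessionStateUtil.CommitInfo()
        .withJobID("job_1").withTaskNum(4).withProps(props1));
    infos.put("job_2", new SessionStateUtil.CommitInfo()
        .withJobID("job_2").withTaskNum(2).withProps(props2));
    // A second addCommitInfo call for the same table lands in the existing map
    // instead of replacing the earlier entry.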
private static Optional<QueryState> getQueryState(Configuration conf) {