Posted to commits@hive.apache.org by kr...@apache.org on 2022/07/22 11:09:39 UTC
[hive] branch master updated: HIVE-26417: Iceberg integration: disable update and merge iceberg table when split update is off (Krisztian Kasa, reviewed by Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 90428cc5f59 HIVE-26417: Iceberg integration: disable update and merge iceberg table when split update is off (Krisztian Kasa, reviewed by Peter Vary)
90428cc5f59 is described below
commit 90428cc5f594bd0abb457e4e5c391007b2ad1cb8
Author: Krisztian Kasa <ka...@gmail.com>
AuthorDate: Fri Jul 22 13:09:27 2022 +0200
HIVE-26417: Iceberg integration: disable update and merge iceberg table when split update is off (Krisztian Kasa, reviewed by Peter Vary)
---
.../java/org/apache/hadoop/hive/ql/ErrorMsg.java | 2 +
.../writer/HiveIcebergBufferedDeleteWriter.java | 179 ------------------
.../mr/hive/writer/HiveIcebergUpdateWriter.java | 91 ---------
.../iceberg/mr/hive/writer/WriterBuilder.java | 14 +-
.../hive/writer/TestHiveIcebergUpdateWriter.java | 138 --------------
.../test/queries/negative/merge_split_update_off.q | 12 ++
.../queries/negative/update_split_update_off.q | 7 +
.../results/negative/merge_split_update_off.q.out | 25 +++
.../results/negative/update_split_update_off.q.out | 13 ++
.../java/org/apache/hive/jdbc/TestJdbcDriver2.java | 1 +
.../hive/ql/parse/MergeSemanticAnalyzer.java | 121 ++++++------
.../hive/ql/parse/RewriteSemanticAnalyzer.java | 67 ++++---
.../hive/ql/parse/SemanticAnalyzerFactory.java | 7 +
.../hive/ql/parse/SplitMergeSemanticAnalyzer.java | 107 +++++++++++
.../hive/ql/parse/SplitUpdateSemanticAnalyzer.java | 204 +++++++++++++++++++++
.../ql/parse/UpdateDeleteSemanticAnalyzer.java | 171 ++---------------
16 files changed, 490 insertions(+), 669 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 8f7887d73a9..5b9279b76fc 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -483,6 +483,8 @@ public enum ErrorMsg {
CBO_IS_REQUIRED(10433,
"The following functionality requires CBO (" + HiveConf.ConfVars.HIVE_CBO_ENABLED.varname + "): {0}", true),
CTLF_UNSUPPORTED_FORMAT(10434, "CREATE TABLE LIKE FILE is not supported by the ''{0}'' file format", true),
+ NON_NATIVE_ACID_UPDATE(10435, "Update and Merge into non-native ACID table is only supported when " +
+ HiveConf.ConfVars.SPLIT_UPDATE.varname + " is true."),
//========================== 20000 range starts here ========================//
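For illustration, the user-facing effect of the new error code: with split update disabled, an UPDATE or MERGE on an Iceberg (non-native ACID) table now fails at analysis time. A minimal HiveQL session sketch, mirroring the negative qtests added below (the table name is illustrative):

    set hive.split.update=false;
    create external table t (id int, value string) stored by iceberg stored as orc;
    update t set value = 'anything' where id = 1;
    -- FAILED: SemanticException [Error 10435]: Update and Merge into non-native ACID table
    -- is only supported when hive.split.update is true.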
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
deleted file mode 100644
index 2d910533559..00000000000
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergBufferedDeleteWriter.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr.hive.writer;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Map;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.deletes.PositionDelete;
-import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
-import org.apache.iceberg.io.DeleteWriteResult;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.io.PartitioningWriter;
-import org.apache.iceberg.mr.hive.FilesForCommit;
-import org.apache.iceberg.mr.hive.IcebergAcidUtil;
-import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.iceberg.util.Tasks;
-import org.roaringbitmap.longlong.PeekableLongIterator;
-import org.roaringbitmap.longlong.Roaring64Bitmap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out-of-order records.
- * We need to keep the incoming records in memory until they are written out. To keep the memory consumption minimal
- * we only write out {@link PositionDelete} files where the row data is omitted, so only the filenames and the rowIds
- * have to be kept in memory.
- */
-class HiveIcebergBufferedDeleteWriter implements HiveIcebergWriter {
- private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergBufferedDeleteWriter.class);
-
- // Storing deleted data in a map Partition -> FileName -> BitMap
- private final Map<PartitionKey, Map<String, Roaring64Bitmap>> buffer = Maps.newHashMap();
- private final Map<Integer, PartitionSpec> specs;
- private final Map<PartitionKey, PartitionSpec> keyToSpec = Maps.newHashMap();
- private final FileFormat format;
- private final FileWriterFactory<Record> writerFactory;
- private final OutputFileFactory fileFactory;
- private final FileIO io;
- private final long targetFileSize;
- private final int poolSize;
- private final Record record;
- private final InternalRecordWrapper wrapper;
- private FilesForCommit filesForCommit;
-
- HiveIcebergBufferedDeleteWriter(Schema schema, Map<Integer, PartitionSpec> specs,
- FileWriterFactory<Record> writerFactory, OutputFileFactory fileFactory, FileFormat format, FileIO io,
- long targetFileSize, int poolSize) {
- this.specs = specs;
- this.format = format;
- this.writerFactory = writerFactory;
- this.fileFactory = fileFactory;
- this.io = io;
- this.targetFileSize = targetFileSize;
- this.poolSize = poolSize;
- this.wrapper = new InternalRecordWrapper(schema.asStruct());
- this.record = GenericRecord.create(schema);
- }
-
- @Override
- public void write(Writable row) throws IOException {
- Record rec = ((Container<Record>) row).get();
- IcebergAcidUtil.populateWithOriginalValues(rec, record);
- String filePath = IcebergAcidUtil.parseFilePath(rec);
- int specId = IcebergAcidUtil.parseSpecId(rec);
-
- Map<String, Roaring64Bitmap> deleteMap =
- buffer.computeIfAbsent(partition(record, specId), key -> {
- keyToSpec.put(key, specs.get(specId));
- return Maps.newHashMap();
- });
- Roaring64Bitmap deletes = deleteMap.computeIfAbsent(filePath, unused -> new Roaring64Bitmap());
- deletes.add(IcebergAcidUtil.parseFilePosition(rec));
- }
-
- @Override
- public void close(boolean abort) throws IOException {
- long startTime = System.currentTimeMillis();
- Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
- if (!abort) {
- LOG.info("Delete file flush is started");
- int size = Math.min(buffer.size(), poolSize);
- ExecutorService fileExecutor = fileExecutor(size);
- try {
- Tasks.foreach(buffer.keySet())
- .retry(3)
- .executeWith(fileExecutor)
- .onFailure((partition, exception) -> LOG.info("Failed to write delete file {}", partition, exception))
- .run(partition -> {
- PositionDelete<Record> positionDelete = PositionDelete.create();
- PartitioningWriter writerForFiles;
- try (PartitioningWriter writer =
- new ClusteredPositionDeleteWriter<>(writerFactory, fileFactory, io, format, targetFileSize)) {
- Map<String, Roaring64Bitmap> deleteRows = buffer.get(partition);
- for (String filePath : new TreeSet<>(deleteRows.keySet())) {
- Roaring64Bitmap deletes = deleteRows.get(filePath);
- PeekableLongIterator longIterator = deletes.getLongIterator();
- while (longIterator.hasNext()) {
- long position = longIterator.next();
- positionDelete.set(filePath, position, null);
- writer.write(positionDelete, keyToSpec.get(partition), partition);
- }
- }
- // We need the writer object later to get the generated delete files
- writerForFiles = writer;
- }
- deleteFiles.addAll(((DeleteWriteResult) writerForFiles.result()).deleteFiles());
- }, IOException.class);
- } finally {
- fileExecutor.shutdown();
- }
- }
-
- LOG.info("HiveIcebergBufferedDeleteWriter is closed with abort={}. Created {} delete files and it took {} ms.",
- abort, deleteFiles.size(), System.currentTimeMillis() - startTime);
- LOG.debug("Delete files written {}", deleteFiles);
-
- this.filesForCommit = FilesForCommit.onlyDelete(deleteFiles);
- }
-
- @Override
- public FilesForCommit files() {
- return filesForCommit;
- }
-
- protected PartitionKey partition(Record row, int specId) {
- PartitionKey partitionKey = new PartitionKey(specs.get(specId), specs.get(specId).schema());
- partitionKey.partition(wrapper.wrap(row));
- return partitionKey;
- }
-
- /**
- * Executor service for parallel writing of delete files.
- * @param poolSize The pool size
- * @return The generated executor service
- */
- private static ExecutorService fileExecutor(int poolSize) {
- return Executors.newFixedThreadPool(
- poolSize,
- new ThreadFactoryBuilder()
- .setDaemon(true)
- .setPriority(Thread.NORM_PRIORITY)
- .setNameFormat("iceberg-delete-file-pool-%d")
- .build());
- }
-}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
deleted file mode 100644
index 4f4154854e1..00000000000
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/HiveIcebergUpdateWriter.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr.hive.writer;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DeleteFile;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.FileWriterFactory;
-import org.apache.iceberg.io.OutputFileFactory;
-import org.apache.iceberg.mr.hive.FilesForCommit;
-import org.apache.iceberg.mr.hive.IcebergAcidUtil;
-import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
-
-/**
- * Hive update queries are converted to an insert statement where the result contains the updated rows.
- * The schema is defined by {@link IcebergAcidUtil#createFileReadSchemaForUpdate(List, Table)}}.
- * The rows are sorted based on the requirements of the {@link HiveIcebergRecordWriter}.
- * The {@link HiveIcebergBufferedDeleteWriter} needs to handle out of order records.
- */
-class HiveIcebergUpdateWriter implements HiveIcebergWriter {
-
- private final HiveIcebergBufferedDeleteWriter deleteWriter;
- private final HiveIcebergRecordWriter insertWriter;
- private final Container<Record> container;
-
- HiveIcebergUpdateWriter(Schema schema, Map<Integer, PartitionSpec> specs, int currentSpecId,
- FileWriterFactory<Record> fileWriterFactory, OutputFileFactory fileFactory, OutputFileFactory deleteFileFactory,
- FileFormat format, FileFormat deleteFormat, FileIO io, long targetFileSize, int poolSize) {
- this.deleteWriter = new HiveIcebergBufferedDeleteWriter(schema, specs, fileWriterFactory, deleteFileFactory,
- deleteFormat, io, targetFileSize, poolSize);
- this.insertWriter = new HiveIcebergRecordWriter(schema, specs, currentSpecId, fileWriterFactory, fileFactory,
- format, io, targetFileSize);
- this.container = new Container<>();
- Record record = GenericRecord.create(schema);
- container.set(record);
- }
-
- @Override
- public void write(Writable row) throws IOException {
- deleteWriter.write(row);
- IcebergAcidUtil.populateWithNewValues(((Container<Record>) row).get(), container.get());
- insertWriter.write(container);
- }
-
- @Override
- public void close(boolean abort) throws IOException {
- deleteWriter.close(abort);
- insertWriter.close(abort);
- }
-
- @Override
- public FilesForCommit files() {
- Collection<DataFile> dataFiles = insertWriter.files().dataFiles();
- Collection<DeleteFile> deleteFiles = deleteWriter.files().deleteFiles();
- return new FilesForCommit(dataFiles, deleteFiles);
- }
-
- @VisibleForTesting
- HiveIcebergWriter deleteWriter() {
- return deleteWriter;
- }
-}
diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
index ee0fd9a2f6a..4a80d125384 100644
--- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
+++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/writer/WriterBuilder.java
@@ -109,25 +109,23 @@ public class WriterBuilder {
.operationId("delete-" + operationId)
.build();
- Schema positionDeleteRowSchema = operation == Operation.UPDATE ? null : dataSchema;
HiveFileWriterFactory writerFactory = new HiveFileWriterFactory(table, dataFileFormat, dataSchema, null,
- deleteFileFormat, null, null, null,
- positionDeleteRowSchema);
+ deleteFileFormat, null, null, null, dataSchema);
HiveIcebergWriter writer;
switch (operation) {
- case UPDATE:
- writer = new HiveIcebergUpdateWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
- deleteOutputFileFactory, dataFileFormat, deleteFileFormat, io, targetFileSize, poolSize);
- break;
case DELETE:
writer = new HiveIcebergDeleteWriter(dataSchema, specs, writerFactory, deleteOutputFileFactory,
deleteFileFormat, io, targetFileSize);
break;
- default:
+ case OTHER:
writer = new HiveIcebergRecordWriter(dataSchema, specs, currentSpecId, writerFactory, outputFileFactory,
dataFileFormat, io, targetFileSize);
break;
+ default:
+ // Update and Merge should be split into inserts and deletes
+ throw new IllegalArgumentException("Unsupported operation when creating IcebergRecordWriter: " +
+ operation.name());
}
WriterRegistry.registerWriter(attemptID, tableName, writer);
diff --git a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java b/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
deleted file mode 100644
index 1ecf7f85f63..00000000000
--- a/iceberg/iceberg-handler/src/test/java/org/apache/iceberg/mr/hive/writer/TestHiveIcebergUpdateWriter.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr.hive.writer;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionKey;
-import org.apache.iceberg.RowDelta;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.mr.hive.IcebergAcidUtil;
-import org.apache.iceberg.mr.mapred.Container;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.util.StructLikeSet;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class TestHiveIcebergUpdateWriter extends HiveIcebergWriterTestBase {
- private static final Map<Integer, GenericRecord> UPDATED_RECORDS = ImmutableMap.of(
- 29, record(29, "d"),
- 61, record(61, "h"),
- 89, record(81, "a"),
- 100, record(132, "e"),
- 122, record(142, "k"));
-
- /**
- * This test just sends the data through the deleteWriter. Here we make sure that the correct rows are removed.
- * @throws IOException If there is an error
- */
- @Test
- public void testDelete() throws IOException {
- HiveIcebergWriter updateWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
- HiveIcebergWriter deleteWriter = ((HiveIcebergUpdateWriter) updateWriter).deleteWriter();
-
- update(table, deleteWriter);
-
- StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
- StructLikeSet actual = actualRowSet(table);
-
- Assert.assertEquals("Table should contain expected rows", expected, actual);
- }
-
- /**
- * This test uses the UpdateWriter to check that the values are correctly updated.
- * @throws IOException If there is an error
- */
- @Test
- public void testUpdate() throws IOException {
- HiveIcebergWriter testWriter = writerBuilder.poolSize(10).operation(Context.Operation.UPDATE).build();
-
- update(table, testWriter);
-
- StructLikeSet expected = rowSetWithoutIds(RECORDS, UPDATED_RECORDS.keySet());
- expected.addAll(UPDATED_RECORDS.values());
- StructLikeSet actual = actualRowSet(table);
-
- Assert.assertEquals("Table should contain expected rows", expected, actual);
- }
-
- private static void update(Table table, HiveIcebergWriter testWriter) throws IOException {
- List<GenericRecord> updateRecords = updateRecords(table, UPDATED_RECORDS);
-
- Collections.sort(updateRecords, Comparator.comparing(a -> a.getField("data").toString()));
-
- Container<Record> container = new Container<>();
- for (Record deleteRecord : updateRecords) {
- container.set(deleteRecord);
- testWriter.write(container);
- }
-
- testWriter.close(false);
-
- RowDelta rowDelta = table.newRowDelta();
- testWriter.files().deleteFiles().forEach(rowDelta::addDeletes);
- testWriter.files().dataFiles().forEach(rowDelta::addRows);
- rowDelta.commit();
- }
-
- private static List<GenericRecord> updateRecords(Table table, Map<Integer, GenericRecord> updated)
- throws IOException {
- List<GenericRecord> updateRecords = Lists.newArrayListWithExpectedSize(updated.size());
- for (GenericRecord record : readRecords(table, schemaWithMeta(table))) {
- if (!updated.keySet().contains(record.getField("id"))) {
- continue;
- }
-
- GenericRecord updateRecord = GenericRecord.create(IcebergAcidUtil.createSerdeSchemaForUpdate(SCHEMA.columns()));
- int specId = (Integer) record.getField(MetadataColumns.SPEC_ID.name());
- updateRecord.setField(MetadataColumns.SPEC_ID.name(), specId);
- PartitionKey partitionKey = new PartitionKey(table.specs().get(specId), table.schema());
- partitionKey.partition(record);
- updateRecord.setField(MetadataColumns.PARTITION_COLUMN_NAME, partitionKey);
- updateRecord.setField(MetadataColumns.FILE_PATH.name(), record.getField(MetadataColumns.FILE_PATH.name()));
- updateRecord.setField(MetadataColumns.ROW_POSITION.name(), record.getField(MetadataColumns.ROW_POSITION.name()));
-
- SCHEMA.columns().forEach(field -> {
- updateRecord.setField(field.name(), updated.get(record.getField("id")).getField(field.name()));
- updateRecord.setField("__old_value_for_" + field.name(), record.getField(field.name()));
- });
- updateRecords.add(updateRecord);
- }
- return updateRecords;
- }
-
- private static GenericRecord record(Integer id, String data) {
- GenericRecord record = GenericRecord.create(SCHEMA);
- record.setField("id", id);
- record.setField("data", data);
- return record;
- }
-}
diff --git a/iceberg/iceberg-handler/src/test/queries/negative/merge_split_update_off.q b/iceberg/iceberg-handler/src/test/queries/negative/merge_split_update_off.q
new file mode 100644
index 00000000000..f7f64e2c2c9
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/negative/merge_split_update_off.q
@@ -0,0 +1,12 @@
+set hive.split.update=false;
+set hive.merge.split.update=false;
+
+drop table if exists test_merge_target;
+drop table if exists test_merge_source;
+create external table test_merge_target (a int, b string, c int) stored by iceberg stored as orc;
+create external table test_merge_source (a int, b string, c int) stored by iceberg stored as orc;
+
+explain
+merge into test_merge_target as t using test_merge_source src ON t.a = src.a
+when matched then update set b = 'Merged', c = t.c + 10
+when not matched then insert values (src.a, src.b, src.c);
diff --git a/iceberg/iceberg-handler/src/test/queries/negative/update_split_update_off.q b/iceberg/iceberg-handler/src/test/queries/negative/update_split_update_off.q
new file mode 100644
index 00000000000..fa3379c2e71
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/negative/update_split_update_off.q
@@ -0,0 +1,7 @@
+set hive.split.update=false;
+
+drop table if exists test_update;
+create external table test_update (id int, value string) stored by iceberg stored as orc;
+
+explain
+update test_update set value='anything' where id=1;
diff --git a/iceberg/iceberg-handler/src/test/results/negative/merge_split_update_off.q.out b/iceberg/iceberg-handler/src/test/results/negative/merge_split_update_off.q.out
new file mode 100644
index 00000000000..b89353d6610
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/negative/merge_split_update_off.q.out
@@ -0,0 +1,25 @@
+PREHOOK: query: drop table if exists test_merge_target
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists test_merge_target
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: drop table if exists test_merge_source
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists test_merge_source
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table test_merge_target (a int, b string, c int) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_merge_target
+POSTHOOK: query: create external table test_merge_target (a int, b string, c int) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_merge_target
+PREHOOK: query: create external table test_merge_source (a int, b string, c int) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_merge_source
+POSTHOOK: query: create external table test_merge_source (a int, b string, c int) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_merge_source
+FAILED: SemanticException [Error 10435]: Update and Merge into non-native ACID table is only supported when hive.split.update is true.
diff --git a/iceberg/iceberg-handler/src/test/results/negative/update_split_update_off.q.out b/iceberg/iceberg-handler/src/test/results/negative/update_split_update_off.q.out
new file mode 100644
index 00000000000..270d6ee09bf
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/negative/update_split_update_off.q.out
@@ -0,0 +1,13 @@
+PREHOOK: query: drop table if exists test_update
+PREHOOK: type: DROPTABLE
+POSTHOOK: query: drop table if exists test_update
+POSTHOOK: type: DROPTABLE
+PREHOOK: query: create external table test_update (id int, value string) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@test_update
+POSTHOOK: query: create external table test_update (id int, value string) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@test_update
+FAILED: SemanticException [Error 10435]: Update and Merge into non-native ACID table is only supported when hive.split.update is true.
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 00c17212e2a..6efcf681bc6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -266,6 +266,7 @@ public class TestJdbcDriver2 {
Statement stmt = con.createStatement();
stmt.execute("set " + ConfVars.SPLIT_UPDATE.varname + "=" + splitUpdateEarly);
+ stmt.execute("set " + ConfVars.MERGE_SPLIT_UPDATE.varname + "=" + splitUpdateEarly);
stmt.execute("set " + ConfVars.HIVE_SUPPORT_CONCURRENCY.varname + "=true");
stmt.execute("set " + ConfVars.HIVE_TXN_MANAGER.varname +
"=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
index b5d58c490a8..577890748ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/MergeSemanticAnalyzer.java
@@ -62,11 +62,10 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
@Override
public void analyze(ASTNode tree, Table targetTable, ASTNode tableNameNode) throws SemanticException {
- if (tree.getToken().getType() != HiveParser.TOK_MERGE) {
- throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
- "MergeSemanticAnalyzer");
+ boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(targetTable);
+ if (nonNativeAcid) {
+ throw new SemanticException(ErrorMsg.NON_NATIVE_ACID_UPDATE.getErrorCodedMsg());
}
- ctx.setOperation(Context.Operation.MERGE);
analyzeMerge(tree, targetTable, tableNameNode);
}
@@ -82,7 +81,8 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
*
* @throws SemanticException
*/
- private void analyzeMerge(ASTNode tree, Table targetTable, ASTNode targetNameNode) throws SemanticException {
+ protected void analyzeMerge(ASTNode tree, Table targetTable, ASTNode targetNameNode)
+ throws SemanticException {
/*
* See org.apache.hadoop.hive.ql.parse.TestMergeStatement for some examples of the merge AST
For example, given:
@@ -119,6 +119,12 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
source is empty? This should be a runtime error - maybe not the outer side of ROJ is empty => the join produces 0
rows. If supporting WHEN NOT MATCHED BY SOURCE, then this should be a runtime error
*/
+ if (tree.getToken().getType() != HiveParser.TOK_MERGE) {
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "MergeSemanticAnalyzer");
+ }
+
+ ctx.setOperation(Context.Operation.MERGE);
ASTNode source = (ASTNode)tree.getChild(1);
String targetName = getSimpleTableName(targetNameNode);
String sourceName = getSimpleTableName(source);
@@ -137,7 +143,6 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
StringBuilder rewrittenQueryStr = createRewrittenQueryStrBuilder();
rewrittenQueryStr.append("(SELECT ");
- boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(targetTable);
String subQueryAlias = isAliased(targetNameNode) ? targetName : targetTable.getTTable().getTableName();
ColumnAppender columnAppender = getColumnAppender(subQueryAlias);
columnAppender.appendAcidSelectColumns(rewrittenQueryStr, Context.Operation.UPDATE);
@@ -167,9 +172,6 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
if (hasHint) {
hintStr = " /*+ " + qHint.getText() + " */ ";
}
- final boolean splitUpdateEarly = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPLIT_UPDATE) ||
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.MERGE_SPLIT_UPDATE) ||
- nonNativeAcid;
/**
* We allow at most 2 WHEN MATCHED clauses, in which case one must be Update and the other Delete
* If we have both update and delete, the 1st one (in SQL code) must have "AND <extra predicate>"
@@ -192,8 +194,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
case HiveParser.TOK_UPDATE:
numWhenMatchedUpdateClauses++;
String s = handleUpdate(whenClause, rewrittenQueryStr, targetNameNode,
- onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr,
- splitUpdateEarly, columnAppender);
+ onClauseAsText, targetTable, extraPredicate, hintProcessed ? null : hintStr, columnAppender);
hintProcessed = true;
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s; //i.e. it's the 1st WHEN MATCHED
@@ -202,7 +203,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
case HiveParser.TOK_DELETE:
numWhenMatchedDeleteClauses++;
String s1 = handleDelete(whenClause, rewrittenQueryStr,
- onClauseAsText, extraPredicate, hintProcessed ? null : hintStr, false, columnAppender);
+ onClauseAsText, extraPredicate, hintProcessed ? null : hintStr, columnAppender);
hintProcessed = true;
if (numWhenMatchedUpdateClauses + numWhenMatchedDeleteClauses == 1) {
extraPredicate = s1; //i.e. it's the 1st WHEN MATCHED
@@ -232,33 +233,22 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
rewrittenCtx.setOperation(Context.Operation.MERGE);
//set dest name mapping on new context; 1st child is TOK_FROM
- for (int insClauseIdx = 1, whenClauseIdx = 0;
+ int insClauseIdx = 1;
+ for (int whenClauseIdx = 0;
insClauseIdx < rewrittenTree.getChildCount() - (validating ? 1 : 0/*skip cardinality violation clause*/);
- insClauseIdx++, whenClauseIdx++) {
+ whenClauseIdx++) {
//we've added Insert clauses in order of WHEN items in whenClauses
switch (getWhenClauseOperation(whenClauses.get(whenClauseIdx)).getType()) {
case HiveParser.TOK_INSERT:
rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+ ++insClauseIdx;
break;
case HiveParser.TOK_UPDATE:
- if(!splitUpdateEarly) {
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
- } else {
- /* With 2 branches for the update, the 1st branch is the INSERT part
- and the next one is the DELETE. WriteSet tracking treats 2 concurrent DELETES
- as in conflict so Lost Update is still prevented since the delete event lands in the
- partition/bucket where the original version of the row was so any concurrent update/delete
- of the same row will land in the same partition/bucket.
-
- If the insert part lands in a different partition, it should not conflict with another
- Update of that partition since that update by definition cannot be of the same row.
- If we ever enforce unique constraints we may have to have I+I be in conflict*/
- rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
- rewrittenCtx.addDeleteOfUpdateDestNamePrefix(++insClauseIdx, Context.DestClausePrefix.DELETE);
- }
+ insClauseIdx += addDestNamePrefixOfUpdate(insClauseIdx, rewrittenCtx);
break;
case HiveParser.TOK_DELETE:
rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.DELETE);
+ ++insClauseIdx;
break;
default:
assert false;
@@ -273,6 +263,17 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
updateOutputs(targetTable);
}
+ /**
+ * Sets the destination name prefix for the update clause.
+ * @param insClauseIdx index of the insert clause in the rewritten multi-insert that represents the merge update clause.
+ * @param rewrittenCtx the {@link Context} that stores the prefixes
+ * @return the number of prefixes set.
+ */
+ protected int addDestNamePrefixOfUpdate(int insClauseIdx, Context rewrittenCtx) {
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.UPDATE);
+ return 1;
+ }
+
/**
* If there is no WHEN NOT MATCHED THEN INSERT, we don't outer join.
*/
@@ -351,16 +352,13 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* @param deleteExtraPredicate - see notes at caller
*/
private String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr, ASTNode target,
- String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr,
- boolean splitUpdateEarly, ColumnAppender columnAppender)
+ String onClauseAsString, Table targetTable, String deleteExtraPredicate, String hintStr,
+ ColumnAppender columnAppender)
throws SemanticException {
assert whenMatchedUpdateClause.getType() == HiveParser.TOK_MATCHED;
assert getWhenClauseOperation(whenMatchedUpdateClause).getType() == HiveParser.TOK_UPDATE;
String targetName = getSimpleTableName(target);
- List<String> values = new ArrayList<>(targetTable.getCols().size() + (splitUpdateEarly ? 1 : 0));
- if(!splitUpdateEarly) {
- values.add(targetName + ".ROW__ID");
- }
+ List<String> values = new ArrayList<>(targetTable.getCols().size());
ASTNode setClause = (ASTNode)getWhenClauseOperation(whenMatchedUpdateClause).getChild(0);
//columns being updated -> update expressions; "setRCols" (last param) is null because we use actual expressions
@@ -404,9 +402,32 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
addPartitionColsAsValues(targetTable.getPartCols(), targetName, values);
- rewrittenQueryStr.append(" -- update clause").append(splitUpdateEarly ? " (insert part)": "").append("\n");
+ String extraPredicate = handleUpdate(whenMatchedUpdateClause, rewrittenQueryStr, onClauseAsString,
+ deleteExtraPredicate, hintStr, columnAppender, targetName, values);
+
+ setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
+ return extraPredicate;
+ }
+
+ protected String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
+ String onClauseAsString, String deleteExtraPredicate, String hintStr,
+ ColumnAppender columnAppender, String targetName, List<String> values) {
+ values.add(0, targetName + ".ROW__ID");
+
+ rewrittenQueryStr.append(" -- update clause").append("\n");
appendInsertBranch(rewrittenQueryStr, hintStr, values);
+ String extraPredicate = addWhereClauseOfUpdate(
+ rewrittenQueryStr, onClauseAsString, whenMatchedUpdateClause, deleteExtraPredicate);
+
+ appendSortBy(rewrittenQueryStr, Collections.singletonList(targetName + ".ROW__ID "));
+ rewrittenQueryStr.append("\n");
+
+ return extraPredicate;
+ }
+
+ protected String addWhereClauseOfUpdate(StringBuilder rewrittenQueryStr, String onClauseAsString,
+ ASTNode whenMatchedUpdateClause, String deleteExtraPredicate) {
rewrittenQueryStr.append(INDENT).append("WHERE ").append(onClauseAsString);
String extraPredicate = getWhenClausePredicate(whenMatchedUpdateClause);
if (extraPredicate != null) {
@@ -416,25 +437,6 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
if (deleteExtraPredicate != null) {
rewrittenQueryStr.append(" AND NOT(").append(deleteExtraPredicate).append(")");
}
- if(!splitUpdateEarly) {
- appendSortBy(rewrittenQueryStr, Collections.singletonList(targetName + ".ROW__ID "));
- }
- rewrittenQueryStr.append("\n");
-
- setUpAccessControlInfoForUpdate(targetTable, setColsExprs);
- //we don't deal with columns on RHS of SET expression since the whole expr is part of the
- //rewritten SQL statement and is thus handled by SemanticAnalyzer. Nor do we have to
- //figure which cols on RHS are from source and which from target
-
- if(splitUpdateEarly) {
- /**
- * this is part of the WHEN MATCHED UPDATE, so we ignore any 'extra predicate' generated
- * by this call to handleDelete()
- */
- rewrittenQueryStr.append(" -- update clause (delete part)\n");
- handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, onClauseAsString,
- deleteExtraPredicate, hintStr, true, columnAppender);
- }
return extraPredicate;
}
@@ -443,13 +445,10 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
* @param onClauseAsString - because there is no clone() and we need to use in multiple places
* @param updateExtraPredicate - see notes at caller
*/
- private String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
+ protected String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
String onClauseAsString, String updateExtraPredicate,
- String hintStr, boolean splitUpdateEarly, ColumnAppender columnAppender) {
+ String hintStr, ColumnAppender columnAppender) {
assert whenMatchedDeleteClause.getType() == HiveParser.TOK_MATCHED;
- assert (splitUpdateEarly &&
- getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) ||
- getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
List<String> deleteValues = columnAppender.getDeleteValues(Context.Operation.DELETE);
appendInsertBranch(rewrittenQueryStr, hintStr, deleteValues);
@@ -493,7 +492,7 @@ public class MergeSemanticAnalyzer extends RewriteSemanticAnalyzer {
return whenClauses;
}
- private ASTNode getWhenClauseOperation(ASTNode whenClause) {
+ protected ASTNode getWhenClauseOperation(ASTNode whenClause) {
if (!(whenClause.getType() == HiveParser.TOK_MATCHED || whenClause.getType() == HiveParser.TOK_NOT_MATCHED)) {
throw raiseWrongType("Expected TOK_MATCHED|TOK_NOT_MATCHED", whenClause);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
index d38463a37a3..874756860d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/RewriteSemanticAnalyzer.java
@@ -50,6 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -544,15 +545,6 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
rewrittenQueryStr.append("\n");
}
- protected void appendDeleteBranch(
- StringBuilder rewrittenQueryStr, String hintStr, String alias, List<String> values) {
- List<String> deleteValues = new ArrayList<>(targetTable.getPartCols().size() + values.size());
- deleteValues.addAll(values);
- addPartitionColsAsValues(targetTable.getPartCols(), alias, deleteValues);
-
- appendInsertBranch(rewrittenQueryStr, hintStr, deleteValues);
- }
-
protected void appendSortBy(StringBuilder rewrittenQueryStr, List<String> keys) {
if (keys.isEmpty()) {
return;
@@ -622,24 +614,35 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
new NativeAcidColumnAppender(targetTable, conf, subQueryAlias);
}
- protected interface ColumnAppender {
- void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation);
- List<String> getDeleteValues(Context.Operation operation);
- List<String> getSortKeys();
- }
-
- protected static class NativeAcidColumnAppender implements ColumnAppender {
+ protected static abstract class ColumnAppender {
+ protected final Table table;
+ protected final HiveConf conf;
+ protected final String subQueryAlias;
- private final Table table;
- private final HiveConf conf;
- private final String subQueryAlias;
-
- public NativeAcidColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
+ protected ColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
this.table = table;
this.conf = conf;
this.subQueryAlias = subQueryAlias;
}
+ public abstract void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation);
+ public abstract List<String> getDeleteValues(Context.Operation operation);
+ public abstract List<String> getSortKeys();
+
+ protected String qualify(String columnName) {
+ if (isBlank(subQueryAlias)) {
+ return columnName;
+ }
+ return String.format("%s.%s", subQueryAlias, columnName);
+ }
+ }
+
+ protected static class NativeAcidColumnAppender extends ColumnAppender {
+
+ public NativeAcidColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
+ super(table, conf, subQueryAlias);
+ }
+
@Override
public void appendAcidSelectColumns(StringBuilder stringBuilder, Context.Operation operation) {
stringBuilder.append("ROW__ID,");
@@ -653,29 +656,23 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
@Override
public List<String> getDeleteValues(Context.Operation operation) {
List<String> deleteValues = new ArrayList<>(1 + table.getPartCols().size());
- deleteValues.add(subQueryAlias + ".ROW__ID");
+ deleteValues.add(qualify("ROW__ID"));
for (FieldSchema fieldSchema : table.getPartCols()) {
- deleteValues.add(subQueryAlias + "." + HiveUtils.unparseIdentifier(fieldSchema.getName(), conf));
+ deleteValues.add(qualify(HiveUtils.unparseIdentifier(fieldSchema.getName(), conf)));
}
return deleteValues;
}
@Override
public List<String> getSortKeys() {
- return singletonList(subQueryAlias + ".ROW__ID ");
+ return singletonList(qualify("ROW__ID"));
}
}
- protected static class NonNativeAcidColumnAppender implements ColumnAppender {
-
- private final Table table;
- private final HiveConf conf;
- private final String subQueryAlias;
+ protected static class NonNativeAcidColumnAppender extends ColumnAppender {
public NonNativeAcidColumnAppender(Table table, HiveConf conf, String subQueryAlias) {
- this.table = table;
- this.conf = conf;
- this.subQueryAlias = subQueryAlias;
+ super(table, conf, subQueryAlias);
}
@Override
@@ -696,7 +693,7 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
List<String> deleteValues = new ArrayList<>(acidSelectColumns.size());
for (FieldSchema fieldSchema : acidSelectColumns) {
String prefixedIdentifier = HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf);
- deleteValues.add(String.format("%s.%s", subQueryAlias, prefixedIdentifier));
+ deleteValues.add(qualify(prefixedIdentifier));
}
return deleteValues;
}
@@ -704,9 +701,7 @@ public abstract class RewriteSemanticAnalyzer extends CalcitePlanner {
@Override
public List<String> getSortKeys() {
return table.getStorageHandler().acidSortColumns(table, Context.Operation.DELETE).stream()
- .map(fieldSchema -> String.format(
- "%s.%s",
- subQueryAlias,
+ .map(fieldSchema -> qualify(
HiveUtils.unparseIdentifier(DELETE_PREFIX + fieldSchema.getName(), this.conf)))
.collect(Collectors.toList());
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
index 2c7e010b7a7..aa6c93ce1c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java
@@ -94,10 +94,17 @@ public final class SemanticAnalyzerFactory {
return new ColumnStatsSemanticAnalyzer(queryState);
case HiveParser.TOK_UPDATE_TABLE:
+ if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE)) {
+ return new SplitUpdateSemanticAnalyzer(queryState);
+ }
case HiveParser.TOK_DELETE_FROM:
return new UpdateDeleteSemanticAnalyzer(queryState);
case HiveParser.TOK_MERGE:
+ if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE) ||
+ HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.MERGE_SPLIT_UPDATE)) {
+ return new SplitMergeSemanticAnalyzer(queryState);
+ }
return new MergeSemanticAnalyzer(queryState);
case HiveParser.TOK_ALTER_SCHEDULED_QUERY:
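In short, the factory now routes statements by session configuration; a hedged HiveQL sketch of the resulting selection (settings shown only for illustration):

    -- UPDATE statements
    set hive.split.update=true;   -- analyzed by SplitUpdateSemanticAnalyzer
    set hive.split.update=false;  -- analyzed by UpdateDeleteSemanticAnalyzer,
                                  -- which now rejects non-native ACID tables
    -- MERGE statements
    set hive.merge.split.update=true;  -- analyzed by SplitMergeSemanticAnalyzer
                                       -- (also chosen when hive.split.update=true)
    -- with both settings false, MERGE stays with MergeSemanticAnalyzer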
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitMergeSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitMergeSemanticAnalyzer.java
new file mode 100644
index 00000000000..8efe712f58e
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitMergeSemanticAnalyzer.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * A subclass of the {@link MergeSemanticAnalyzer} that just handles
+ * merge statements. This version of the rewrite adds two insert branches for the update clause: one for
+ * inserting the new values of the updated records and one for inserting the delete delta records of the updated records.
+ */
+public class SplitMergeSemanticAnalyzer extends MergeSemanticAnalyzer {
+
+ SplitMergeSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ protected int addDestNamePrefixOfUpdate(int insClauseIdx, Context rewrittenCtx) {
+ rewrittenCtx.addDestNamePrefix(insClauseIdx, Context.DestClausePrefix.INSERT);
+ rewrittenCtx.addDeleteOfUpdateDestNamePrefix(insClauseIdx + 1, Context.DestClausePrefix.DELETE);
+ return 2;
+ }
+
+ @Override
+ public void analyze(ASTNode tree, Table targetTable, ASTNode tableNameNode) throws SemanticException {
+ analyzeMerge(tree, targetTable, tableNameNode);
+ }
+
+ @Override
+ protected String handleUpdate(ASTNode whenMatchedUpdateClause, StringBuilder rewrittenQueryStr,
+ String onClauseAsString, String deleteExtraPredicate, String hintStr,
+ ColumnAppender columnAppender, String targetName, List<String> values) {
+ rewrittenQueryStr.append(" -- update clause (insert part)\n");
+ appendInsertBranch(rewrittenQueryStr, hintStr, values);
+
+ String extraPredicate = addWhereClauseOfUpdate(
+ rewrittenQueryStr, onClauseAsString, whenMatchedUpdateClause, deleteExtraPredicate);
+
+ rewrittenQueryStr.append("\n");
+
+ rewrittenQueryStr.append(" -- update clause (delete part)\n");
+ handleDelete(whenMatchedUpdateClause, rewrittenQueryStr, onClauseAsString,
+ deleteExtraPredicate, hintStr, columnAppender);
+
+ return extraPredicate;
+ }
+
+ /**
+ * @param onClauseAsString - because there is no clone() and we need to use in multiple places
+ * @param updateExtraPredicate - see notes at caller
+ */
+ @Override
+ protected String handleDelete(ASTNode whenMatchedDeleteClause, StringBuilder rewrittenQueryStr,
+ String onClauseAsString, String updateExtraPredicate,
+ String hintStr, ColumnAppender columnAppender) {
+ assert (
+ getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_UPDATE) ||
+ getWhenClauseOperation(whenMatchedDeleteClause).getType() == HiveParser.TOK_DELETE;
+
+ return super.handleDelete(
+ whenMatchedDeleteClause, rewrittenQueryStr, onClauseAsString, updateExtraPredicate, hintStr, columnAppender);
+ }
+
+ @Override
+ protected boolean allowOutputMultipleTimes() {
+ return true;
+ }
+}
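For reference, a rough sketch of the multi-insert shape this split rewrite targets for a WHEN MATCHED THEN UPDATE clause, using the tables from the negative merge qtest; the aliases, placeholders and predicates are illustrative, not the exact generated SQL:

    FROM (SELECT <acid/delete columns>, t.*, src.* FROM test_merge_target t ... test_merge_source src) sub
    -- update clause (insert part)
    INSERT INTO test_merge_target SELECT sub.a, 'Merged', sub.c + 10 WHERE <on clause and when-matched predicate>
    -- update clause (delete part)
    INSERT INTO test_merge_target SELECT <delete values> WHERE <on clause and when-matched predicate>
    -- insert clause
    INSERT INTO test_merge_target SELECT src.a, src.b, src.c WHERE <not matched predicate>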
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitUpdateSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitUpdateSemanticAnalyzer.java
new file mode 100644
index 00000000000..757fd529120
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SplitUpdateSemanticAnalyzer.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A subclass of the {@link SemanticAnalyzer} that just handles
+ * update statements. It works by rewriting the updates into multi-insert
+ * statements (since they are actually inserts).
+ * One insert branch inserts the new values of the updated records,
+ * and another insert branch inserts the delete delta records of the updated records.
+ *
+ * From
+ * UPDATE acidtlb SET b=350
+ * WHERE a = 30
+ *
+ * To
+ * FROM
+ * (SELECT ROW__ID,`a` AS `a`,350 AS `b` FROM `default`.`acidtlb` WHERE a = 30) s
+ * INSERT INTO `default`.`acidtlb` -- insert delta
+ * SELECT s.`a`,s.`b`
+ * INSERT INTO `default`.`acidtlb` -- delete delta
+ * SELECT s.ROW__ID
+ * SORT BY s.ROW__ID
+ */
+public class SplitUpdateSemanticAnalyzer extends RewriteSemanticAnalyzer {
+
+ private Context.Operation operation = Context.Operation.OTHER;
+
+ SplitUpdateSemanticAnalyzer(QueryState queryState) throws SemanticException {
+ super(queryState);
+ }
+
+ @Override
+ protected ASTNode getTargetTableNode(ASTNode tree) {
+ // The first child should be the table we are updating / deleting from
+ ASTNode tabName = (ASTNode)tree.getChild(0);
+ assert tabName.getToken().getType() == HiveParser.TOK_TABNAME :
+ "Expected tablename as first child of " + operation + " but found " + tabName.getName();
+ return tabName;
+ }
+
+ protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws SemanticException {
+ switch (tree.getToken().getType()) {
+ case HiveParser.TOK_UPDATE_TABLE:
+ analyzeUpdate(tree, table, tabNameNode);
+ break;
+ default:
+ throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
+ "SplitUpdateSemanticAnalyzer");
+ }
+ }
+
+ private void analyzeUpdate(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
+ operation = Context.Operation.UPDATE;
+
+ List<? extends Node> children = tree.getChildren();
+
+ ASTNode where = null;
+ int whereIndex = 2;
+ if (children.size() > whereIndex) {
+ where = (ASTNode) children.get(whereIndex);
+ assert where.getToken().getType() == HiveParser.TOK_WHERE :
+ "Expected where clause, but found " + where.getName();
+ }
+
+ Set<String> setRCols = new LinkedHashSet<>();
+// TOK_UPDATE_TABLE
+// TOK_TABNAME
+// ...
+// TOK_SET_COLUMNS_CLAUSE <- The set list from update should be the second child (index 1)
+ assert children.size() >= 2 : "Expected update token to have at least two children";
+ ASTNode setClause = (ASTNode) children.get(1);
+ Map<String, ASTNode> setCols = collectSetColumnsAndExpressions(setClause, setRCols, mTable);
+ Map<Integer, ASTNode> setColExprs = new HashMap<>(setClause.getChildCount());
+
+ List<FieldSchema> nonPartCols = mTable.getCols();
+ Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(mTable);
+ StringBuilder rewrittenQueryStr = createRewrittenQueryStrBuilder();
+ rewrittenQueryStr.append("(SELECT ");
+
+ ColumnAppender columnAppender = getColumnAppender(SUB_QUERY_ALIAS);
+ columnAppender.appendAcidSelectColumns(rewrittenQueryStr, operation);
+ List<String> deleteValues = columnAppender.getDeleteValues(operation);
+ int columnOffset = deleteValues.size();
+
+ List<String> insertValues = new ArrayList<>(mTable.getCols().size());
+ boolean first = true;
+
+ for (int i = 0; i < nonPartCols.size(); i++) {
+ if (first) {
+ first = false;
+ } else {
+ rewrittenQueryStr.append(",");
+ }
+
+ String name = nonPartCols.get(i).getName();
+ ASTNode setCol = setCols.get(name);
+ String identifier = HiveUtils.unparseIdentifier(name, this.conf);
+
+ if (setCol != null) {
+ if (setCol.getType() == HiveParser.TOK_TABLE_OR_COL &&
+ setCol.getChildCount() == 1 && setCol.getChild(0).getType() == HiveParser.TOK_DEFAULT_VALUE) {
+ rewrittenQueryStr.append(colNameToDefaultConstraint.get(name));
+ } else {
+ rewrittenQueryStr.append(identifier);
+ // This is one of the columns we're setting; record its position so we can come back
+ // later and patch it up. 0th is ROW_ID
+ setColExprs.put(i + columnOffset, setCol);
+ }
+ } else {
+ rewrittenQueryStr.append(identifier);
+ }
+ rewrittenQueryStr.append(" AS ");
+ rewrittenQueryStr.append(identifier);
+
+ insertValues.add(SUB_QUERY_ALIAS + "." + identifier);
+ }
+ addPartitionColsAsValues(mTable.getPartCols(), SUB_QUERY_ALIAS, insertValues);
+ rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") ");
+ rewrittenQueryStr.append(SUB_QUERY_ALIAS).append("\n");
+
+ appendInsertBranch(rewrittenQueryStr, null, insertValues);
+ appendInsertBranch(rewrittenQueryStr, null, deleteValues);
+
+ List<String> sortKeys = columnAppender.getSortKeys();
+ appendSortBy(rewrittenQueryStr, sortKeys);
+
+ ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
+ Context rewrittenCtx = rr.rewrittenCtx;
+ ASTNode rewrittenTree = rr.rewrittenTree;
+
+ ASTNode rewrittenInsert = new ASTSearcher().simpleBreadthFirstSearch(
+ rewrittenTree, HiveParser.TOK_FROM, HiveParser.TOK_SUBQUERY, HiveParser.TOK_INSERT);
+
+ rewrittenCtx.setOperation(Context.Operation.UPDATE);
+ rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.INSERT);
+ rewrittenCtx.addDeleteOfUpdateDestNamePrefix(2, Context.DestClausePrefix.DELETE);
+
+ if (where != null) {
+ rewrittenInsert.addChild(where);
+ }
+
+ patchProjectionForUpdate(rewrittenInsert, setColExprs);
+
+ // Note: this will overwrite this.ctx with rewrittenCtx
+ rewrittenCtx.setEnableUnparse(false);
+ analyzeRewrittenTree(rewrittenTree, rewrittenCtx);
+
+ updateOutputs(mTable);
+
+ setUpAccessControlInfoForUpdate(mTable, setCols);
+
+ // Add the setRCols to the input list
+ if (columnAccessInfo == null) { //assuming this means we are not doing Auth
+ return;
+ }
+
+ for (String colName : setRCols) {
+ columnAccessInfo.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
+ }
+ }
+
+ @Override
+ protected boolean allowOutputMultipleTimes() {
+ return true;
+ }
+
+ @Override
+ protected boolean enableColumnStatsCollecting() {
+ return false;
+ }
+}
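
For orientation, here is a minimal sketch (not part of the commit) of the kind of multi-insert the analyzeUpdate rewrite above builds. The table t(a int, b int), the literal values, and the alias s are hypothetical; the real rewrite uses SUB_QUERY_ALIAS, takes its ACID select columns and sort keys from the ColumnAppender (ROW__ID for native ACID tables, storage-handler columns for tables such as Iceberg), and attaches the WHERE clause and SET expressions to the reparsed AST rather than inlining them as shown:

    -- hypothetical statement
    UPDATE t SET b = 2 WHERE a = 1

    -- rough shape of the rewritten multi-insert: new row versions first, delete events second
    FROM
      (SELECT ROW__ID, a AS a, 2 AS b FROM t WHERE a = 1) s
    INSERT INTO t SELECT s.a, s.b       -- branch 1, marked DestClausePrefix.INSERT
    INSERT INTO t SELECT s.ROW__ID      -- branch 2, marked DestClausePrefix.DELETE
    SORT BY s.ROW__ID

This split form is the only update path available to non-native ACID tables, which is why the UpdateDeleteSemanticAnalyzer change below rejects UPDATE and MERGE on such tables with NON_NATIVE_ACID_UPDATE when hive.split.update is disabled.
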
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index 8bc219e7649..20fd2fe2036 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.parse;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
@@ -26,9 +25,9 @@ import java.util.Set;
import java.util.stream.Collectors;
-import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.lib.Node;
@@ -61,10 +60,16 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
protected void analyze(ASTNode tree, Table table, ASTNode tabNameNode) throws SemanticException {
switch (tree.getToken().getType()) {
case HiveParser.TOK_DELETE_FROM:
- analyzeDelete(tree, table, tabNameNode);
+ operation = Context.Operation.DELETE;
+ reparseAndSuperAnalyze(tree, table, tabNameNode);
break;
case HiveParser.TOK_UPDATE_TABLE:
- analyzeUpdate(tree, table, tabNameNode);
+ boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(table);
+ if (nonNativeAcid) {
+ throw new SemanticException(ErrorMsg.NON_NATIVE_ACID_UPDATE.getErrorCodedMsg());
+ }
+ operation = Context.Operation.UPDATE;
+ reparseAndSuperAnalyze(tree, table, tabNameNode);
break;
default:
throw new RuntimeException("Asked to parse token " + tree.getName() + " in " +
@@ -72,20 +77,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
}
- private void analyzeUpdate(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
- operation = Context.Operation.UPDATE;
- if (HiveConf.getBoolVar(queryState.getConf(), HiveConf.ConfVars.SPLIT_UPDATE)) {
- analyzeSplitUpdate(tree, mTable, tabNameNode);
- } else {
- reparseAndSuperAnalyze(tree, mTable, tabNameNode);
- }
- }
-
- private void analyzeDelete(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
- operation = Context.Operation.DELETE;
- reparseAndSuperAnalyze(tree, mTable, tabNameNode);
- }
-
/**
* This supports update and delete statements
* Rewrite the delete or update into an insert. Crazy, but it works as deletes and update
@@ -113,19 +104,11 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
rewrittenQueryStr.append(getFullTableNameForSQL(tabNameNode));
addPartitionColsToInsert(mTable.getPartCols(), rewrittenQueryStr);
- boolean nonNativeAcid = AcidUtils.isNonNativeAcidTable(mTable);
- int columnOffset;
- if (nonNativeAcid) {
- List<FieldSchema> acidColumns = mTable.getStorageHandler().acidSelectColumns(mTable, operation);
- String selectCols = acidColumns.stream()
- .map(fieldSchema -> HiveUtils.unparseIdentifier(fieldSchema.getName(), this.conf))
- .collect(Collectors.joining(","));
- rewrittenQueryStr.append(" select ").append(selectCols);
- columnOffset = acidColumns.size();
- } else {
- rewrittenQueryStr.append(" select ROW__ID");
- columnOffset = 1;
- }
+ ColumnAppender columnAppender = getColumnAppender(null);
+ int columnOffset = columnAppender.getDeleteValues(operation).size();
+ rewrittenQueryStr.append(" select ");
+ columnAppender.appendAcidSelectColumns(rewrittenQueryStr, operation);
+ rewrittenQueryStr.setLength(rewrittenQueryStr.length() - 1);
Map<Integer, ASTNode> setColExprs = null;
Map<String, ASTNode> setCols = null;
@@ -155,7 +138,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
}
- addColsToSelect(mTable.getPartCols(), rewrittenQueryStr);
rewrittenQueryStr.append(" from ");
rewrittenQueryStr.append(getFullTableNameForSQL(tabNameNode));
@@ -168,15 +150,7 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
// Add a sort by clause so that the row ids come out in the correct order
- if (nonNativeAcid) {
- List<FieldSchema> sortColumns = mTable.getStorageHandler().acidSortColumns(mTable, operation);
- if (!sortColumns.isEmpty()) {
- String sortCols = sortColumns.stream().map(FieldSchema::getName).collect(Collectors.joining(","));
- rewrittenQueryStr.append(" sort by ").append(sortCols).append(" ");
- }
- } else {
- rewrittenQueryStr.append(" sort by ROW__ID ");
- }
+ appendSortBy(rewrittenQueryStr, columnAppender.getSortKeys());
ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
Context rewrittenCtx = rr.rewrittenCtx;
@@ -249,116 +223,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
}
}
- private void analyzeSplitUpdate(ASTNode tree, Table mTable, ASTNode tabNameNode) throws SemanticException {
- operation = Context.Operation.UPDATE;
-
- List<? extends Node> children = tree.getChildren();
-
- ASTNode where = null;
- int whereIndex = 2;
- if (children.size() > whereIndex) {
- where = (ASTNode) children.get(whereIndex);
- assert where.getToken().getType() == HiveParser.TOK_WHERE :
- "Expected where clause, but found " + where.getName();
- }
-
- Set<String> setRCols = new LinkedHashSet<>();
-// TOK_UPDATE_TABLE
-// TOK_TABNAME
-// ...
-// TOK_SET_COLUMNS_CLAUSE <- The set list from update should be the second child (index 1)
- assert children.size() >= 2 : "Expected update token to have at least two children";
- ASTNode setClause = (ASTNode) children.get(1);
- Map<String, ASTNode> setCols = collectSetColumnsAndExpressions(setClause, setRCols, mTable);
- Map<Integer, ASTNode> setColExprs = new HashMap<>(setClause.getChildCount());
-
- List<FieldSchema> nonPartCols = mTable.getCols();
- Map<String, String> colNameToDefaultConstraint = getColNameToDefaultValueMap(mTable);
- StringBuilder rewrittenQueryStr = createRewrittenQueryStrBuilder();
- rewrittenQueryStr.append("(SELECT ");
-
- ColumnAppender columnAppender = getColumnAppender(SUB_QUERY_ALIAS);
- columnAppender.appendAcidSelectColumns(rewrittenQueryStr, operation);
- List<String> deleteValues = columnAppender.getDeleteValues(operation);
- int columnOffset = deleteValues.size();
-
- List<String> insertValues = new ArrayList<>(mTable.getCols().size());
- boolean first = true;
-
- for (int i = 0; i < nonPartCols.size(); i++) {
- if (first) {
- first = false;
- } else {
- rewrittenQueryStr.append(",");
- }
-
- String name = nonPartCols.get(i).getName();
- ASTNode setCol = setCols.get(name);
- String identifier = HiveUtils.unparseIdentifier(name, this.conf);
-
- if (setCol != null) {
- if (setCol.getType() == HiveParser.TOK_TABLE_OR_COL &&
- setCol.getChildCount() == 1 && setCol.getChild(0).getType() == HiveParser.TOK_DEFAULT_VALUE) {
- rewrittenQueryStr.append(colNameToDefaultConstraint.get(name));
- } else {
- rewrittenQueryStr.append(identifier);
- // This is one of the columns we're setting, record it's position so we can come back
- // later and patch it up. 0th is ROW_ID
- setColExprs.put(i + columnOffset, setCol);
- }
- } else {
- rewrittenQueryStr.append(identifier);
- }
- rewrittenQueryStr.append(" AS ");
- rewrittenQueryStr.append(identifier);
-
- insertValues.add(SUB_QUERY_ALIAS + "." + identifier);
- }
- addPartitionColsAsValues(mTable.getPartCols(), SUB_QUERY_ALIAS, insertValues);
- rewrittenQueryStr.append(" FROM ").append(getFullTableNameForSQL(tabNameNode)).append(") ");
- rewrittenQueryStr.append(SUB_QUERY_ALIAS).append("\n");
-
- appendInsertBranch(rewrittenQueryStr, null, insertValues);
- appendInsertBranch(rewrittenQueryStr, null, deleteValues);
-
- List<String> sortKeys = columnAppender.getSortKeys();
- appendSortBy(rewrittenQueryStr, sortKeys);
-
- ReparseResult rr = parseRewrittenQuery(rewrittenQueryStr, ctx.getCmd());
- Context rewrittenCtx = rr.rewrittenCtx;
- ASTNode rewrittenTree = rr.rewrittenTree;
-
- ASTNode rewrittenInsert = new ASTSearcher().simpleBreadthFirstSearch(
- rewrittenTree, HiveParser.TOK_FROM, HiveParser.TOK_SUBQUERY, HiveParser.TOK_INSERT);
-
- rewrittenCtx.setOperation(Context.Operation.UPDATE);
- rewrittenCtx.addDestNamePrefix(1, Context.DestClausePrefix.INSERT);
- rewrittenCtx.addDeleteOfUpdateDestNamePrefix(2, Context.DestClausePrefix.DELETE);
-
- if (where != null) {
- rewrittenInsert.addChild(where);
- }
-
- patchProjectionForUpdate(rewrittenInsert, setColExprs);
-
- // Note: this will overwrite this.ctx with rewrittenCtx
- rewrittenCtx.setEnableUnparse(false);
- analyzeRewrittenTree(rewrittenTree, rewrittenCtx);
-
- updateOutputs(mTable);
-
- setUpAccessControlInfoForUpdate(mTable, setCols);
-
- // Add the setRCols to the input list
- if (columnAccessInfo == null) { //assuming this means we are not doing Auth
- return;
- }
-
- for (String colName : setRCols) {
- columnAccessInfo.add(Table.getCompleteName(mTable.getDbName(), mTable.getTableName()), colName);
- }
- }
-
private boolean updating() {
return operation == Context.Operation.UPDATE;
}
@@ -366,11 +230,6 @@ public class UpdateDeleteSemanticAnalyzer extends RewriteSemanticAnalyzer {
return operation == Context.Operation.DELETE;
}
- @Override
- protected boolean allowOutputMultipleTimes() {
- return conf.getBoolVar(HiveConf.ConfVars.SPLIT_UPDATE);
- }
-
@Override
protected boolean enableColumnStatsCollecting() {
return false;