You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/04/14 16:55:04 UTC
[hive] branch master updated: HIVE-23058: Compaction task reattempt
fails with FileAlreadyExistsException (Riju Trivedi,
reviewed by Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2857cd6 HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
2857cd6 is described below
commit 2857cd6a896fb2e98ff2a2c44e00ebde274b4b10
Author: Riju Trivedi <rt...@cloudera.com>
AuthorDate: Tue Apr 14 18:54:17 2020 +0200
HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
---
.../hive/ql/txn/compactor/TestCompactor.java | 61 ++++++++++++++++++----
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 13 ++++-
2 files changed, 64 insertions(+), 10 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 95fa664..2d03936 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -26,15 +26,7 @@ import static org.junit.Assert.assertNotNull;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -1306,6 +1298,57 @@ public class TestCompactor {
}
@Test
+ public void testCompactionForFileInSratchDir() throws Exception {
+ // A compaction task reattempt must not fail with FileAlreadyExistsException on a file left in the scratch dir.
+ String dbName = "default";
+ String tblName = "cfs";
+ String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) " + "STORED AS ORC TBLPROPERTIES ('transactional'='true',"
+ + "'transactional_properties'='default')";
+ executeStatementOnDriver("drop table if exists " + tblName, driver);
+ executeStatementOnDriver(createQuery, driver);
+
+ // Insert some data -> this will generate only insert deltas
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
+
+ // Insert some data -> this will again generate only insert deltas
+ executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver);
+
+ // Find the location of the table; close the metastore client once it is no longer needed
+ IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+ Table table = msClient.getTable(dbName, tblName);
+ msClient.close();
+ FileSystem fs = FileSystem.get(conf);
+
+ // Point the compactor's scratch dir at a known location under the table
+ String scratchDirPath = table.getSd().getLocation() + "/" + "_tmp";
+ Map<String, String> tblProperties = new HashMap<>();
+ tblProperties.put("compactor.hive.compactor.input.tmp.dir", scratchDirPath);
+
+ // Create an empty bucket file in the scratch dir, as an earlier failed attempt would leave it.
+ // NOTE(review): the dir name must match what this compaction writes; confirm if the test changes.
+ Path dir = new Path(scratchDirPath + "/base_0000002_v0000005");
+ fs.mkdirs(dir);
+ Path emptyFile = AcidUtils.createBucketFile(dir, 0);
+ fs.create(emptyFile).close(); // only the file's existence matters; don't leak the stream
+
+ // Run major compaction
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ Worker t = new Worker();
+ t.setThreadId((int) t.getId());
+ t.setConf(conf);
+ t.init(new AtomicBoolean(true), new AtomicBoolean());
+ CompactionRequest request = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+ request.setProperties(tblProperties);
+ txnHandler.compact(request);
+ t.run();
+
+ // The compaction must have succeeded and reached the cleaning state
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompacts().size());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ }
+
+ @Test
public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
minorCompactWhileStreamingWithSplitUpdate(false);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 543ec0b..f26d4bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -839,7 +839,18 @@ public class CompactorMR {
AcidOutputFormat<WritableComparable, V> aof =
instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
- writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+ Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+ cleanupTmpLocationOnTaskRetry(options, rootDir);
+ writer = aof.getRawRecordWriter(rootDir, options);
+ }
+ }
+
+ /** Deletes a leftover at AcidUtils.createFilename(rootDir, options) so a reattempted task can recreate it; throws IOException if it cannot. */
+ private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException {
+ Path tmpLocation = AcidUtils.createFilename(rootDir, options);
+ FileSystem fs = tmpLocation.getFileSystem(jobConf);
+ if (fs.exists(tmpLocation) && !fs.delete(tmpLocation, true)) {
+ throw new IOException("Failed to delete leftover output of a previous task attempt: " + tmpLocation);
}
}