You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/04/14 16:55:04 UTC

[hive] branch master updated: HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2857cd6  HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
2857cd6 is described below

commit 2857cd6a896fb2e98ff2a2c44e00ebde274b4b10
Author: Riju Trivedi <rt...@cloudera.com>
AuthorDate: Tue Apr 14 18:54:17 2020 +0200

    HIVE-23058: Compaction task reattempt fails with FileAlreadyExistsException (Riju Trivedi, reviewed by Laszlo Pinter)
---
 .../hive/ql/txn/compactor/TestCompactor.java       | 61 ++++++++++++++++++----
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 13 ++++-
 2 files changed, 64 insertions(+), 10 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index 95fa664..2d03936 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -26,15 +26,7 @@ import static org.junit.Assert.assertNotNull;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -1306,6 +1298,57 @@ public class TestCompactor {
   }
 
   @Test
+  public void testCompactionForFileInSratchDir() throws Exception {
+    // Simulates a re-attempted compaction task (HIVE-23058): a bucket file left behind in the
+    // compactor scratch dir by an earlier attempt must not make MAJOR compaction fail.
+    String dbName = "default";
+    String tblName = "cfs";
+    String createQuery = "CREATE TABLE " + tblName + "(a INT, b STRING) " + "STORED AS ORC  TBLPROPERTIES ('transactional'='true',"
+            + "'transactional_properties'='default')";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver(createQuery, driver);
+
+    // Insert some data -> this will generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(1, 'foo')", driver);
+
+    // Insert some data -> this will again generate only insert deltas
+    executeStatementOnDriver("INSERT INTO " + tblName + "(a,b) VALUES(2, 'bar')", driver);
+
+    // Find the location of the table; the metastore client is only needed for this lookup.
+    IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
+    Table table = msClient.getTable(dbName, tblName);
+    msClient.close();
+    FileSystem fs = FileSystem.get(conf);
+
+    // Point the compactor's tmp dir at <table location>/_tmp via the per-request properties.
+    String scratchDirPath = table.getSd().getLocation() + "/" + "_tmp";
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("compactor.hive.compactor.input.tmp.dir", scratchDirPath);
+
+    // Pre-create an empty bucket file in the scratch dir, as an earlier failed attempt would.
+    Path dir = new Path(scratchDirPath + "/base_0000002_v0000005");
+    fs.mkdirs(dir);
+    Path emptyFile = AcidUtils.createBucketFile(dir, 0);
+    fs.create(emptyFile).close(); // close immediately so the output stream is not leaked
+
+    // Run a MAJOR compaction through a single Worker pass
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setConf(conf);
+    t.init(new AtomicBoolean(true), new AtomicBoolean());
+    CompactionRequest compactionRequest = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    compactionRequest.setProperties(tblProperties);
+    txnHandler.compact(compactionRequest);
+    t.run();
+
+    // The compaction must have progressed to the cleaning state rather than failing.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompacts().size());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+  }
+
+  @Test
   public void minorCompactWhileStreamingWithSplitUpdate() throws Exception {
     minorCompactWhileStreamingWithSplitUpdate(false);
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 543ec0b..f26d4bb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -839,7 +839,18 @@ public class CompactorMR {
         AcidOutputFormat<WritableComparable, V> aof =
             instantiate(AcidOutputFormat.class, jobConf.get(OUTPUT_FORMAT_CLASS_NAME));
 
-        writer = aof.getRawRecordWriter(new Path(jobConf.get(TMP_LOCATION)), options);
+        Path rootDir = new Path(jobConf.get(TMP_LOCATION));
+        cleanupTmpLocationOnTaskRetry(options, rootDir);
+        writer = aof.getRawRecordWriter(rootDir, options);
+      }
+   }
+
+    private void cleanupTmpLocationOnTaskRetry(AcidOutputFormat.Options options, Path rootDir) throws IOException {
+      // A re-attempted task targets the same output file; remove whatever an earlier attempt left there.
+      Path tmpLocation = AcidUtils.createFilename(rootDir, options);
+      FileSystem fs = tmpLocation.getFileSystem(jobConf);
+      if (fs.exists(tmpLocation) && !fs.delete(tmpLocation, true)) {
+        throw new IOException("Unable to delete output left by a previous task attempt: " + tmpLocation);
       }
     }