You are viewing a plain text version of this content. The canonical link for it is here.
Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/12/03 17:13:08 UTC

[GitHub] [hive] pvargacl opened a new pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

pvargacl opened a new pull request #1738:
URL: https://github.com/apache/hive/pull/1738


   
   ### What changes were proposed in this pull request?
   See the details in  HIVE-24481
   
   
   ### Why are the changes needed?
   Fix the data corruption issue.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Unit test
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r540174463



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -1177,6 +1177,104 @@ private HiveStreamingConnection prepareTableTwoPartitionsAndConnection(String db
         .connect();
   }
 
+  /**
+   * There is a special case handled in Compaction Worker that will skip compaction
+   * if there is only one valid delta. But this compaction will be still cleaned up, if there are aborted directories.
+   * @see Worker.isEnoughToCompact
+   * However if no compaction was done, deltas containing mixed aborted / committed writes from streaming can not be cleaned
+   * and the metadata belonging to those aborted transactions can not be removed.
+   * @throws Exception ex
+   */
+  @Test
+  public void testSkippedCompactionCleanerKeepsAborted() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(b STRING) " +
+        " PARTITIONED BY (a INT) STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("alter table " + tblName + " add partition(a=1)", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    // Create initial aborted txn
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(Collections.singletonList("1"))
+        .withAgentInfo(agentInfo)
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .withStreamingOptimizations(true)
+        .withTransactionBatchSize(1)
+        .connect();
+
+    connection.beginTransaction();
+    connection.write("3,1".getBytes());
+    connection.write("4,1".getBytes());
+    connection.abortTransaction();
+
+    connection.close();
+
+    // Create a sequence of commit, abort, commit to the same delta folder
+    connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(Collections.singletonList("1"))
+        .withAgentInfo(agentInfo)
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .withStreamingOptimizations(true)
+        .withTransactionBatchSize(3)
+        .connect();
+
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,1".getBytes());
+    connection.commitTransaction();
+
+    connection.beginTransaction();
+    connection.write("3,1".getBytes());
+    connection.write("4,1".getBytes());
+    connection.abortTransaction();
+
+    connection.beginTransaction();
+    connection.write("5,1".getBytes());
+    connection.write("6,1".getBytes());
+    connection.commitTransaction();
+
+    connection.close();
+
+    // Check that aborted are not read back
+    driver.run("select * from cws");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(4, res.size());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals("There should be 2 record for two aborted transaction", 2, count);
+
+    // Start a compaction, that will be skipped, because only one valid delta is there
+    driver.run("alter table cws partition(a='1') compact 'minor'");
+    runWorker(conf);
+    // Cleaner should not delete info about aborted txn 2
+    runCleaner(conf);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals("There should be 1 record for two aborted transaction", 1, count);

Review comment:
       there should be single record for the 2nd aborted txn




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r540174463



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -1177,6 +1177,104 @@ private HiveStreamingConnection prepareTableTwoPartitionsAndConnection(String db
         .connect();
   }
 
+  /**
+   * There is a special case handled in Compaction Worker that will skip compaction
+   * if there is only one valid delta. But this compaction will be still cleaned up, if there are aborted directories.
+   * @see Worker.isEnoughToCompact
+   * However if no compaction was done, deltas containing mixed aborted / committed writes from streaming can not be cleaned
+   * and the metadata belonging to those aborted transactions can not be removed.
+   * @throws Exception ex
+   */
+  @Test
+  public void testSkippedCompactionCleanerKeepsAborted() throws Exception {
+    String dbName = "default";
+    String tblName = "cws";
+
+    String agentInfo = "UT_" + Thread.currentThread().getName();
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(b STRING) " +
+        " PARTITIONED BY (a INT) STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+    executeStatementOnDriver("alter table " + tblName + " add partition(a=1)", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+
+    // Create initial aborted txn
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(Collections.singletonList("1"))
+        .withAgentInfo(agentInfo)
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .withStreamingOptimizations(true)
+        .withTransactionBatchSize(1)
+        .connect();
+
+    connection.beginTransaction();
+    connection.write("3,1".getBytes());
+    connection.write("4,1".getBytes());
+    connection.abortTransaction();
+
+    connection.close();
+
+    // Create a sequence of commit, abort, commit to the same delta folder
+    connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(Collections.singletonList("1"))
+        .withAgentInfo(agentInfo)
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .withStreamingOptimizations(true)
+        .withTransactionBatchSize(3)
+        .connect();
+
+    connection.beginTransaction();
+    connection.write("1,1".getBytes());
+    connection.write("2,1".getBytes());
+    connection.commitTransaction();
+
+    connection.beginTransaction();
+    connection.write("3,1".getBytes());
+    connection.write("4,1".getBytes());
+    connection.abortTransaction();
+
+    connection.beginTransaction();
+    connection.write("5,1".getBytes());
+    connection.write("6,1".getBytes());
+    connection.commitTransaction();
+
+    connection.close();
+
+    // Check that aborted are not read back
+    driver.run("select * from cws");
+    List res = new ArrayList();
+    driver.getFetchTask().fetch(res);
+    Assert.assertEquals(4, res.size());
+
+    int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals("There should be 2 record for two aborted transaction", 2, count);
+
+    // Start a compaction, that will be skipped, because only one valid delta is there
+    driver.run("alter table cws partition(a='1') compact 'minor'");
+    runWorker(conf);
+    // Cleaner should not delete info about aborted txn 2
+    runCleaner(conf);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
+    count = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+    Assert.assertEquals("There should be 1 record for two aborted transaction", 1, count);

Review comment:
       expectation msg copy-paste - "there should be single record for the 2nd aborted txn"




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r537329142



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -566,20 +567,20 @@ else if (filename.startsWith(BUCKET_PREFIX)) {
   public static final class DirectoryImpl implements Directory {
     private final List<Path> abortedDirectories;
     private final Set<Long> abortedWriteIds;
+    private final boolean uncompactedAborts;
     private final boolean isBaseInRawFormat;
     private final List<HdfsFileStatusWithId> original;
     private final List<Path> obsolete;
     private final List<ParsedDelta> deltas;
     private final Path base;
     private List<HdfsFileStatusWithId> baseFiles;
 
-    public DirectoryImpl(List<Path> abortedDirectories, Set<Long> abortedWriteIds,
-        boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
-        List<Path> obsolete, List<ParsedDelta> deltas, Path base) {
-      this.abortedDirectories = abortedDirectories == null ?
-          Collections.emptyList() : abortedDirectories;
-      this.abortedWriteIds = abortedWriteIds == null ?
-        Collections.emptySet() : abortedWriteIds;
+    public DirectoryImpl(List<Path> abortedDirectories, Set<Long> abortedWriteIds, boolean uncompactedAborts,

Review comment:
       Yes, I plan to do that. I think I will make DirectoryImpl mutable, and pass it down the getAcidState calls, to gather all the info in it along the way.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r535965368



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -566,20 +567,20 @@ else if (filename.startsWith(BUCKET_PREFIX)) {
   public static final class DirectoryImpl implements Directory {
     private final List<Path> abortedDirectories;
     private final Set<Long> abortedWriteIds;
+    private final boolean uncompactedAborts;
     private final boolean isBaseInRawFormat;
     private final List<HdfsFileStatusWithId> original;
     private final List<Path> obsolete;
     private final List<ParsedDelta> deltas;
     private final Path base;
     private List<HdfsFileStatusWithId> baseFiles;
 
-    public DirectoryImpl(List<Path> abortedDirectories, Set<Long> abortedWriteIds,
-        boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
-        List<Path> obsolete, List<ParsedDelta> deltas, Path base) {
-      this.abortedDirectories = abortedDirectories == null ?
-          Collections.emptyList() : abortedDirectories;
-      this.abortedWriteIds = abortedWriteIds == null ?
-        Collections.emptySet() : abortedWriteIds;
+    public DirectoryImpl(List<Path> abortedDirectories, Set<Long> abortedWriteIds, boolean uncompactedAborts,

Review comment:
       It's starting to affect readability, maybe refactor in the following patches.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
pvargacl commented on pull request #1738:
URL: https://github.com/apache/hive/pull/1738#issuecomment-738658011


   @deniskuzZ @klcopp can I ask for a review


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ merged pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
deniskuzZ merged pull request #1738:
URL: https://github.com/apache/hive/pull/1738


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] deniskuzZ commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
deniskuzZ commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r540190715



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -923,6 +929,13 @@ public String toString() {
      * @return the list of aborted writeIds
      */
     Set<Long> getAbortedWriteIds();
+
+    /**
+     * Get the list of writeIds that belong to aborted transactions, but can not be cleaned,

Review comment:
       fix the javadoc 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org


[GitHub] [hive] pvargacl commented on a change in pull request #1738: HIVE-24481: Skipped compaction can cause data corruption with streaming

Posted by GitBox <gi...@apache.org>.
pvargacl commented on a change in pull request #1738:
URL: https://github.com/apache/hive/pull/1738#discussion_r535939329



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -1369,14 +1383,14 @@ private static Directory getAcidState(FileSystem fileSystem, Path candidateDirec
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         getChildState(child, writeIdList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, abortedWriteIds, fs, validTxnList);
+            bestBase, ignoreEmptyFiles, abortedDirectories, abortedWriteIds, uncompactedAborts, fs, validTxnList);

Review comment:
       In a follow up Jira it might be worth change this whole AcidUtils approach and start to put everything from the beginning in a DirectoryImpl, so the argument count could be decreased to a sane amount.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org