Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/09/07 08:10:03 UTC

[GitHub] [hudi] zhangyue19921010 commented on a diff in pull request #5091: [HUDI-3453] Fix HoodieBackedTableMetadata concurrent reading issue

zhangyue19921010 commented on code in PR #5091:
URL: https://github.com/apache/hudi/pull/5091#discussion_r964524079


##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -231,7 +232,7 @@ public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecord
         throw new HoodieIOException("Error merging records from metadata table for  " + sortedKeys.size() + " key : ", ioe);
       } finally {
         if (!reuse) {
-          close(Pair.of(partitionFileSlicePair.getLeft(), partitionFileSlicePair.getRight().getFileId()));

Review Comment:
   Aha, it is also used in closePartitionReaders():
   ```
     private void closePartitionReaders() {
       for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
         close(partitionFileSlicePair);
       }
       partitionReaders.clear();
     }
   ```
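The comment above notes that `close(...)` is still needed by `closePartitionReaders()`, even though the diff drops the call from the non-reuse `finally` block. Below is a minimal, hypothetical sketch of that split. It is not Hudi code. It assumes that readers are cached in a `ConcurrentHashMap` only when reuse is enabled. It also assumes that the non-reuse path opens and closes a reader local to each call, so one lookup never closes a reader that another thread is still using. `ReaderCacheSketch`, `FakeReader` and `lookup` are illustrative names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch (not Hudi code): readers are cached per "partition/fileId" key only
// when reuse is enabled; in the non-reuse path each call opens and closes its own reader,
// so one lookup can never close a reader that a concurrent lookup is still using.
class ReaderCacheSketch {

  // Hypothetical stand-in for a base-file/log-file reader pair; counts open instances.
  static final class FakeReader implements AutoCloseable {
    static final AtomicInteger OPEN = new AtomicInteger();
    private final String key;

    FakeReader(String key) {
      this.key = key;
      OPEN.incrementAndGet();
    }

    String read() {
      return "record@" + key; // placeholder for an actual metadata lookup
    }

    @Override
    public void close() {
      OPEN.decrementAndGet();
    }
  }

  private final Map<String, FakeReader> readers = new ConcurrentHashMap<>();
  private final boolean reuse;

  ReaderCacheSketch(boolean reuse) {
    this.reuse = reuse;
  }

  String lookup(String partition, String fileId) {
    String key = partition + "/" + fileId;
    if (!reuse) {
      // Thread-confined reader: opened and closed by this call only, never put in the map.
      try (FakeReader reader = new FakeReader(key)) {
        return reader.read();
      }
    }
    // Shared reader: computeIfAbsent is atomic, so at most one reader is opened per key.
    return readers.computeIfAbsent(key, FakeReader::new).read();
  }

  // Shared helper, still needed by closeAll(); remove() hands each reader to one caller.
  void close(String key) {
    FakeReader reader = readers.remove(key);
    if (reader != null) {
      reader.close();
    }
  }

  // Bulk shutdown reuses close(key); ConcurrentHashMap iteration tolerates removal.
  void closeAll() {
    for (String key : readers.keySet()) {
      close(key);
    }
    readers.clear();
  }
}
```

With reuse off, every lookup leaves no open reader behind. With reuse on, repeated lookups of one key share a single reader until `closeAll()` runs.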
   
   



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java:
##########
@@ -94,6 +103,49 @@ public void testTableOperations(boolean reuseReaders) throws Exception {
     verifyBaseMetadataTable(reuseReaders);
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testMultiReaderForHoodieBackedTableMetadata(boolean reuse) throws Exception {
+    final int taskNumber = 100;
+    HoodieTableType tableType = HoodieTableType.COPY_ON_WRITE;
+    init(tableType);
+    testTable.doWriteOperation("000001", INSERT, emptyList(), asList("p1"), 1);
+    HoodieBackedTableMetadata tableMetadata = new HoodieBackedTableMetadata(context, writeConfig.getMetadataConfig(), writeConfig.getBasePath(), writeConfig.getSpillableMapBasePath(), reuse);
+    assertTrue(tableMetadata.enabled());
+    List<String> metadataPartitions = tableMetadata.getAllPartitionPaths();
+    String partition = metadataPartitions.get(0);
+    String finalPartition = basePath + "/" + partition;
+    ArrayList<String> duplicatedPartitions = new ArrayList<>(taskNumber);
+    for (int i = 0; i < taskNumber; i++) {
+      duplicatedPartitions.add(finalPartition);
+    }
+    ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
+    AtomicBoolean flag = new AtomicBoolean(false);
+    CountDownLatch downLatch = new CountDownLatch(taskNumber);
+    AtomicInteger filesNumber = new AtomicInteger(0);
+
+    for (String part : duplicatedPartitions) {
+      executors.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            downLatch.countDown();
+            downLatch.await();
+            FileStatus[] files = tableMetadata.getAllFilesInPartition(new Path(part));
+            filesNumber.addAndGet(files.length);
+            assertEquals(1, files.length);
+          } catch (Exception e) {
+            flag.set(true);
+          }
+        }
+      });
+    }
+    executors.shutdown();
+    executors.awaitTermination(24, TimeUnit.HOURS);

Review Comment:
   Changed!
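The test in the diff releases all tasks together with a `CountDownLatch` and then waits on `awaitTermination`. A minimal standalone sketch of that pattern follows. `ConcurrentRunner` and `runConcurrently` are hypothetical names, not Hudi test utilities. The sketch also covers one detail of the pattern. `assertEquals` throws `AssertionError`, which `catch (Exception e)` does not catch. A throwable raised inside a task passed to `ExecutorService.submit` is stored in the returned `Future` rather than propagated. This sketch therefore calls `Future.get()` on every task to surface such failures. The pool needs at least `taskNumber` threads, otherwise the latch never opens.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper (not part of Hudi): runs taskNumber copies of task, released together
// by a latch, and rethrows any failure, including an AssertionError thrown by a task.
final class ConcurrentRunner {

  static <T> List<T> runConcurrently(int taskNumber, Callable<T> task, long timeoutSeconds)
      throws InterruptedException, ExecutionException, TimeoutException {
    // One thread per task: with fewer threads the latch below could never reach zero.
    ExecutorService executors = Executors.newFixedThreadPool(taskNumber);
    CountDownLatch startLatch = new CountDownLatch(taskNumber);
    List<Future<T>> futures = new ArrayList<>(taskNumber);
    try {
      for (int i = 0; i < taskNumber; i++) {
        futures.add(executors.submit(() -> {
          // Each worker checks in, then blocks until every worker is ready.
          startLatch.countDown();
          startLatch.await();
          return task.call();
        }));
      }
    } finally {
      executors.shutdown();
    }
    // Wait up to timeoutSeconds; on timeout, interrupt the workers and fail.
    if (!executors.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
      executors.shutdownNow();
      throw new TimeoutException("tasks did not finish within " + timeoutSeconds + "s");
    }
    List<T> results = new ArrayList<>(taskNumber);
    for (Future<T> f : futures) {
      // get() rethrows whatever the task threw, wrapped in an ExecutionException.
      results.add(f.get());
    }
    return results;
  }
}
```

In a test like the one above, the lambda body would call `tableMetadata.getAllFilesInPartition(...)` and return its result. The assertions can then run on the returned list, in the test thread, instead of inside the workers.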



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.