Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/01/13 23:58:43 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #2441: [HUDI 1308] [WIP] rfc15 perf prod testing

vinothchandar commented on a change in pull request #2441:
URL: https://github.com/apache/hudi/pull/2441#discussion_r556945779



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
##########
@@ -192,7 +196,8 @@ public CleanPlanner(HoodieTable<T, I, K, O> hoodieTable, HoodieWriteConfig confi
    */
   private List<String> getPartitionPathsForFullCleaning() throws IOException {
     // Go to brute force mode of scanning all partitions
-    return hoodieTable.metadata().getAllPartitionPaths();
+    return FSUtils.getAllPartitionPaths(context, hoodieTable.getMetaClient().getFs(), config.getBasePath(),

Review comment:
       this should also take `HoodieMetadataConfig` as an arg
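       roughly the shape being suggested (sketch only; this exact `FSUtils` overload is hypothetical):

       ```java
       // pass the metadata config through so the listing helper can decide whether to
       // serve the partition list from the metadata table or fall back to a direct scan
       return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
       ```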

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
##########
@@ -198,14 +200,20 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
     // If metadata table is enabled, do not archive instants which are more recent than the latest synced
     // instant on the metadata table. This is required for metadata table sync.
     if (config.useFileListingMetadata()) {
-      Option<String> lastSyncedInstantTime = table.metadata().getSyncedInstantTime();
-      if (lastSyncedInstantTime.isPresent()) {
-        LOG.info("Limiting archiving of instants to last synced instant on metadata table at " + lastSyncedInstantTime.get());
-        instants = instants.filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.LESSER_THAN,
-            lastSyncedInstantTime.get()));
-      } else {
-        LOG.info("Not archiving as there is no instants yet on the metadata table");
-        instants = Stream.empty();
+      try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(table.getContext(), config.getBasePath(), "/tmp/",

Review comment:
       do we just pass `/tmp` always? `HoodieTableMetadata#create()` should just take `HoodieMetadataConfig` as an arg
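       something along these lines maybe (hypothetical 3-arg overload, just to illustrate the suggestion; the spillable dir and the enable/verify flags would come from `HoodieMetadataConfig` instead of being hard-coded at every call site):

       ```java
       try (HoodieTableMetadata tableMetadata =
                HoodieTableMetadata.create(table.getContext(), config.getMetadataConfig(), config.getBasePath())) {
         // filter instants against tableMetadata.getSyncedInstantTime() as below
       }
       ```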

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java
##########
@@ -192,42 +195,58 @@ private static RemoteHoodieTableFileSystemView createRemoteFileSystemView(Serial
         metaClient, viewConf.getRemoteTimelineClientTimeoutSecs());
   }
 
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config) {
+    return createViewManager(context, metadataConfig, config, (SerializableSupplier<HoodieTableMetadata>) null);
+  }
+
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config,
+                                                        final String basePath) {
+    return createViewManager(context, metadataConfig, config,
+        () -> HoodieTableMetadata.create(context, basePath, FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR,
+            metadataConfig.useFileListingMetadata(), metadataConfig.getFileListingMetadataVerify(),
+        false, metadataConfig.shouldAssumeDatePartitioning()));
+  }
+
   /**
    * Main Factory method for building file-system views.
-   * 
-   * @param conf Hadoop Configuration
-   * @param config View Storage Configuration
-   * @return
+   *
    */
-  public static FileSystemViewManager createViewManager(final SerializableConfiguration conf,
-      final FileSystemViewStorageConfig config) {
+  public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
+                                                        final HoodieMetadataConfig metadataConfig,
+                                                        final FileSystemViewStorageConfig config,
+                                                        final SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
     LOG.info("Creating View Manager with storage type :" + config.getStorageType());
+    final SerializableConfiguration conf = context.getHadoopConf();
     switch (config.getStorageType()) {
       case EMBEDDED_KV_STORE:
         LOG.info("Creating embedded rocks-db based Table View");
-        return new FileSystemViewManager(conf, config,
+        return new FileSystemViewManager(context, config,
             (metaClient, viewConf) -> createRocksDBBasedFileSystemView(conf, viewConf, metaClient));
       case SPILLABLE_DISK:
         LOG.info("Creating Spillable Disk based Table View");
-        return new FileSystemViewManager(conf, config,
+        return new FileSystemViewManager(context, config,
             (metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient));
       case MEMORY:
         LOG.info("Creating in-memory based Table View");
-        return new FileSystemViewManager(conf, config,
-            (metaClient, viewConfig) -> createInMemoryFileSystemView(conf, viewConfig, metaClient));
+        return new FileSystemViewManager(context, config,

Review comment:
       I don't really like overloading the in-memory based view or passing a supplier specifically for the metadata table, but this is unavoidable given how much benefit there is to sharing it.
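       the upside is that one `HoodieTableMetadata` instance gets shared across whichever view type is built, roughly (sketch, mirroring the `HoodieTable` wiring in this PR):

       ```java
       // the caller owns a single metadata instance; every view created by the manager borrows it
       SerializableSupplier<HoodieTableMetadata> metadataSupplier = () -> metadata;
       FileSystemViewManager viewManager = FileSystemViewManager.createViewManager(
           context, config.getMetadataConfig(), config.getViewStorageConfig(), metadataSupplier);
       ```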

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -209,15 +217,25 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, String datasetBas
       // Validate the Metadata Table data by listing the partitions from the file system
       timer.startTimer();
 
-      // Ignore partition metadata file
-      HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-      FileStatus[] directStatuses = metaClient.getFs().listStatus(partitionPath,
-          p -> !p.getName().equals(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE));
-      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
-
-      List<String> directFilenames = Arrays.stream(directStatuses)
-          .map(s -> s.getPath().getName()).sorted()
+      String partitionPathStr = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), partitionPath);
+      String latestDataInstantTime = getLatestDatasetInstantTime();
+      HoodieTableFileSystemView dataFsView = new HoodieTableFileSystemView(datasetMetaClient, datasetMetaClient.getActiveTimeline());

Review comment:
       this validation stuff is actually tricky. We should do it only once, at the end of each metadata sync. If we turn this on mid-stream during a commit, the filesystem will report the newly written/inflight files as well, which the metadata table does not have, so validation will always fail. This is kind of broken atm, really.

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,7 +100,11 @@ protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    metadata.closeReaders();
+    try {

Review comment:
       moving this to `initTableMetadata()`

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -755,6 +754,9 @@ public void testMetadataOutOfSync() throws Exception {
   private void validateMetadata(SparkRDDWriteClient client) throws IOException {
     HoodieWriteConfig config = client.getConfig();
 
+    client.close();

Review comment:
       this is a hack for now. Need to clean up such that a new client is created. 
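       one possible cleanup (sketch only; variable names like `context` are assumed from the test harness): have `validateMetadata` own a short-lived client instead of closing the caller's:

       ```java
       // create and close a dedicated client for validation, leaving the caller's client untouched
       SparkRDDWriteClient validationClient = new SparkRDDWriteClient(context, client.getConfig());
       try {
         // run the metadata validation reads against validationClient here
       } finally {
         validationClient.close();
       }
       ```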

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
##########
@@ -94,31 +93,32 @@
 
   protected final HoodieWriteConfig config;
   protected final HoodieTableMetaClient metaClient;
-  protected final transient HoodieEngineContext context;
   protected final HoodieIndex<T, I, K, O> index;
-
   private SerializableConfiguration hadoopConfiguration;
-  private transient FileSystemViewManager viewManager;
-  private HoodieTableMetadata metadata;
-
   protected final TaskContextSupplier taskContextSupplier;
+  private final HoodieTableMetadata metadata;
+
+  private transient FileSystemViewManager viewManager;
+  protected final transient HoodieEngineContext context;
 
   protected HoodieTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     this.config = config;
     this.hadoopConfiguration = context.getHadoopConf();
-    this.viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration,
-        config.getViewStorageConfig());
+    this.context = context;
+    this.metadata = HoodieTableMetadata.create(context, config.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR,
+        config.getMetadataConfig().useFileListingMetadata(), config.getMetadataConfig().getFileListingMetadataVerify(),
+        false, config.getMetadataConfig().shouldAssumeDatePartitioning());
+    this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);
     this.metaClient = metaClient;
     this.index = getIndex(config, context);
-    this.context = context;
     this.taskContextSupplier = context.getTaskContextSupplier();
   }
 
   protected abstract HoodieIndex<T, I, K, O> getIndex(HoodieWriteConfig config, HoodieEngineContext context);
 
   private synchronized FileSystemViewManager getViewManager() {
     if (null == viewManager) {
-      viewManager = FileSystemViewManager.createViewManager(hadoopConfiguration, config.getViewStorageConfig());
+      viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata);

Review comment:
       this is done so that the `metadata` object is instantiated just once

##########
File path: hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java
##########
@@ -907,12 +909,12 @@ private HoodieWriteConfig getWriteConfig(boolean autoCommit, boolean useFileList
   private HoodieWriteConfig.Builder getWriteConfigBuilder(boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
-        .withAutoCommit(autoCommit).withAssumeDatePartitioning(false)
+        .withAutoCommit(autoCommit)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
             .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1)
             .withAutoClean(false).retainCommits(1).retainFileVersions(1).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().hfileMaxFileSize(1024 * 1024 * 1024).build())
-        .withEmbeddedTimelineServerEnabled(false).forTable("test-trip-table")
+        .withEmbeddedTimelineServerEnabled(true).forTable("test-trip-table")

Review comment:
       should consider running some tests with this parameterized as well.
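       e.g. with JUnit 5, something like (sketch; the test name is illustrative):

       ```java
       @ParameterizedTest
       @ValueSource(booleans = {false, true})
       public void testTableOperationsWithTimelineServer(boolean useTimelineServer) throws Exception {
         // same scenario as the existing tests, just toggling the embedded timeline server
         HoodieWriteConfig config = getWriteConfigBuilder(true, true, false)
             .withEmbeddedTimelineServerEnabled(useTimelineServer).build();
         // ... write, then validateMetadata(client) as usual
       }
       ```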

##########
File path: hudi-client/hudi-spark-client/src/test/resources/log4j-surefire.properties
##########
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ###
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=INFO, CONSOLE

Review comment:
       revert

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -65,14 +67,15 @@
 
   // Directory used for Spillable Map when merging records
   protected final String spillableMapDirectory;
-  private transient HoodieMetadataMergedInstantRecordScanner timelineRecordScanner;
+  private TimelineMergedTableMetadata timelineMergedMetadata;

Review comment:
       this is also serialized now, so we don't list the data timeline for merging over and over. 

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -63,6 +63,12 @@
   public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
   public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
 
+  public static final String HOODIE_ASSUME_DATE_PARTITIONING_PROP = "hoodie.assume.date.partitioning";

Review comment:
       consolidated everything that affects listing etc. here.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -265,37 +280,28 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, String datasetBas
     return mergedRecord;
   }
 
-  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);
 
-  private void openTimelineScanner() throws IOException {
-    if (timelineRecordScanner != null) {
-      // Already opened
-      return;
+  private void openTimelineScanner() {

Review comment:
       happens once on init
   

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -265,37 +280,28 @@ protected BaseTableMetadata(HoodieEngineContext engineContext, String datasetBas
     return mergedRecord;
   }
 
-  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException;
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);
 
-  private void openTimelineScanner() throws IOException {
-    if (timelineRecordScanner != null) {
-      // Already opened
-      return;
+  private void openTimelineScanner() {
+    if (timelineMergedMetadata == null) {
+      List<HoodieInstant> unSyncedInstants = findInstantsToSync();
+      timelineMergedMetadata =
+          new TimelineMergedTableMetadata(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), null);
     }
-
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    List<HoodieInstant> unSyncedInstants = findInstantsToSync(datasetMetaClient);
-    Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
-    timelineRecordScanner =
-        new HoodieMetadataMergedInstantRecordScanner(datasetMetaClient, unSyncedInstants, getSyncedInstantTime(), schema, MAX_MEMORY_SIZE_IN_BYTES, spillableMapDirectory, null);
   }
 
-  protected List<HoodieInstant> findInstantsToSync() {
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);

Review comment:
       no extra metaclient creations.

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -98,79 +109,75 @@ public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String datas
   }
 
   @Override
-  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException {
-    openBaseAndLogFiles();
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) {
+    try {
+      openBaseAndLogFiles();
+      // Retrieve record from base file
+      HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+      if (baseFileReader != null) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
+        if (baseRecord.isPresent()) {
+          hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+              metaClient.getTableConfig().getPayloadClass());
+          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+        }
+      }
 
-    // Retrieve record from base file
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (baseFileReader != null) {
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
-      if (baseRecord.isPresent()) {
-        hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-            metaClient.getTableConfig().getPayloadClass());
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+      // Retrieve record from log file
+      Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
+      if (logHoodieRecord.isPresent()) {
+        if (hoodieRecord != null) {
+          // Merge the payloads
+          HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+          hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
+        } else {
+          hoodieRecord = logHoodieRecord.get();
+        }
       }
-    }
 
-    // Retrieve record from log file
-    Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
-    if (logHoodieRecord.isPresent()) {
-      if (hoodieRecord != null) {
-        // Merge the payloads
-        HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
-        hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
-      } else {
-        hoodieRecord = logHoodieRecord.get();
+      return Option.ofNullable(hoodieRecord);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
+    } finally {
+      try {

Review comment:
       ugh. not nice

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -81,10 +86,16 @@ public HoodieBackedTableMetadata(Configuration conf, String datasetBasePath, Str
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String datasetBasePath, String spillableMapDirectory,
                                    boolean enabled, boolean validateLookups, boolean enableMetrics, boolean assumeDatePartitioning) {
     super(engineContext, datasetBasePath, spillableMapDirectory, enabled, validateLookups, enableMetrics, assumeDatePartitioning);
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
-    if (enabled) {
+    initIfNeeded();
+  }
+
+  private void initIfNeeded() {
+    if (enabled && this.metaClient == null) {
+      this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
       try {
         this.metaClient = new HoodieTableMetaClient(hadoopConf.get(), metadataBasePath);
+        HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
+        latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());

Review comment:
       list just once lazily

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -98,79 +109,75 @@ public HoodieBackedTableMetadata(HoodieEngineContext engineContext, String datas
   }
 
   @Override
-  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) throws IOException {
-    openBaseAndLogFiles();
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) {
+    try {
+      openBaseAndLogFiles();
+      // Retrieve record from base file
+      HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+      if (baseFileReader != null) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
+        if (baseRecord.isPresent()) {
+          hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
+              metaClient.getTableConfig().getPayloadClass());
+          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+        }
+      }
 
-    // Retrieve record from base file
-    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-    if (baseFileReader != null) {
-      HoodieTimer timer = new HoodieTimer().startTimer();
-      Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
-      if (baseRecord.isPresent()) {
-        hoodieRecord = SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(),
-            metaClient.getTableConfig().getPayloadClass());
-        metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, timer.endTimer()));
+      // Retrieve record from log file
+      Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
+      if (logHoodieRecord.isPresent()) {
+        if (hoodieRecord != null) {
+          // Merge the payloads
+          HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
+          hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
+        } else {
+          hoodieRecord = logHoodieRecord.get();
+        }
       }
-    }
 
-    // Retrieve record from log file
-    Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = logRecordScanner.getRecordByKey(key);
-    if (logHoodieRecord.isPresent()) {
-      if (hoodieRecord != null) {
-        // Merge the payloads
-        HoodieRecordPayload mergedPayload = logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
-        hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), mergedPayload);
-      } else {
-        hoodieRecord = logHoodieRecord.get();
+      return Option.ofNullable(hoodieRecord);
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
+    } finally {
+      try {
+        close();
+      } catch (Exception e) {
+        throw new HoodieException("Error closing resources during metadata table merge, for key :" + key, e);
       }
     }
-
-    return Option.ofNullable(hoodieRecord);
   }
 
   /**
    * Open readers to the base and log files.
    */
-  protected synchronized void openBaseAndLogFiles() throws IOException {
-    if (logRecordScanner != null) {
-      // Already opened
-      return;
-    }
+  private synchronized void openBaseAndLogFiles() throws IOException {
 
     HoodieTimer timer = new HoodieTimer().startTimer();
 
     // Metadata is in sync till the latest completed instant on the dataset
-    HoodieTableMetaClient datasetMetaClient = new HoodieTableMetaClient(hadoopConf.get(), datasetBasePath);
-    String latestInstantTime = datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
-        .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
-    // Find the latest file slice

Review comment:
       all these done only once on init

##########
File path: hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
##########
@@ -36,9 +36,10 @@
 
   private final HoodieTableMetadata tableMetadata;

Review comment:
       this instance is not closed, but it's not the responsibility of this object to close it.
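       i.e. whoever creates the metadata instance should close it, e.g. (sketch; variable names and flag values shown only for illustration):

       ```java
       // the creator of the HoodieTableMetadata closes it; this view only borrows the reference
       try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, basePath,
           FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR, true, false, false, false)) {
         // build the HoodieMetadataFileSystemView with tableMetadata and use it within this scope
       }
       ```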




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org