Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/13 00:16:08 UTC

[GitHub] [hudi] satishkotha commented on a change in pull request #3970: [HUDI-2731] Make clustering work regardless of whether there are base…

satishkotha commented on a change in pull request #3970:
URL: https://github.com/apache/hudi/pull/3970#discussion_r748646037



##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########
@@ -205,12 +207,26 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
               .withSpillableMapBasePath(config.getSpillableMapBasePath())
               .build();
 
-          HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-          recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
-              tableConfig.getPayloadClass(),
-              tableConfig.getPreCombineField(),
-              tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-                  tableConfig.getPartitionFieldProp()))));
+          if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) {
+            HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+            HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+            recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
+                tableConfig.getPayloadClass(),
+                tableConfig.getPreCombineField(),
+                tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                    tableConfig.getPartitionFieldProp()))));
+          } else {
+            // Since there is no base file, fall back to reading log files
+            Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
+            recordIterators.add(StreamSupport.stream(iterable.spliterator(), false)
+                .map(e -> {
+                  try {
+                    return transform((IndexedRecord) e.getData().getInsertValue(readerSchema).get());
+                  } catch (IOException io) {
+                    throw new UncheckedIOException(io);

Review comment:
       minor: We use HoodieIOException in the rest of the code; consider using it here for consistency.
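   For illustration, a minimal sketch of that suggestion, assuming the types used in the diff above (the helper class LogRecordToAvro, its method name, and the exception message are hypothetical; the transform(..) call from the strategy class is omitted):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.exception.HoodieIOException;

// Hypothetical helper mirroring the lambda in the diff above; the only change is
// wrapping the checked IOException in Hudi's HoodieIOException instead of
// java.io.UncheckedIOException.
class LogRecordToAvro {
  static IndexedRecord toAvro(HoodieRecord<? extends HoodieRecordPayload> record, Schema readerSchema) {
    try {
      // getInsertValue(..) returns Option<IndexedRecord>; get() assumes a value is
      // present, exactly as the code in the diff does.
      return record.getData().getInsertValue(readerSchema).get();
    } catch (IOException io) {
      // Example message only.
      throw new HoodieIOException("Failed to read next record from log file", io);
    }
  }
}
```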

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
##########
@@ -205,12 +207,26 @@ public MultipleSparkJobExecutionStrategy(HoodieTable table, HoodieEngineContext
               .withSpillableMapBasePath(config.getSpillableMapBasePath())
               .build();
 
-          HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
-          recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
-              tableConfig.getPayloadClass(),
-              tableConfig.getPreCombineField(),
-              tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
-                  tableConfig.getPartitionFieldProp()))));
+          if (!StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())) {
+            HoodieFileReader<? extends IndexedRecord> baseFileReader = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), new Path(clusteringOp.getDataFilePath()));
+            HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
+            recordIterators.add(getFileSliceReader(baseFileReader, scanner, readerSchema,
+                tableConfig.getPayloadClass(),
+                tableConfig.getPreCombineField(),
+                tableConfig.populateMetaFields() ? Option.empty() : Option.of(Pair.of(tableConfig.getRecordKeyFieldProp(),
+                    tableConfig.getPartitionFieldProp()))));
+          } else {
+            // Since there is no base file, fall back to reading log files
+            Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();

Review comment:
       Functionality looks good. But what do you think about reorganizing this a little? Here is what I'm thinking:
   
   Change the HoodieFileSliceReader#getFileSliceReader method to take an Option[HoodieBaseFileReader]. This whole logic can then be embedded inside that method (introduce new methods if needed).
   
   That makes it easier to reuse the code if there are other places that need to read FileSlices. Please try it and let me know if you think this is reasonable.
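   A rough sketch of what that reorganization could look like (purely illustrative: the class name FileSliceReaderSketch and the helper mergeBaseAndLogRecords are hypothetical, and the return type is simplified to a plain Iterator rather than the actual HoodieFileSliceReader):

```java
import java.io.IOException;
import java.util.Iterator;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.io.storage.HoodieFileReader;

// Hypothetical sketch only: passes an optional base-file reader so the
// "log files only" fallback lives next to the existing merge path.
class FileSliceReaderSketch {

  static Iterator<HoodieRecord<? extends HoodieRecordPayload>> getFileSliceReader(
      Option<HoodieFileReader<? extends IndexedRecord>> baseFileReaderOpt,
      HoodieMergedLogRecordScanner scanner,
      Schema readerSchema,
      String payloadClass,
      String preCombineField,
      Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
    if (baseFileReaderOpt.isPresent()) {
      // Base file exists: keep the current merge of base-file records with log records.
      return mergeBaseAndLogRecords(baseFileReaderOpt.get(), scanner, readerSchema,
          payloadClass, preCombineField, simpleKeyGenFieldsOpt);
    }
    // No base file for this file group: serve records straight from the log scanner.
    return scanner.iterator();
  }

  // Placeholder for the existing merge logic; not reproduced here.
  private static Iterator<HoodieRecord<? extends HoodieRecordPayload>> mergeBaseAndLogRecords(
      HoodieFileReader<? extends IndexedRecord> baseFileReader,
      HoodieMergedLogRecordScanner scanner,
      Schema readerSchema,
      String payloadClass,
      String preCombineField,
      Option<Pair<String, String>> simpleKeyGenFieldsOpt) throws IOException {
    throw new UnsupportedOperationException("existing merge logic goes here");
  }
}
```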




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org