Posted to commits@hudi.apache.org by "sivabalan narayanan (Jira)" <ji...@apache.org> on 2022/03/01 22:17:00 UTC

[jira] [Assigned] (HUDI-3543) Clean up HoodieIncrSource for commented out code

     [ https://issues.apache.org/jira/browse/HUDI-3543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sivabalan narayanan reassigned HUDI-3543:
-----------------------------------------

    Assignee: leesf

> Clean up HoodieIncrSource for commented out code
> ------------------------------------------------
>
>                 Key: HUDI-3543
>                 URL: https://issues.apache.org/jira/browse/HUDI-3543
>             Project: Apache Hudi
>          Issue Type: Task
>          Components: deltastreamer
>            Reporter: sivabalan narayanan
>            Assignee: leesf
>            Priority: Major
>
> We found some commented-out code in HoodieIncrSource. Clean it up if it is no longer required.
>  
> {code:java}
> /*
>  * DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.HOODIE_SRC_BASE_PATH,
>  * Config.HOODIE_SRC_PARTITION_FIELDS)); List<String> partitionFields =
>  * props.getStringList(Config.HOODIE_SRC_PARTITION_FIELDS, ",", new ArrayList<>()); PartitionValueExtractor
>  * extractor = DataSourceUtils.createPartitionExtractor(props.getString( Config.HOODIE_SRC_PARTITION_EXTRACTORCLASS,
>  * Config.DEFAULT_HOODIE_SRC_PARTITION_EXTRACTORCLASS));
>  */ {code}
> {code:java}
> /*
>  * log.info("Partition Fields are : (" + partitionFields + "). Initial Source Schema :" + source.schema());
>  *
>  * StructType newSchema = new StructType(source.schema().fields()); for (String field : partitionFields) { newSchema
>  * = newSchema.add(field, DataTypes.StringType, true); }
>  *
>  * /** Validates if the commit time is sane and also generates Partition fields from _hoodie_partition_path if
>  * configured
>  *
>  * Dataset<Row> validated = source.map((MapFunction<Row, Row>) (Row row) -> { // _hoodie_instant_time String
>  * instantTime = row.getString(0); IncrSourceHelper.validateInstantTime(row, instantTime, instantEndpts.getKey(),
>  * instantEndpts.getValue()); if (!partitionFields.isEmpty()) { // _hoodie_partition_path String hoodiePartitionPath
>  * = row.getString(3); List<Object> partitionVals =
>  * extractor.extractPartitionValuesInPath(hoodiePartitionPath).stream() .map(o -> (Object)
>  * o).collect(Collectors.toList()); ValidationUtils.checkArgument(partitionVals.size() == partitionFields.size(),
>  * "#partition-fields != #partition-values-extracted"); List<Object> rowObjs = new
>  * ArrayList<>(scala.collection.JavaConversions.seqAsJavaList(row.toSeq())); rowObjs.addAll(partitionVals); return
>  * RowFactory.create(rowObjs.toArray()); } return row; }, RowEncoder.apply(newSchema));
>  *
>  * log.info("Validated Source Schema :" + validated.schema());
>  */ {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)