Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/11/05 09:26:03 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #3888: [HUDI-2624] Implement Non Index type for HUDI

yanghua commented on a change in pull request #3888:
URL: https://github.com/apache/hudi/pull/3888#discussion_r743495556



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -384,6 +385,9 @@
           + " but for the case the write schema is not equal to the specified table schema, we can"
           + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
 
+  public static final String UPDATE_JOIN_FIELDS = "hoodie.update.join.fields";
+  public static final String UPDATE_NULL_FIELDS = "hoodie.update.null.fields";

Review comment:
       More comments need to be filled in for these new configs.
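       For illustration, Javadoc could be attached to the new constants, e.g. (a rough sketch only; the descriptions below are guesses at the intended semantics based on the property names, not the author's actual wording):

```java
/**
 * Comma-separated list of fields used to join incoming update records against existing records.
 * NOTE: this description is a placeholder guess inferred from the property name.
 */
public static final String UPDATE_JOIN_FIELDS = "hoodie.update.join.fields";

/**
 * Comma-separated list of fields whose values should be set to null on matched records.
 * NOTE: this description is a placeholder guess inferred from the property name.
 */
public static final String UPDATE_NULL_FIELDS = "hoodie.update.null.fields";
```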

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowUpdateHandle.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HoodieRowUpdateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowUpdateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final StructType structType;
+  private final String prevFileName;
+  private final int shouldUpdateFieldIndex;
+  private final Set<Integer> updateNullFieldIndexSet;
+  Map<Integer, Pair<Integer, DataType>> updateValueFieldMap;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  private boolean isFileUpdated = false;
+  private long writeTimeAccumulator = 0;
+  private long updateRecordsWritten = 0;
+
+  private Long greatestSequenceNumber = -1L;
+
+  public HoodieRowUpdateHandle(
+      HoodieTable table,
+      HoodieWriteConfig writeConfig,
+      String partitionPath,
+      String fileId,
+      String instantTime,
+      int taskPartitionId,
+      long taskId,
+      long taskEpochId,
+      StructType structType,
+      String prevFileName,
+      int shouldUpdateFieldIndex,
+      Set<Integer> updateNullFieldIndexSet,
+      Map<Integer, Pair<Integer, DataType>> updateValueFieldMap) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    this.structType = structType;
+    this.prevFileName = prevFileName;
+    this.shouldUpdateFieldIndex = shouldUpdateFieldIndex;
+    this.updateNullFieldIndexSet = updateNullFieldIndexSet;
+    this.updateValueFieldMap = updateValueFieldMap;
+    this.isFileUpdated = false;
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   *
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    long start = System.currentTimeMillis();
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      long sequenceNumber = Long.parseLong(seqId.split("_")[2]);
+      if (sequenceNumber < greatestSequenceNumber) {
+        throw new RuntimeException("Wrong sequence number, records should be sorted");
+      } else {
+        greatestSequenceNumber = sequenceNumber;
+      }
+      Integer recordKeyPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+          .get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      if (!record.isNullAt(shouldUpdateFieldIndex)) {
+        isFileUpdated = true;
+        updateRecordsWritten++;
+        for (Integer updateNullFieldIndex : updateNullFieldIndexSet) {
+          record.setNullAt(updateNullFieldIndex);
+        }
+        for (Entry<Integer, Pair<Integer, DataType>> entry : updateValueFieldMap.entrySet()) {
+          record.update(entry.getKey(), record.get(entry.getValue().getKey(), entry.getValue().getValue()));
+        }
+      }
+      String recordKey = null;
+      if (!record.isNullAt(recordKeyPos)) {
+        recordKey = record.getUTF8String(recordKeyPos).toString();
+      }
+
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          new InternalRowWrapper(structType.length(), record));
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+    writeTimeAccumulator += (System.currentTimeMillis() - start);
+  }
+
+  public HoodieInternalWriteStatus close() throws IOException {
+    long start = System.currentTimeMillis();
+    fileWriter.close();
+    if (isFileUpdated) {
+      HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setPartitionPath(partitionPath);
+      stat.setNumWrites(writeStatus.getTotalRecords());
+      stat.setNumDeletes(0);
+      stat.setNumInserts(0);
+      stat.setNumUpdateWrites(updateRecordsWritten);
+      stat.setPrevCommit(prevFileName);
+      stat.setFileId(fileId);
+      stat.setPath(new Path(writeConfig.getBasePath()), path);
+      long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+      stat.setTotalWriteBytes(fileSizeInBytes);
+      stat.setFileSizeInBytes(fileSizeInBytes);
+      stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+      HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+      long processTime = currTimer.endTimer();
+      long writeTime = (System.currentTimeMillis() - start + writeTimeAccumulator) / 1000;
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and " + writeTime
+          + " seconds to write.");
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and "
+          + (System.currentTimeMillis() - start) / 1000 + " seconds to write.");
+      runtimeStats.setTotalCreateTime(processTime);
+      stat.setRuntimeStats(runtimeStats);
+      writeStatus.setStat(stat);
+      return writeStatus;
+    } else {
+      try {
+        fs.delete(path, false);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete unnecessary file " + path.toString());

Review comment:
       Append the exception `e` to the log call for more detailed information?
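       For example, a minimal change along these lines (just passing the exception to the existing log4j call so the stack trace is logged too):

```java
} catch (IOException e) {
  // Include the exception so the stack trace ends up in the log as well.
  LOG.warn("Failed to delete unnecessary file " + path, e);
}
```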

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowUpdateHandle.java
##########
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.client.HoodieInternalWriteStatus;
+import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieInsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.hudi.table.marker.WriteMarkersFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class HoodieRowUpdateHandle implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LogManager.getLogger(HoodieRowUpdateHandle.class);
+  private static final AtomicLong SEQGEN = new AtomicLong(1);
+
+  private final String instantTime;
+  private final int taskPartitionId;
+  private final long taskId;
+  private final long taskEpochId;
+  private final HoodieTable table;
+  private final HoodieWriteConfig writeConfig;
+  private final HoodieInternalRowFileWriter fileWriter;
+  private final String partitionPath;
+  private final Path path;
+  private final String fileId;
+  private final StructType structType;
+  private final String prevFileName;
+  private final int shouldUpdateFieldIndex;
+  private final Set<Integer> updateNullFieldIndexSet;
+  Map<Integer, Pair<Integer, DataType>> updateValueFieldMap;
+  private final FileSystem fs;
+  private final HoodieInternalWriteStatus writeStatus;
+  private final HoodieTimer currTimer;
+
+  private boolean isFileUpdated = false;
+  private long writeTimeAccumulator = 0;
+  private long updateRecordsWritten = 0;
+
+  private Long greatestSequenceNumber = -1L;
+
+  public HoodieRowUpdateHandle(
+      HoodieTable table,
+      HoodieWriteConfig writeConfig,
+      String partitionPath,
+      String fileId,
+      String instantTime,
+      int taskPartitionId,
+      long taskId,
+      long taskEpochId,
+      StructType structType,
+      String prevFileName,
+      int shouldUpdateFieldIndex,
+      Set<Integer> updateNullFieldIndexSet,
+      Map<Integer, Pair<Integer, DataType>> updateValueFieldMap) {
+    this.partitionPath = partitionPath;
+    this.table = table;
+    this.writeConfig = writeConfig;
+    this.instantTime = instantTime;
+    this.taskPartitionId = taskPartitionId;
+    this.taskId = taskId;
+    this.taskEpochId = taskEpochId;
+    this.fileId = fileId;
+    this.currTimer = new HoodieTimer();
+    this.currTimer.startTimer();
+    this.fs = table.getMetaClient().getFs();
+    this.path = makeNewPath(partitionPath);
+    this.writeStatus = new HoodieInternalWriteStatus(!table.getIndex().isImplicitWithStorage(),
+        writeConfig.getWriteStatusFailureFraction());
+    writeStatus.setPartitionPath(partitionPath);
+    writeStatus.setFileId(fileId);
+    this.structType = structType;
+    this.prevFileName = prevFileName;
+    this.shouldUpdateFieldIndex = shouldUpdateFieldIndex;
+    this.updateNullFieldIndexSet = updateNullFieldIndexSet;
+    this.updateValueFieldMap = updateValueFieldMap;
+    this.isFileUpdated = false;
+    try {
+      HoodiePartitionMetadata partitionMetadata =
+          new HoodiePartitionMetadata(
+              fs,
+              instantTime,
+              new Path(writeConfig.getBasePath()),
+              FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath));
+      partitionMetadata.trySave(taskPartitionId);
+      createMarkerFile(partitionPath, FSUtils.makeDataFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension()));
+      this.fileWriter = createNewFileWriter(path, table, writeConfig, structType);
+    } catch (IOException e) {
+      throw new HoodieInsertException("Failed to initialize file writer for path " + path, e);
+    }
+    LOG.info("New handle created for partition :" + partitionPath + " with fileId " + fileId);
+  }
+
+  /**
+   * Writes an {@link InternalRow} to the underlying HoodieInternalRowFileWriter. Before writing, value for meta columns are computed as required
+   * and wrapped in {@link HoodieInternalRow}. {@link HoodieInternalRow} is what gets written to HoodieInternalRowFileWriter.
+   *
+   * @param record instance of {@link InternalRow} that needs to be written to the fileWriter.
+   * @throws IOException
+   */
+  public void write(InternalRow record) throws IOException {
+    long start = System.currentTimeMillis();
+    try {
+      String partitionPath = record.getUTF8String(HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS.get(
+          HoodieRecord.PARTITION_PATH_METADATA_FIELD)).toString();
+      String seqId = HoodieRecord.generateSequenceId(instantTime, taskPartitionId, SEQGEN.getAndIncrement());
+      long sequenceNumber = Long.parseLong(seqId.split("_")[2]);
+      if (sequenceNumber < greatestSequenceNumber) {
+        throw new RuntimeException("Wrong sequence number, records should be sorted");
+      } else {
+        greatestSequenceNumber = sequenceNumber;
+      }
+      Integer recordKeyPos = HoodieRecord.HOODIE_META_COLUMNS_NAME_TO_POS
+          .get(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+      if (!record.isNullAt(shouldUpdateFieldIndex)) {
+        isFileUpdated = true;
+        updateRecordsWritten++;
+        for (Integer updateNullFieldIndex : updateNullFieldIndexSet) {
+          record.setNullAt(updateNullFieldIndex);
+        }
+        for (Entry<Integer, Pair<Integer, DataType>> entry : updateValueFieldMap.entrySet()) {
+          record.update(entry.getKey(), record.get(entry.getValue().getKey(), entry.getValue().getValue()));
+        }
+      }
+      String recordKey = null;
+      if (!record.isNullAt(recordKeyPos)) {
+        recordKey = record.getUTF8String(recordKeyPos).toString();
+      }
+
+      HoodieInternalRow internalRow = new HoodieInternalRow(instantTime, seqId, recordKey, partitionPath, path.getName(),
+          new InternalRowWrapper(structType.length(), record));
+      try {
+        fileWriter.writeRow(recordKey, internalRow);
+        writeStatus.markSuccess(recordKey);
+      } catch (Throwable t) {
+        writeStatus.markFailure(recordKey, t);
+      }
+    } catch (Throwable ge) {
+      writeStatus.setGlobalError(ge);
+      throw ge;
+    }
+    writeTimeAccumulator += (System.currentTimeMillis() - start);
+  }
+
+  public HoodieInternalWriteStatus close() throws IOException {
+    long start = System.currentTimeMillis();
+    fileWriter.close();
+    if (isFileUpdated) {
+      HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setPartitionPath(partitionPath);
+      stat.setNumWrites(writeStatus.getTotalRecords());
+      stat.setNumDeletes(0);
+      stat.setNumInserts(0);
+      stat.setNumUpdateWrites(updateRecordsWritten);
+      stat.setPrevCommit(prevFileName);
+      stat.setFileId(fileId);
+      stat.setPath(new Path(writeConfig.getBasePath()), path);
+      long fileSizeInBytes = FSUtils.getFileSize(table.getMetaClient().getFs(), path);
+      stat.setTotalWriteBytes(fileSizeInBytes);
+      stat.setFileSizeInBytes(fileSizeInBytes);
+      stat.setTotalWriteErrors(writeStatus.getFailedRowsSize());
+      HoodieWriteStat.RuntimeStats runtimeStats = new HoodieWriteStat.RuntimeStats();
+      long processTime = currTimer.endTimer();
+      long writeTime = (System.currentTimeMillis() - start + writeTimeAccumulator) / 1000;
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and " + writeTime
+          + " seconds to write.");
+      LOG.info("Finish updating file " + getFileName() + " of partition " + partitionPath
+          + ", takes " + processTime / 1000 + " seconds to process and "
+          + (System.currentTimeMillis() - start) / 1000 + " seconds to write.");

Review comment:
       Can we make this more readable? Why are there two nearly identical log statements? And there are so many `+` concatenations.
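       One possible shape for a single, easier-to-read statement (a sketch only, dropping the duplicated second log line and using `String.format` instead of chained `+`):

```java
// Single consolidated log line using the variables already computed above.
LOG.info(String.format(
    "Finished updating file %s of partition %s: %d seconds to process, %d seconds to write.",
    getFileName(), partitionPath, processTime / 1000, writeTime));
```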

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -384,6 +385,9 @@
           + " but for the case the write schema is not equal to the specified table schema, we can"
           + " specify the write schema by this parameter. Used by MergeIntoHoodieTableCommand");
 
+  public static final String UPDATE_JOIN_FIELDS = "hoodie.update.join.fields";
+  public static final String UPDATE_NULL_FIELDS = "hoodie.update.null.fields";
+  public static final String UPDATE_PARTITION_PATHS = "hoodie.update.partition.paths";

Review comment:
       Add an empty line here?

##########
File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -47,6 +49,14 @@
   public SparkUpsertDeltaCommitPartitioner(WorkloadProfile profile, HoodieSparkEngineContext context, HoodieTable table,
                                            HoodieWriteConfig config) {
     super(profile, context, table, config);
+    // todo

Review comment:
       `todo` for what?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org