Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/01/19 19:32:21 UTC

[GitHub] [hudi] vinothchandar commented on a change in pull request #4531: [HUDI-3191] Rebasing Hive's FileInputFormat onto `AbstractHoodieTableFileIndex`

vinothchandar commented on a change in pull request #4531:
URL: https://github.com/apache/hudi/pull/4531#discussion_r788061010



##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+/**
+ * A Hudi table can be queried in one of the following 3 ways:
+ *
+ * <ol>
+ *   <li>Snapshot: snapshot of the table at the given (latest if not provided) instant is queried</li>
+ *   <li>Read Optimized (MOR only): snapshot of the table at the given (latest if not provided)
+ *   instant is queried, but w/o reading any of the delta-log files (only reading base-files)</li>
+ *   <li>Incremental: only records added w/in the given time-window (defined by beginning and ending instant)
+ *   are queried</li>
+ * </ol>
+ */
+public enum HoodieTableQueryType {
+  QUERY_TYPE_SNAPSHOT,

Review comment:
       drop the `QUERY_TYPE` prefix?
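
Along those lines, a minimal sketch of the enum with the prefix dropped — an illustration only, assuming the remaining constants mirror the three query types described in the Javadoc above, not the merged code:

    public enum HoodieTableQueryType {
      SNAPSHOT,
      READ_OPTIMIZED,
      INCREMENTAL
    }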

##########
File path: hudi-common/src/main/scala/org/apache/hudi/HoodieTableFileIndexBase.scala
##########
@@ -50,11 +49,14 @@ import scala.collection.mutable
  * @param shouldIncludePendingCommits flags whether file-index should include any pending operations
  * @param fileStatusCache transient cache of fetched [[FileStatus]]es
  */
-abstract class AbstractHoodieTableFileIndex(engineContext: HoodieEngineContext,
-                                            metaClient: HoodieTableMetaClient,
-                                            configProperties: TypedProperties,
-                                            specifiedQueryInstant: Option[String] = None,
-                                            @transient fileStatusCache: FileStatusCacheTrait) {
+abstract class HoodieTableFileIndexBase(engineContext: HoodieEngineContext,

Review comment:
       `BaseHoodieTableIndex`, or even just `HoodieTableFileIndex`?

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/model/HoodieTableQueryType.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+/**
+ * A Hudi table can be queried in one of the following 3 ways:
+ *
+ * <ol>
+ *   <li>Snapshot: snapshot of the table at the given (latest if not provided) instant is queried</li>
+ *   <li>Read Optimized (MOR only): snapshot of the table at the given (latest if not provided)
+ *   instant is queried, but w/o reading any of the delta-log files (only reading base-files)</li>
+ *   <li>Incremental: only records added w/in the given time-window (defined by beginning and ending instant)
+ *   are queried</li>
+ * </ol>
+ */
+public enum HoodieTableQueryType {

Review comment:
       also just `HoodieQueryType`?

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -67,6 +82,27 @@ public final void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, Stream<HoodieLogFile> logFiles) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Nonnull
+  private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFileOpt) {
+    try {
+      return HoodieInputFormatUtils.getFileStatus(baseFileOpt.get());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);

Review comment:
       Wrap into `HoodieIOException`?
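
A minimal sketch of that suggestion, assuming Hudi's `org.apache.hudi.exception.HoodieIOException` (whose constructor accepts a message plus the underlying `IOException`); other imports as in the diff above, and the message text is illustrative:

    import org.apache.hudi.exception.HoodieIOException;

    @Nonnull
    private static FileStatus getFileStatusUnchecked(Option<HoodieBaseFile> baseFileOpt) {
      try {
        return HoodieInputFormatUtils.getFileStatus(baseFileOpt.get());
      } catch (IOException ioe) {
        // Surface the failure as Hudi's own unchecked IO exception rather than a bare RuntimeException
        throw new HoodieIOException("Failed to fetch file-status for the base-file", ioe);
      }
    }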

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieTableFileIndex.java
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.HoodieTableFileIndexBase;
+import org.apache.hudi.FileStatusCacheTrait;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableQueryType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.slf4j.Logger;

Review comment:
       let's stick to log4j?
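
For reference, a sketch of that swap (log4j 1.x `LogManager`, on the assumption implied by the comment that the rest of the module logs via log4j; class name as in the diff above):

    import org.apache.log4j.LogManager;
    import org.apache.log4j.Logger;

    // inside HiveHoodieTableFileIndex, replacing the slf4j logger imported above
    private static final Logger LOG = LogManager.getLogger(HiveHoodieTableFileIndex.class);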

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -67,6 +82,27 @@ public final void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  @Nonnull
+  private static RealtimeFileStatus createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile, Stream<HoodieLogFile> logFiles) {
+    List<HoodieLogFile> sortedLogFiles = logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+    try {
+      RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(latestLogFile.getFileStatus());
+      rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+      return rtFileStatus;
+    } catch (IOException e) {
+      throw new RuntimeException(e);

Review comment:
       Same here.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -102,18 +138,98 @@ public final void setConf(Configuration conf) {
     // process snapshot queries next.
     List<Path> snapshotPaths = inputPathHandler.getSnapshotPaths();
     if (snapshotPaths.size() > 0) {
-      returns.addAll(HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths, includeLogFilesForSnapShotView()));
+      returns.addAll(listStatusForSnapshotMode(job, tableMetaClientMap, snapshotPaths));
     }
     return returns.toArray(new FileStatus[0]);
   }
 
+  @Nonnull
+  private List<FileStatus> listStatusForSnapshotMode(JobConf job,
+                                                     Map<String, HoodieTableMetaClient> tableMetaClientMap,
+                                                     List<Path> snapshotPaths) throws IOException {
+    HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(job);
+    List<FileStatus> targetFiles = new ArrayList<>();
+
+    TypedProperties props = new TypedProperties(new Properties());
+
+    Map<HoodieTableMetaClient, List<Path>> groupedPaths =
+        HoodieInputFormatUtils.groupSnapshotPathsByMetaClient(tableMetaClientMap.values(), snapshotPaths);
+
+    for (Map.Entry<HoodieTableMetaClient, List<Path>> entry : groupedPaths.entrySet()) {
+      HoodieTableMetaClient tableMetaClient = entry.getKey();
+      List<Path> partitionPaths = entry.getValue();
+
+      // Hive job might specify a max commit instant up to which table's state
+      // should be examined. We simply pass it as query's instant to the file-index
+      Option<String> queryCommitInstant =
+          HoodieHiveUtils.getMaxCommit(job, tableMetaClient.getTableConfig().getTableName());
+
+      boolean shouldIncludePendingCommits =
+          HoodieHiveUtils.shouldIncludePendingCommits(job, tableMetaClient.getTableConfig().getTableName());
+
+      HiveHoodieTableFileIndex fileIndex =
+          new HiveHoodieTableFileIndex(
+              engineContext,
+              tableMetaClient,
+              props,
+              HoodieTableQueryType.QUERY_TYPE_SNAPSHOT,
+              partitionPaths,
+              queryCommitInstant,
+              shouldIncludePendingCommits);
+
+      Map<String, Seq<FileSlice>> partitionedFileSlices =
+          JavaConverters.mapAsJavaMapConverter(fileIndex.listFileSlices()).asJava();
+
+      targetFiles.addAll(
+          partitionedFileSlices.values()
+              .stream()
+              .flatMap(seq -> JavaConverters.seqAsJavaListConverter(seq).asJava().stream())
+              .map(fileSlice -> {
+                Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
+                Option<HoodieLogFile> latestLogFileOpt = fileSlice.getLatestLogFile();
+                if (baseFileOpt.isPresent()) {
+                  return getFileStatusUnchecked(baseFileOpt);
+                } else if (includeLogFilesForSnapShotView() && latestLogFileOpt.isPresent()) {
+                  return createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), fileSlice.getLogFiles());
+                } else {
+                  throw new IllegalStateException("Invalid state: either base-file or log-file should be present");
+                }
+              })
+              .collect(Collectors.toList())
+      );
+    }
+
+    // TODO cleanup
+    validate(targetFiles, listStatusForSnapshotModeLegacy(job, tableMetaClientMap, snapshotPaths));

Review comment:
       let's file a JIRA to track this

##########
File path: packaging/hudi-presto-bundle/pom.xml
##########
@@ -66,15 +66,20 @@
                 <includes>
                   <include>org.apache.hudi:hudi-common</include>
                   <include>org.apache.hudi:hudi-hadoop-mr</include>
+
+                  <!-- TODO(HUDI-3239) remove this -->
+                  <include>org.scala-lang:scala-library</include>
+
+                  <include>org.apache.parquet:parquet-avro</include>

Review comment:
       All of this is problematic. cc @codope




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org