Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/18 23:16:46 UTC

[hudi] branch master updated: [HUDI-1102] Add commonly useful Spark-related and table path detection utilities (#1841)

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aae437  [HUDI-1102] Add commonly useful Spark-related and table path detection utilities (#1841)
1aae437 is described below

commit 1aae437257cfd94fd277cf667257f6abffcc0c21
Author: Udit Mehrotra <um...@illinois.edu>
AuthorDate: Sat Jul 18 16:16:32 2020 -0700

    [HUDI-1102] Add commonly useful Spark-related and table path detection utilities (#1841)
    
    Co-authored-by: Mehrotra <ud...@amazon.com>
---
 .../hudi/common/table/HoodieTableMetaClient.java   |   1 +
 .../apache/hudi/common/util/TablePathUtils.java    | 110 ++++++++++++++++++
 .../hudi/common/util/TestTablePathUtils.java       | 126 +++++++++++++++++++++
 .../main/java/org/apache/hudi/DataSourceUtils.java |  23 ++++
 .../scala/org/apache/hudi/HudiSparkUtils.scala     |  50 ++++++++
 .../scala/org/apache/hudi/TestHudiSparkUtils.scala | 105 +++++++++++++++++
 6 files changed, 415 insertions(+)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index 9675b77..b047595 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -73,6 +73,7 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String METAFOLDER_NAME = ".hoodie";
   public static final String TEMPFOLDER_NAME = METAFOLDER_NAME + File.separator + ".temp";
   public static final String AUXILIARYFOLDER_NAME = METAFOLDER_NAME + File.separator + ".aux";
+  public static final String BOOTSTRAP_INDEX_ROOT_FOLDER_PATH = AUXILIARYFOLDER_NAME + File.separator + ".bootstrap";
   public static final String MARKER_EXTN = ".marker";
 
   private String basePath;
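
For context, the new BOOTSTRAP_INDEX_ROOT_FOLDER_PATH constant nests under the
existing auxiliary folder. A minimal Scala sketch (the object name below is
illustrative, not part of this commit) showing what the constant resolves to:

    import org.apache.hudi.common.table.HoodieTableMetaClient

    object BootstrapPathDemo extends App {
      // On POSIX filesystems, where java.io.File.separator is "/",
      // this prints ".hoodie/.aux/.bootstrap".
      println(HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH)
    }
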
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
new file mode 100644
index 0000000..6982fdb
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/TablePathUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+public class TablePathUtils {
+
+  private static final Logger LOG = LogManager.getLogger(TablePathUtils.class);
+
+  private static boolean hasTableMetadataFolder(FileSystem fs, Path path) {
+    if (path == null) {
+      return false;
+    }
+
+    try {
+      return fs.exists(new Path(path, HoodieTableMetaClient.METAFOLDER_NAME));
+    } catch (IOException ioe) {
+      throw new HoodieException("Error checking Hoodie metadata folder for " + path, ioe);
+    }
+  }
+
+  public static Option<Path> getTablePath(FileSystem fs, Path path) throws HoodieException, IOException {
+    LOG.info("Getting table path from path : " + path);
+
+    FileStatus fileStatus = fs.getFileStatus(path);
+    Path directory = fileStatus.isFile() ? fileStatus.getPath().getParent() : fileStatus.getPath();
+
+    if (TablePathUtils.hasTableMetadataFolder(fs, directory)) {
+      // Handle table folder itself
+      return Option.of(directory);
+    }
+
+    // Handle metadata folder or metadata sub folder path
+    Option<Path> tablePath = getTablePathFromTableMetadataPath(fs, directory);
+    if (tablePath.isPresent()) {
+      return tablePath;
+    }
+
+    // Handle partition folder
+    return getTablePathFromPartitionPath(fs, directory);
+  }
+
+  private static boolean isTableMetadataFolder(String path) {
+    return path != null && path.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME);
+  }
+
+  private static boolean isInsideTableMetadataFolder(String path) {
+    return path != null && path.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/");
+  }
+
+  private static Option<Path> getTablePathFromTableMetadataPath(FileSystem fs, Path path) {
+    String pathStr = path.toString();
+
+    if (isTableMetadataFolder(pathStr)) {
+      return Option.of(path.getParent());
+    } else if (isInsideTableMetadataFolder(pathStr)) {
+      int index = pathStr.indexOf("/" + HoodieTableMetaClient.METAFOLDER_NAME);
+      return Option.of(new Path(pathStr.substring(0, index)));
+    }
+
+    return Option.empty();
+  }
+
+  private static Option<Path> getTablePathFromPartitionPath(FileSystem fs, Path partitionPath) {
+    try {
+      if (HoodiePartitionMetadata.hasPartitionMetadata(fs, partitionPath)) {
+        HoodiePartitionMetadata metadata = new HoodiePartitionMetadata(fs, partitionPath);
+        metadata.readFromFS();
+        return Option.of(getNthParent(partitionPath, metadata.getPartitionDepth()));
+      }
+    } catch (IOException ioe) {
+      throw new HoodieException("Error reading partition metadata for " + partitionPath, ioe);
+    }
+
+    return Option.empty();
+  }
+
+  private static Path getNthParent(Path path, int n) {
+    Path parent = path;
+    for (int i = 0; i < n; i++) {
+      parent = parent.getParent();
+    }
+    return parent;
+  }
+}
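
For anyone trying the new utility directly, a minimal usage sketch in Scala
(the local path below is illustrative and must point at an existing Hudi
table; the object name is not part of this commit):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hudi.common.util.TablePathUtils

    object TablePathDemo extends App {
      // Any path at or under a Hudi table works: the table root, the
      // .hoodie metadata folder (or a sub-folder), a partition folder,
      // or a data file inside a partition.
      val somePath = new Path("/tmp/hudi_table/key1=abc/key2=def/data1.parquet")
      val fs = somePath.getFileSystem(new Configuration())

      // Returns Option.of(<table base path>) when one can be inferred,
      // Option.empty() otherwise.
      val tablePath = TablePathUtils.getTablePath(fs, somePath)
      if (tablePath.isPresent) println("Table base path: " + tablePath.get)
    }
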
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
new file mode 100644
index 0000000..05031f0
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.Paths;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public final class TestTablePathUtils {
+
+  @TempDir
+  static File tempDir;
+  private static FileSystem fs;
+  private static Path tablePath;
+  private static Path partitionPath1;
+  private static Path partitionPath2;
+  private static Path filePath1;
+  private static Path filePath2;
+
+  @BeforeAll
+  static void setup() throws IOException {
+    URI tablePathURI = Paths.get(tempDir.getAbsolutePath(), "test_table").toUri();
+    tablePath = new Path(tablePathURI);
+    fs = tablePath.getFileSystem(new Configuration());
+
+    // Create bootstrap index folder
+    assertTrue(new File(
+        Paths.get(tablePathURI.getPath(), HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH).toUri()).mkdirs());
+
+    // Create partition folders
+    URI partitionPathURI1 = Paths.get(tablePathURI.getPath(), "key1=abc/key2=def").toUri();
+    partitionPath1 = new Path(partitionPathURI1);
+    URI partitionPathURI2 = Paths.get(tablePathURI.getPath(), "key1=xyz/key2=def").toUri();
+    partitionPath2 = new Path(partitionPathURI2);
+
+    assertTrue(new File(partitionPathURI1).mkdirs());
+    assertTrue(new File(partitionPathURI2).mkdirs());
+
+    HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath,
+                                                                             partitionPath1);
+    partitionMetadata1.trySave(1);
+    HoodiePartitionMetadata partitionMetadata2 = new HoodiePartitionMetadata(fs, Instant.now().toString(), tablePath,
+                                                                             partitionPath2);
+    partitionMetadata2.trySave(2);
+
+    // Create files
+    URI filePathURI1 = Paths.get(partitionPathURI1.getPath(), "data1.parquet").toUri();
+    filePath1 = new Path(filePathURI1);
+    URI filePathURI2 = Paths.get(partitionPathURI2.getPath(), "data2.parquet").toUri();
+    filePath2 = new Path(filePathURI2);
+
+    assertTrue(new File(filePathURI1).createNewFile());
+    assertTrue(new File(filePathURI2).createNewFile());
+  }
+
+  @Test
+  void getTablePathFromTablePath() throws IOException {
+    Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, tablePath);
+    assertEquals(tablePath, inferredTablePath.get());
+  }
+
+  @Test
+  void getTablePathFromMetadataFolderPath() throws IOException {
+    Path metadataFolder = new Path(tablePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, metadataFolder);
+    assertEquals(tablePath, inferredTablePath.get());
+  }
+
+  @Test
+  void getTablePathFromMetadataSubFolderPath() throws IOException {
+    Path auxFolder = new Path(tablePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME);
+    Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, auxFolder);
+    assertEquals(tablePath, inferredTablePath.get());
+
+    Path bootstrapIndexFolder = new Path(tablePath, HoodieTableMetaClient.BOOTSTRAP_INDEX_ROOT_FOLDER_PATH);
+    inferredTablePath = TablePathUtils.getTablePath(fs, bootstrapIndexFolder);
+    assertEquals(tablePath, inferredTablePath.get());
+  }
+
+  @Test
+  void getTablePathFromPartitionFolderPath() throws IOException {
+    Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath1);
+    assertEquals(tablePath, inferredTablePath.get());
+
+    inferredTablePath = TablePathUtils.getTablePath(fs, partitionPath2);
+    assertEquals(tablePath, inferredTablePath.get());
+  }
+
+  @Test
+  void getTablePathFromFilePath() throws IOException {
+    Option<Path> inferredTablePath = TablePathUtils.getTablePath(fs, filePath1);
+    assertEquals(tablePath, inferredTablePath.get());
+
+    inferredTablePath = TablePathUtils.getTablePath(fs, filePath2);
+    assertEquals(tablePath, inferredTablePath.get());
+  }
+}
diff --git a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
index fe68af9..36212d0 100644
--- a/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.hudi;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.HoodieReadClient;
 import org.apache.hudi.client.HoodieWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -28,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -45,6 +48,8 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -61,6 +66,8 @@ import java.util.stream.Collectors;
  */
 public class DataSourceUtils {
 
+  private static final Logger LOG = LogManager.getLogger(DataSourceUtils.class);
+
   /**
    * Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
    */
@@ -105,6 +112,22 @@ public class DataSourceUtils {
     }
   }
 
+  public static String getTablePath(FileSystem fs, Path[] userProvidedPaths) throws IOException {
+    LOG.info("Getting table path...");
+    for (Path path : userProvidedPaths) {
+      try {
+        Option<Path> tablePath = TablePathUtils.getTablePath(fs, path);
+        if (tablePath.isPresent()) {
+          return tablePath.get().toString();
+        }
+      } catch (HoodieException he) {
+        LOG.warn("Error trying to get table path from " + path, he);
+      }
+    }
+
+    throw new TableNotFoundException("Unable to find a Hudi table for the user-provided paths.");
+  }
+
   /**
    * This method converts values for fields with certain Avro/Parquet data types that require special handling.
    *
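
A hedged sketch of how the new DataSourceUtils.getTablePath entry point might
be called (the paths and object name below are illustrative only):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hudi.DataSourceUtils

    object DataSourceUtilsDemo extends App {
      // Hypothetical user-provided paths; any of them may sit anywhere
      // inside a Hudi table.
      val paths = Array(new Path("/tmp/hudi_table/key1=abc/key2=def"))
      val fs = paths(0).getFileSystem(new Configuration())

      // Returns the base path of the first table found; throws
      // TableNotFoundException if no path belongs to a Hudi table.
      val basePath: String = DataSourceUtils.getTablePath(fs, paths)
      println("Resolved table base path: " + basePath)
    }
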
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala b/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
new file mode 100644
index 0000000..861de14
--- /dev/null
+++ b/hudi-spark/src/main/scala/org/apache/hudi/HudiSparkUtils.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hudi.common.model.HoodieRecord
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, InMemoryFileIndex}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import scala.collection.JavaConverters._
+
+
+object HudiSparkUtils {
+
+  def getHudiMetadataSchema: StructType = {
+    StructType(HoodieRecord.HOODIE_META_COLUMNS.asScala.map(col => {
+      StructField(col, StringType, nullable = true)
+    }))
+  }
+
+  def checkAndGlobPathIfNecessary(paths: Seq[String], fs: FileSystem): Seq[Path] = {
+    paths.flatMap(path => {
+      val qualified = new Path(path).makeQualified(fs.getUri, fs.getWorkingDirectory)
+      val globPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualified)
+      globPaths
+    })
+  }
+
+  def createInMemoryFileIndex(sparkSession: SparkSession, globbedPaths: Seq[Path]): InMemoryFileIndex = {
+    val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+    new InMemoryFileIndex(sparkSession, globbedPaths, Map(), Option.empty, fileStatusCache)
+  }
+}
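
The two Scala helpers compose naturally: glob the user-supplied patterns
first, then build a file index over the result. A minimal sketch (the glob
pattern and object name are illustrative; a local SparkSession is assumed):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hudi.HudiSparkUtils
    import org.apache.spark.sql.SparkSession

    object HudiSparkUtilsDemo extends App {
      val spark = SparkSession.builder()
        .appName("hudi-spark-utils-demo")
        .master("local[2]")
        .getOrCreate()

      // Expand a glob such as /tmp/hudi_table/*/* into concrete paths.
      val userPaths = Seq("/tmp/hudi_table/*/*")
      val fs = new Path(userPaths.head).getFileSystem(new Configuration())
      val globbed = HudiSparkUtils.checkAndGlobPathIfNecessary(userPaths, fs)

      // List every file visible under the globbed paths.
      val index = HudiSparkUtils.createInMemoryFileIndex(spark, globbed)
      index.allFiles().map(_.getPath).foreach(println)

      spark.stop()
    }
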
diff --git a/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala b/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
new file mode 100644
index 0000000..6b1a178
--- /dev/null
+++ b/hudi-spark/src/test/scala/org/apache/hudi/TestHudiSparkUtils.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import java.io.File
+import java.nio.file.Paths
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.io.TempDir
+
+class TestHudiSparkUtils {
+
+  @Test
+  def testGlobPaths(@TempDir tempDir: File): Unit = {
+    val folders: Seq[Path] = Seq(
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
+    )
+
+    val files: Seq[Path] = Seq(
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
+    )
+
+    folders.foreach(folder => new File(folder.toUri).mkdir())
+    files.foreach(file => new File(file.toUri).createNewFile())
+
+    var paths = Seq(tempDir.getAbsolutePath + "/*")
+    var globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+      new Path(paths.head).getFileSystem(new Configuration()))
+    assertEquals(folders.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
+
+    paths = Seq(tempDir.getAbsolutePath + "/*/*")
+    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+      new Path(paths.head).getFileSystem(new Configuration()))
+    assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
+
+    paths = Seq(tempDir.getAbsolutePath + "/folder1/*")
+    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+      new Path(paths.head).getFileSystem(new Configuration()))
+    assertEquals(Seq(files(0), files(1)).sortWith(_.toString < _.toString),
+      globbedPaths.sortWith(_.toString < _.toString))
+
+    paths = Seq(tempDir.getAbsolutePath + "/folder2/*")
+    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+      new Path(paths.head).getFileSystem(new Configuration()))
+    assertEquals(Seq(files(2), files(3)).sortWith(_.toString < _.toString),
+      globbedPaths.sortWith(_.toString < _.toString))
+
+    paths = Seq(tempDir.getAbsolutePath + "/folder1/*", tempDir.getAbsolutePath + "/folder2/*")
+    globbedPaths = HudiSparkUtils.checkAndGlobPathIfNecessary(paths,
+      new Path(paths.head).getFileSystem(new Configuration()))
+    assertEquals(files.sortWith(_.toString < _.toString), globbedPaths.sortWith(_.toString < _.toString))
+  }
+
+  @Test
+  def testCreateInMemoryIndex(@TempDir tempDir: File): Unit = {
+    val spark = SparkSession.builder
+      .appName("Hoodie Datasource test")
+      .master("local[2]")
+      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .getOrCreate
+
+    val folders: Seq[Path] = Seq(
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2").toUri)
+    )
+
+    val files: Seq[Path] = Seq(
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file1").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder1", "file2").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file3").toUri),
+      new Path(Paths.get(tempDir.getAbsolutePath, "folder2", "file4").toUri)
+    )
+
+    folders.foreach(folder => new File(folder.toUri).mkdir())
+    files.foreach(file => new File(file.toUri).createNewFile())
+
+    val index = HudiSparkUtils.createInMemoryFileIndex(spark, Seq(folders(0), folders(1)))
+    val indexedFilePaths = index.allFiles().map(fs => fs.getPath)
+    assertEquals(files.sortWith(_.toString < _.toString), indexedFilePaths.sortWith(_.toString < _.toString))
+  }
+}