You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/29 00:37:15 UTC

[incubator-hudi] branch master updated: HUDI-135 - Skip Meta folder when looking for partitions

This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 93f8f12  HUDI-135 - Skip Meta folder when looking for partitions
93f8f12 is described below

commit 93f8f12a30a05cd8c6862d0f895e4bd9a9b3336a
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Tue May 28 12:54:23 2019 -0700

    HUDI-135 - Skip Meta folder when looking for partitions
---
 .../uber/hoodie/cli/commands/RepairsCommand.java   |  2 +-
 .../java/com/uber/hoodie/common/util/FSUtils.java  | 73 ++++++++++++++++------
 .../com/uber/hoodie/common/util/TestFSUtils.java   | 72 +++++++++++++++++++++
 3 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
index bf6c828..dda22ce 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
@@ -77,7 +77,7 @@ public class RepairsCommand implements CommandMarker {
 
     String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get()
         .getTimestamp();
-    List<String> partitionPaths = FSUtils.getAllFoldersThreeLevelsDown(HoodieCLI.fs,
+    List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs,
         HoodieCLI.tableMetadata.getBasePath());
     Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath());
     String[][] rows = new String[partitionPaths.size() + 1][];
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index a97cb51..ab61e5d 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -18,6 +18,7 @@ package com.uber.hoodie.common.util;
 
 import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.uber.hoodie.common.model.HoodieFileFormat;
 import com.uber.hoodie.common.model.HoodieLogFile;
@@ -67,6 +68,12 @@ public class FSUtils {
   private static final long MIN_ROLLBACK_TO_KEEP = 10;
   private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
 
+  /**
+   * PathFilter that accepts every path. It is the fallback used by {@code processFiles} when the
+   * meta-folder is not excluded, letting the directory branch there call {@code accept} unconditionally
+   * instead of special-casing "no filter".
+   */
+  private static final PathFilter ALLOW_ALL_FILTER = (path) -> true;
 
   public static Configuration prepareHadoopConf(Configuration conf) {
     conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
@@ -152,16 +159,11 @@ public class FSUtils {
   /**
    * Gets all partition paths assuming date partitioning (year, month, day) three levels down.
    */
-  public static List<String> getAllFoldersThreeLevelsDown(FileSystem fs, String basePath)
+  public static List<String> getAllPartitionFoldersThreeLevelsDown(FileSystem fs, String basePath)
       throws IOException {
     List<String> datePartitions = new ArrayList<>();
     // Avoid listing and including any folders under the metafolder
-    PathFilter filter = (path) -> {
-      if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
-        return false;
-      }
-      return true;
-    };
+    PathFilter filter = getExcludeMetaPathFilter();
     FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*"), filter);
     for (FileStatus status : folders) {
       Path path = status.getPath();
@@ -201,31 +203,53 @@ public class FSUtils {
         partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
       }
       return true;
-    });
+    }, true);
     return partitions;
   }
 
   public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
       String markerDir) throws IOException {
     List<String> dataFiles = new LinkedList<>();
-    FSUtils.processFiles(fs, markerDir, (status) -> {
+    processFiles(fs, markerDir, (status) -> {
       String pathStr = status.getPath().toString();
       if (pathStr.endsWith(MARKER_EXTN)) {
         dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
       }
       return true;
-    });
+    }, false);
     return dataFiles;
   }
 
-  private static final void processFiles(FileSystem fs, String basePathStr,
-      Function<LocatedFileStatus, Boolean> consumer) throws IOException {
-    RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
-    while (allFiles.hasNext()) {
-      LocatedFileStatus status = allFiles.next();
-      boolean success = consumer.apply(status);
-      if (!success) {
-        throw new HoodieException("Failed to process file-status=" + status);
+  /**
+   * Walks every file under {@code basePathStr} and hands each one to {@code consumer}. Files located directly
+   * in the base-path are always passed on; every top-level directory is listed recursively only when it is
+   * accepted by the path filter, so with {@code excludeMetaFolder} set the meta-folder (and all its subdirs)
+   * is never listed at all.
+   *
+   * @param fs                File System
+   * @param basePathStr       Base-Path
+   * @param consumer          Callback for processing; returning false aborts with a HoodieException
+   * @param excludeMetaFolder Exclude .hoodie folder
+   * @throws IOException if a listing call on {@code fs} fails
+   */
+  @VisibleForTesting
+  static void processFiles(FileSystem fs, String basePathStr,
+      Function<FileStatus, Boolean> consumer, boolean excludeMetaFolder) throws IOException {
+    PathFilter dirFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
+    for (FileStatus entry : fs.listStatus(new Path(basePathStr))) {
+      if (entry.isFile()) {
+        if (!consumer.apply(entry)) {
+          throw new HoodieException("Failed to process file-status=" + entry);
+        }
+      } else if (dirFilter.accept(entry.getPath())) {
+        // Directories that pass the filter are expanded with a single recursive listing.
+        RemoteIterator<LocatedFileStatus> nested = fs.listFiles(entry.getPath(), true);
+        while (nested.hasNext()) {
+          FileStatus leaf = nested.next();
+          if (!consumer.apply(leaf)) {
+            throw new HoodieException("Failed to process file-status=" + leaf);
+          }
+        }
       }
     }
   }
@@ -234,7 +258,7 @@ public class FSUtils {
       boolean assumeDatePartitioning)
       throws IOException {
     if (assumeDatePartitioning) {
-      return getAllFoldersThreeLevelsDown(fs, basePathStr);
+      return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
     } else {
       return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
     }
@@ -247,6 +271,16 @@ public class FSUtils {
     return dotIndex == -1 ? "" : fileName.substring(dotIndex);
   }
 
+  /**
+   * Returns a filter that rejects the meta-folder and every path beneath it. Whole path components are matched
+   * (by wrapping the path in separators) instead of a raw substring, so names that merely contain ".hoodie" --
+   * e.g. a base-path under "/tmp/.hoodie_x/" or a partition "2019.hoodie" -- are not filtered out.
+   */
+  private static PathFilter getExcludeMetaPathFilter() {
+    String metaComponent = Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR;
+    return (path) -> !(Path.SEPARATOR + path.toUri().getPath() + Path.SEPARATOR).contains(metaComponent);
+  }
+
   public static String getInstantTime(String name) {
     return name.replace(getFileExtension(name), "");
   }
@@ -453,7 +487,6 @@ public class FSUtils {
       Thread.sleep(1000);
     }
     return recovered;
-
   }
 
   public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath,
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
index 5c3ba02..78fafd9 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
@@ -20,8 +20,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
 import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
 import java.util.UUID;
 import java.util.regex.Pattern;
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +36,7 @@ import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFSUtils {
 
@@ -56,6 +63,71 @@ public class TestFSUtils {
   }
 
   @Test
+  /**
+   * Tests that processFiles skips the ".hoodie" meta-folder and all of its subfolders when excludeMetaFolder
+   * is true, and reports files under it when excludeMetaFolder is false.
+   * Cleaner, Rollback and compaction-scheduling logic used to recursively list all subfolders, including
+   * those under ".hoodie", when looking for partition-paths. That raced with the compactor deleting its
+   * marker directory (inside ".hoodie") underneath the listing; excluding the meta-folder avoids the race.
+   */
+  public void testProcessFiles() throws Exception {
+    TemporaryFolder tmpFolder = new TemporaryFolder();
+    tmpFolder.create();
+    try {
+      // All directories including marker dirs.
+      List<String> folders = Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15",
+          ".hoodie/.temp/2/2016/05/16");
+      HoodieTableMetaClient metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
+      String basePath = metaClient.getBasePath();
+      folders.forEach(f -> {
+        try {
+          metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
+        } catch (IOException e) {
+          throw new HoodieException(e);
+        }
+      });
+
+      // Files inside partitions and marker directories
+      List<String> files = Arrays.asList(
+          "2016/04/15/1_1-0-1_20190528120000.parquet",
+          "2016/05/16/2_1-0-1_20190528120000.parquet",
+          ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet",
+          ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet");
+      files.forEach(f -> {
+        try {
+          // Close at once: only the file's existence matters, and an unclosed stream leaks a handle.
+          metaClient.getFs().create(new Path(new Path(basePath), f)).close();
+        } catch (IOException e) {
+          throw new HoodieException(e);
+        }
+      });
+
+      // Excluding the meta-folder: only the two partition data files may be reported.
+      final List<String> collected = new ArrayList<>();
+      FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> {
+        collected.add(status.getPath().toString());
+        return true;
+      }, true);
+      Assert.assertTrue("Hoodie MetaFolder MUST be skipped but got :" + collected, collected.stream()
+          .noneMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME)));
+      Assert.assertEquals("Collected=" + collected, 2, collected.size());
+
+      // Including the meta-folder: files under ".hoodie" must now show up.
+      final List<String> collected2 = new ArrayList<>();
+      FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> {
+        collected2.add(status.getPath().toString());
+        return true;
+      }, false);
+      Assert.assertTrue("Hoodie MetaFolder will be present :" + collected2, collected2.stream()
+          .anyMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME)));
+      // 2 partition files + 2 marker files + hoodie.properties
+      Assert.assertEquals("Collected=" + collected2, 5, collected2.size());
+    } finally {
+      tmpFolder.delete();
+    }
+  }
+
+  @Test
   public void testGetCommitTime() {
     String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
     int taskPartitionId = 2;