You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2019/05/29 00:37:15 UTC
[incubator-hudi] branch master updated: HUDI-135 - Skip Meta folder when looking for partitions

This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 93f8f12 HUDI-135 - Skip Meta folder when looking for partitions
93f8f12 is described below
commit 93f8f12a30a05cd8c6862d0f895e4bd9a9b3336a
Author: Balaji Varadarajan <va...@uber.com>
AuthorDate: Tue May 28 12:54:23 2019 -0700
HUDI-135 - Skip Meta folder when looking for partitions
---
.../uber/hoodie/cli/commands/RepairsCommand.java | 2 +-
.../java/com/uber/hoodie/common/util/FSUtils.java | 73 ++++++++++++++++------
.../com/uber/hoodie/common/util/TestFSUtils.java | 72 +++++++++++++++++++++
3 files changed, 126 insertions(+), 21 deletions(-)
diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
index bf6c828..dda22ce 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/RepairsCommand.java
@@ -77,7 +77,7 @@ public class RepairsCommand implements CommandMarker {
String latestCommit = HoodieCLI.tableMetadata.getActiveTimeline().getCommitTimeline().lastInstant().get()
.getTimestamp();
- List<String> partitionPaths = FSUtils.getAllFoldersThreeLevelsDown(HoodieCLI.fs,
+ List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(HoodieCLI.fs,
HoodieCLI.tableMetadata.getBasePath());
Path basePath = new Path(HoodieCLI.tableMetadata.getBasePath());
String[][] rows = new String[partitionPaths.size() + 1][];
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
index a97cb51..ab61e5d 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FSUtils.java
@@ -18,6 +18,7 @@ package com.uber.hoodie.common.util;
import static com.uber.hoodie.common.table.HoodieTableMetaClient.MARKER_EXTN;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.uber.hoodie.common.model.HoodieFileFormat;
import com.uber.hoodie.common.model.HoodieLogFile;
@@ -67,6 +68,12 @@ public class FSUtils {
private static final long MIN_ROLLBACK_TO_KEEP = 10;
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";
+ private static final PathFilter ALLOW_ALL_FILTER = new PathFilter() {
+ @Override
+ public boolean accept(Path file) {
+ return true;
+ }
+ };
public static Configuration prepareHadoopConf(Configuration conf) {
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
@@ -152,16 +159,11 @@ public class FSUtils {
/**
* Gets all partition paths assuming date partitioning (year, month, day) three levels down.
*/
- public static List<String> getAllFoldersThreeLevelsDown(FileSystem fs, String basePath)
+ public static List<String> getAllPartitionFoldersThreeLevelsDown(FileSystem fs, String basePath)
throws IOException {
List<String> datePartitions = new ArrayList<>();
// Avoid listing and including any folders under the metafolder
- PathFilter filter = (path) -> {
- if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
- return false;
- }
- return true;
- };
+ PathFilter filter = getExcludeMetaPathFilter();
FileStatus[] folders = fs.globStatus(new Path(basePath + "/*/*/*"), filter);
for (FileStatus status : folders) {
Path path = status.getPath();
@@ -201,31 +203,53 @@ public class FSUtils {
partitions.add(getRelativePartitionPath(basePath, filePath.getParent()));
}
return true;
- });
+ }, true);
return partitions;
}
public static final List<String> getAllDataFilesForMarkers(FileSystem fs, String basePath, String instantTs,
String markerDir) throws IOException {
List<String> dataFiles = new LinkedList<>();
- FSUtils.processFiles(fs, markerDir, (status) -> {
+ processFiles(fs, markerDir, (status) -> {
String pathStr = status.getPath().toString();
if (pathStr.endsWith(MARKER_EXTN)) {
dataFiles.add(FSUtils.translateMarkerToDataPath(basePath, pathStr, instantTs));
}
return true;
- });
+ }, false);
return dataFiles;
}
- private static final void processFiles(FileSystem fs, String basePathStr,
- Function<LocatedFileStatus, Boolean> consumer) throws IOException {
- RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(new Path(basePathStr), true);
- while (allFiles.hasNext()) {
- LocatedFileStatus status = allFiles.next();
- boolean success = consumer.apply(status);
- if (!success) {
- throw new HoodieException("Failed to process file-status=" + status);
+ /**
+ * Recursively processes all files under the base path. If excludeMetaFolder is set, the meta-folder and all of its
+ * subdirectories are skipped.
+ * @param fs File system
+ * @param basePathStr Base path
+ * @param consumer Callback invoked for each file; returning false aborts processing with a HoodieException
+ * @param excludeMetaFolder Whether to exclude the .hoodie folder
+ * @throws IOException if listing the file system fails
+ */
+ @VisibleForTesting
+ static void processFiles(FileSystem fs, String basePathStr,
+ Function<FileStatus, Boolean> consumer, boolean excludeMetaFolder) throws IOException {
+ PathFilter pathFilter = excludeMetaFolder ? getExcludeMetaPathFilter() : ALLOW_ALL_FILTER;
+ FileStatus[] topLevelStatuses = fs.listStatus(new Path(basePathStr));
+ for (int i = 0; i < topLevelStatuses.length; i++) {
+ FileStatus child = topLevelStatuses[i];
+ if (child.isFile()) {
+ boolean success = consumer.apply(child);
+ if (!success) {
+ throw new HoodieException("Failed to process file-status=" + child);
+ }
+ } else if (pathFilter.accept(child.getPath())) {
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(child.getPath(), true);
+ while (itr.hasNext()) {
+ FileStatus status = itr.next();
+ boolean success = consumer.apply(status);
+ if (!success) {
+ throw new HoodieException("Failed to process file-status=" + status);
+ }
+ }
}
}
}
@@ -234,7 +258,7 @@ public class FSUtils {
boolean assumeDatePartitioning)
throws IOException {
if (assumeDatePartitioning) {
- return getAllFoldersThreeLevelsDown(fs, basePathStr);
+ return getAllPartitionFoldersThreeLevelsDown(fs, basePathStr);
} else {
return getAllFoldersWithPartitionMetaFile(fs, basePathStr);
}
@@ -247,6 +271,16 @@ public class FSUtils {
return dotIndex == -1 ? "" : fileName.substring(dotIndex);
}
+ private static PathFilter getExcludeMetaPathFilter() {
+ // Avoid listing and including any folders under the metafolder
+ return (path) -> {
+ if (path.toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) {
+ return false;
+ }
+ return true;
+ };
+ }
+
public static String getInstantTime(String name) {
return name.replace(getFileExtension(name), "");
}
@@ -453,7 +487,6 @@ public class FSUtils {
Thread.sleep(1000);
}
return recovered;
-
}
public static void deleteOlderCleanMetaFiles(FileSystem fs, String metaPath,
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
index 5c3ba02..78fafd9 100644
--- a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestFSUtils.java
@@ -20,8 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.uber.hoodie.common.model.HoodieTestUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
@@ -30,6 +36,7 @@ import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import org.junit.rules.TemporaryFolder;
public class TestFSUtils {
@@ -56,6 +63,71 @@ public class TestFSUtils {
}
@Test
+ /**
+ * Tests that processFiles returns only file paths, excluding those under the meta-folder (e.g. marker directories).
+ * Cleaner, rollback and compaction-scheduling logic used to recursively process all subfolders, including those
+ * of ".hoodie", when looking for partition paths. This caused a race: they listed all folders recursively while
+ * the marker directory (that of compaction, inside the ".hoodie" folder) was deleted underneath them by the compactor.
+ * This test verifies the fix by ensuring ".hoodie" and its subfolders are skipped when excluded, and listed otherwise.
+ */
+ public void testProcessFiles() throws Exception {
+ TemporaryFolder tmpFolder = new TemporaryFolder();
+ tmpFolder.create();
+ // All directories including marker dirs.
+ List<String> folders = Arrays.asList("2016/04/15", "2016/05/16", ".hoodie/.temp/2/2016/04/15",
+ ".hoodie/.temp/2/2016/05/16");
+ HoodieTableMetaClient metaClient = HoodieTestUtils.init(tmpFolder.getRoot().getAbsolutePath());
+ String basePath = metaClient.getBasePath();
+ folders.stream().forEach(f -> {
+ try {
+ metaClient.getFs().mkdirs(new Path(new Path(basePath), f));
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ });
+
+ // Files inside partitions and marker directories
+ List<String> files = Arrays.asList(
+ "2016/04/15/1_1-0-1_20190528120000.parquet",
+ "2016/05/16/2_1-0-1_20190528120000.parquet",
+ ".hoodie/.temp/2/2016/05/16/2_1-0-1_20190528120000.parquet",
+ ".hoodie/.temp/2/2016/04/15/1_1-0-1_20190528120000.parquet"
+ );
+
+ files.stream().forEach(f -> {
+ try {
+ metaClient.getFs().create(new Path(new Path(basePath), f));
+ } catch (IOException e) {
+ throw new HoodieException(e);
+ }
+ });
+
+ // Test excluding meta-folder
+ final List<String> collected = new ArrayList<>();
+ FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> {
+ collected.add(status.getPath().toString());
+ return true;
+ }, true);
+
+ Assert.assertTrue("Hoodie MetaFolder MUST be skipped but got :" + collected, collected.stream()
+ .noneMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME)));
+ // Check if only files are listed
+ Assert.assertEquals(2, collected.size());
+
+ // Test including meta-folder
+ final List<String> collected2 = new ArrayList<>();
+ FSUtils.processFiles(metaClient.getFs(), basePath, (status) -> {
+ collected2.add(status.getPath().toString());
+ return true;
+ }, false);
+
+ Assert.assertFalse("Hoodie MetaFolder will be present :" + collected2, collected2.stream()
+ .noneMatch(s -> s.contains(HoodieTableMetaClient.METAFOLDER_NAME)));
+ // Check if only files are listed including hoodie.properties
+ Assert.assertEquals("Collected=" + collected2, 5, collected2.size());
+ }
+
+ @Test
public void testGetCommitTime() {
String commitTime = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
int taskPartitionId = 2;