You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ib...@apache.org on 2017/08/09 18:04:22 UTC
incubator-gobblin git commit: [GOBBLIN-183] Gobblin data management
copy empty directories
Repository: incubator-gobblin
Updated Branches:
refs/heads/master 3ab02b3f2 -> 42677dc8c
[GOBBLIN-183] Gobblin data management copy empty directories
Closes #2036 from zxcware/empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/42677dc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/42677dc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/42677dc8
Branch: refs/heads/master
Commit: 42677dc8ccd32262006f7d77c4d40c7735ed71e5
Parents: 3ab02b3
Author: zhchen <zh...@linkedin.com>
Authored: Wed Aug 9 11:04:03 2017 -0700
Committer: Issac Buenrostro <ib...@apache.org>
Committed: Wed Aug 9 11:04:03 2017 -0700
----------------------------------------------------------------------
.../data/management/copy/CopyConfiguration.java | 5 ++
.../data/management/copy/CopyableFile.java | 2 +-
.../copy/RecursiveCopyableDataset.java | 9 +++-
.../management/copy/RecursivePathFinder.java | 9 +++-
.../FileAwareInputStreamExtractor.java | 4 ++
.../writer/FileAwareInputStreamDataWriter.java | 5 ++
.../org/apache/gobblin/util/FileListUtils.java | 50 +++++++++++++++++---
.../gobblin/util/io/EmptyInputStream.java | 36 ++++++++++++++
.../apache/gobblin/util/FileListUtilsTest.java | 47 ++++++++++++++++++
9 files changed, 156 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
index b8f0365..82cca49 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyConfiguration.java
@@ -51,6 +51,11 @@ public class CopyConfiguration {
public static final String DESTINATION_GROUP_KEY = COPY_PREFIX + ".dataset.destination.group";
public static final String PRIORITIZATION_PREFIX = COPY_PREFIX + ".prioritization";
+ /**
+ * Include empty directories in the source for copy
+ */
+ public static final String INCLUDE_EMPTY_DIRECTORIES = COPY_PREFIX + ".includeEmptyDirectories";
+
public static final String PRIORITIZER_ALIAS_KEY = PRIORITIZATION_PREFIX + ".prioritizerAlias";
public static final String MAX_COPY_PREFIX = PRIORITIZATION_PREFIX + ".maxCopy";
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index f2cb933..9c729e3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -203,7 +203,7 @@ public class CopyableFile extends CopyEntity implements File {
this.configuration.getTargetFs(), this.destination);
}
if (this.checksum == null) {
- FileChecksum checksumTmp = this.originFs.getFileChecksum(this.origin.getPath());
+ FileChecksum checksumTmp = this.origin.isDirectory() ? null : this.originFs.getFileChecksum(this.origin.getPath());
this.checksum = checksumTmp == null ? new byte[0] : checksumTmp.getBytes();
}
if (this.fileSet == null) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index e24752a..34428f1 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -65,7 +65,12 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
private final CopyableFileFilter copyableFileFilter;
private final boolean update;
private final boolean delete;
+
+ // Include empty directories in the source for copy
+ private final boolean includeEmptyDirectories;
+ // Delete empty directories in the destination
private final boolean deleteEmptyDirectories;
+
private final Properties properties;
public RecursiveCopyableDataset(final FileSystem fs, Path rootPath, Properties properties, Path glob) {
@@ -80,6 +85,8 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
this.update = Boolean.parseBoolean(properties.getProperty(UPDATE_KEY));
this.delete = Boolean.parseBoolean(properties.getProperty(DELETE_KEY));
this.deleteEmptyDirectories = Boolean.parseBoolean(properties.getProperty(DELETE_EMPTY_DIRECTORIES_KEY));
+ this.includeEmptyDirectories =
+ Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
this.properties = properties;
}
@@ -150,7 +157,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
@VisibleForTesting
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
try {
- return FileListUtils.listFilesRecursively(fs, path, fileFilter);
+ return FileListUtils.listFilesToCopyAtPath(fs, path, fileFilter, includeEmptyDirectories);
} catch (FileNotFoundException fnfe) {
return Lists.newArrayList();
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
index ec608b2..b749996 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursivePathFinder.java
@@ -46,22 +46,27 @@ public class RecursivePathFinder {
private final Path rootPath;
private final FileSystem fs;
private final PathFilter pathFilter;
+ private final boolean includeEmptyDirectories;
public RecursivePathFinder(final FileSystem fs, Path rootPath, Properties properties) {
this.rootPath = PathUtils.getPathWithoutSchemeAndAuthority(rootPath);
this.fs = fs;
this.pathFilter = DatasetUtils.instantiatePathFilter(properties);
+ this.includeEmptyDirectories =
+ Boolean.parseBoolean(properties.getProperty(CopyConfiguration.INCLUDE_EMPTY_DIRECTORIES));
}
- public Set<FileStatus> getPaths(boolean skipHiddenPaths) throws IOException {
+ public Set<FileStatus> getPaths(boolean skipHiddenPaths)
+ throws IOException {
if (!this.fs.exists(this.rootPath)) {
return Sets.newHashSet();
}
PathFilter actualFilter =
skipHiddenPaths ? new AndPathFilter(new HiddenFilter(), this.pathFilter) : this.pathFilter;
- List<FileStatus> files = FileListUtils.listFilesRecursively(this.fs, this.rootPath, actualFilter);
+ List<FileStatus> files =
+ FileListUtils.listFilesToCopyAtPath(this.fs, this.rootPath, actualFilter, includeEmptyDirectories);
return Sets.newHashSet(files);
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
index 5e53344..406c828 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/extractor/FileAwareInputStreamExtractor.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.data.management.copy.FileAwareInputStream;
import org.apache.gobblin.source.extractor.DataRecordException;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.io.EmptyInputStream;
import org.apache.gobblin.util.io.MeteredInputStream;
import java.io.IOException;
@@ -80,6 +81,9 @@ public class FileAwareInputStreamExtractor implements Extractor<String, FileAwar
this.state == null ? HadoopUtils.newConfiguration() : HadoopUtils.getConfFromState(this.state);
FileSystem fsFromFile = this.file.getOrigin().getPath().getFileSystem(conf);
this.recordRead = true;
+ if (this.file.getFileStatus().isDirectory()) {
+ return new FileAwareInputStream(this.file, EmptyInputStream.instance);
+ }
return new FileAwareInputStream(this.file,
MeteredInputStream.builder().in(fsFromFile.open(this.file.getFileStatus().getPath())).build());
}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 3ce204d..905ac02 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -206,6 +206,11 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
log.info(String.format("Recovering persisted file %s to %s.", persistedFile.get().getPath(), writeAt));
this.fs.rename(persistedFile.get().getPath(), writeAt);
} else {
+ // Copy empty directories
+ if (copyableFile.getFileStatus().isDirectory()) {
+ this.fs.mkdirs(writeAt);
+ return;
+ }
OutputStream os =
this.fs.create(writeAt, true, this.fs.getConf().getInt("io.file.buffer.size", 4096), replication, blockSize);
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
index 9472db6..02920c2 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/FileListUtils.java
@@ -69,12 +69,35 @@ public class FileListUtils {
}
/**
+ * Given a path to copy, list all files rooted at the given path to copy
+ *
+ * <p>The listing is delegated to {@code listFilesRecursivelyHelper} with directory filtering switched off
+ * (the {@code applyFilterToDirectories} argument is passed as {@code false}). If that call collected nothing,
+ * the root is a directory and {@code includeEmptyDirectories} is set, the root itself is added to the result
+ * so that the empty root directory is copied as well.
+ *
+ * @param fs the file system of the path
+ * @param path root path to copy
+ * @param fileFilter a filter only applied to root: the helper consults it when the entry it is handed is not a
+ * directory, while directory contents are listed with the no-op filter
+ * @param includeEmptyDirectories a control to include empty directories for copy
+ * @return the collected {@link FileStatus} entries, possibly empty
+ * @throws IOException propagated from the underlying file system calls
+ */
+ public static List<FileStatus> listFilesToCopyAtPath(FileSystem fs, Path path, PathFilter fileFilter,
+ boolean includeEmptyDirectories)
+ throws IOException {
+ List<FileStatus> files = Lists.newArrayList();
+ FileStatus rootFile = fs.getFileStatus(path);
+
+ listFilesRecursivelyHelper(fs, files, rootFile, fileFilter, false, includeEmptyDirectories);
+
+ // Copy the empty root directory: nothing was collected below it, so it has to be listed explicitly
+ if (files.size() == 0 && rootFile.isDirectory() && includeEmptyDirectories) {
+ files.add(rootFile);
+ }
+ return files;
+ }
+
+ /**
* Helper method to list out all files under a specified path. The specified {@link PathFilter} is treated as a file
* filter, that is it is only applied to file {@link Path}s.
*/
public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {
- return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter, false);
+ return listFilesRecursively(fs, path, fileFilter, false);
}
/**
@@ -83,20 +106,33 @@ public class FileListUtils {
*/
public static List<FileStatus> listFilesRecursively(FileSystem fs, Path path, PathFilter fileFilter,
boolean applyFilterToDirectories) throws IOException {
- return listFilesRecursivelyHelper(fs, Lists.<FileStatus> newArrayList(), fs.getFileStatus(path), fileFilter,
- applyFilterToDirectories);
+ return listFilesRecursivelyHelper(fs, Lists.newArrayList(), fs.getFileStatus(path), fileFilter,
+ applyFilterToDirectories, false);
}
private static List<FileStatus> listFilesRecursivelyHelper(FileSystem fs, List<FileStatus> files,
- FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories)
+ FileStatus fileStatus, PathFilter fileFilter, boolean applyFilterToDirectories, boolean includeEmptyDirectories)
throws FileNotFoundException, IOException {
if (fileStatus.isDirectory()) {
for (FileStatus status : fs.listStatus(fileStatus.getPath(),
applyFilterToDirectories ? fileFilter : NO_OP_PATH_FILTER)) {
- if (fileStatus.isDirectory()) {
- listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories);
+ if (status.isDirectory()) {
+ // Number of files collected before diving into the directory
+ int numFilesBefore = files.size();
+
+ listFilesRecursivelyHelper(fs, files, status, fileFilter, applyFilterToDirectories, includeEmptyDirectories);
+
+ // Number of files collected after diving into the directory
+ int numFilesAfter = files.size();
+ if (numFilesAfter == numFilesBefore && includeEmptyDirectories) {
+ /*
+ * This is effectively an empty directory, which needs explicit copying. Had there been any data file
+ * in the directory, the directory would have been created as a side-effect of copying the data file
+ */
+ files.add(status);
+ }
} else {
- files.add(fileStatus);
+ files.add(status);
}
}
} else if (fileFilter.accept(fileStatus.getPath())) {
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
new file mode 100644
index 0000000..3516866
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/io/EmptyInputStream.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+
+/**
+ * An {@link InputStream} with empty content: {@link #read()} always reports end of stream ({@code -1}).
+ */
+public class EmptyInputStream extends InputStream {
+ public static final InputStream instance = new EmptyInputStream(); // shared instance; the stream holds no state
+
+ private EmptyInputStream() {} // not instantiable from outside; use instance
+ @Override
+ public int read()
+ throws IOException {
+ return -1; // end of stream; 0 would be handed to callers as a data byte, so the stream would never end
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/42677dc8/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
index 415ae54..388e311 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/FileListUtilsTest.java
@@ -146,4 +146,51 @@ public class FileListUtilsTest {
}
}
+ public void testListFilesToCopyAtPath() throws IOException { // exercises listFilesToCopyAtPath on a local scratch directory
+ FileSystem localFs = FileSystem.getLocal(new Configuration());
+ Path baseDir = new Path(FILE_UTILS_TEST_DIR, "fileListTestDir4");
+ try {
+ if (localFs.exists(baseDir)) {
+ localFs.delete(baseDir, true);
+ }
+ localFs.mkdirs(baseDir);
+
+ // Empty root directory: the root itself is the only entry returned
+ List<FileStatus> testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+ Assert.assertEquals(testFiles.size(), 1);
+ Assert.assertEquals(testFiles.get(0).getPath().getName(), baseDir.getName());
+
+ // With an empty sub directory: only the sub directory is returned, not the root
+ Path subDir = new Path(baseDir, "subDir");
+ localFs.mkdirs(subDir);
+ testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+ Assert.assertEquals(testFiles.size(), 1);
+ Assert.assertEquals(testFiles.get(0).getPath().getName(), subDir.getName());
+
+ // Disable include empty directories: nothing is returned
+ testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, false);
+ Assert.assertEquals(testFiles.size(), 0);
+
+ // With file subDir/test1: only the file is returned, its parent directory is not listed separately
+ Path test1Path = new Path(subDir, TEST_FILE_NAME1);
+ localFs.create(test1Path); // NOTE(review): the stream returned by create() is never closed
+ testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+ Assert.assertEquals(testFiles.size(), 1);
+ Assert.assertEquals(testFiles.get(0).getPath().getName(), test1Path.getName());
+
+ // With files subDir/test1 and subDir/test2: both files are returned
+ Path test2Path = new Path(subDir, TEST_FILE_NAME2);
+ localFs.create(test2Path); // NOTE(review): the stream returned by create() is never closed
+ testFiles = FileListUtils.listFilesToCopyAtPath(localFs, baseDir, FileListUtils.NO_OP_PATH_FILTER, true);
+ Assert.assertEquals(testFiles.size(), 2);
+ Set<String> fileNames = Sets.newHashSet();
+ for (FileStatus testFileStatus : testFiles) {
+ fileNames.add(testFileStatus.getPath().getName());
+ }
+ Assert.assertTrue(fileNames.contains(TEST_FILE_NAME1) && fileNames.contains(TEST_FILE_NAME2));
+ } finally {
+ localFs.delete(baseDir, true);
+ }
+ }
+
}