You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by ab...@apache.org on 2018/03/21 08:30:27 UTC
[09/50] incubator-gobblin git commit: [GOBBLIN-395] Add lineage for copying config based dataset
[GOBBLIN-395] Add lineage for copying config based dataset
Closes #2269 from zxcware/c2
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/161bef09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/161bef09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/161bef09
Branch: refs/heads/0.12.0
Commit: 161bef09dd5cbbbb65f9f6965008c57b632fb075
Parents: c35f76e
Author: zhchen <zh...@linkedin.com>
Authored: Tue Jan 30 17:26:03 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jan 30 17:26:03 2018 -0800
----------------------------------------------------------------------
.../data/management/copy/CopyableFile.java | 25 +++++++++
.../copy/RecursiveCopyableDataset.java | 21 +-------
.../copy/replication/ConfigBasedDataset.java | 11 ++--
.../data/management/copy/CopyableFileTest.java | 53 +++++++++++++++++++-
.../gobblin/metrics/reporter/EventReporter.java | 2 +-
5 files changed, 84 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 843a7e3..d2547b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy;
import org.apache.gobblin.data.management.partition.File;
import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.guid.Guid;
@@ -116,6 +117,30 @@ public class CopyableFile extends CopyEntity implements File {
}
/**
+ * Set file system based source and destination datasets for this {@link CopyableFile}.
+ *
+ * @param originFs {@link FileSystem} where this {@link CopyableFile} originates
+ * @param targetFs {@link FileSystem} where this {@link CopyableFile} is copied to
+ */
+ public void setFsDatasets(FileSystem originFs, FileSystem targetFs) {
+ /*
+ * By default, the raw Gobblin dataset for CopyableFile lineage is the file's parent folder,
+ * or the path itself when the origin is a directory (the same rule is applied to the destination)
+ */
+ boolean isDir = origin.isDirectory();
+
+ Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath());
+ String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
+ sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+ sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString());
+
+ Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination);
+ String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString();
+ destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+ destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
+ }
+
+ /**
* Get a {@link CopyableFile.Builder}.
*
* @param originFs {@link FileSystem} where original file exists.
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 252dafa..2d1f740 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -20,9 +20,7 @@ package org.apache.gobblin.data.management.copy;
import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
-import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.FileListUtils;
import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -148,24 +146,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
.datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
.resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
configuration)).build();
-
- /*
- * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder
- * if itself is not a folder
- */
- boolean isDir = file.isDirectory();
-
- Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(file.getPath());
- String sourceDataset = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
- DatasetDescriptor source = new DatasetDescriptor(this.fs.getScheme(), sourceDataset);
- source.addMetadata(DatasetConstants.FS_URI, this.fs.getUri().toString());
- copyableFile.setSourceDataset(source);
-
- String destinationDataset = isDir ? thisTargetPath.toString() : thisTargetPath.getParent().toString();
- DatasetDescriptor destination = new DatasetDescriptor(targetFs.getScheme(), destinationDataset);
- destination.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
- copyableFile.setDestinationDataset(destination);
-
+ copyableFile.setFsDatasets(this.fs, targetFs);
copyableFiles.add(copyableFile);
}
copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 293034b..cc893a6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -187,12 +187,11 @@ public class ConfigBasedDataset implements CopyableDataset {
if (copyToFileMap.containsKey(newPath)) {
deletedPaths.add(newPath);
}
-
- copyableFiles.add(
- CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath),
- copyConfiguration)
- .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
- .build());
+ CopyableFile copyableFile = CopyableFile
+ .fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
+ .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build();
+ copyableFile.setFsDatasets(copyFromFs, copyToFs);
+ copyableFiles.add(copyableFile);
}
// clean up already checked paths
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index 30ba0af..986efeb 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.List;
import java.util.Properties;
@@ -36,8 +37,13 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.dataset.DatasetDescriptor;
import org.apache.gobblin.util.PathUtils;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
public class CopyableFileTest {
@Test
@@ -86,6 +92,51 @@ public class CopyableFileTest {
}
+ @Test
+ public void testSetFsDatasets() throws URISyntaxException {
+ FileSystem originFs = mock(FileSystem.class);
+ String originFsUri = "hdfs://source.company.biz:2000";
+ String originPath = "/data/databases/source/profile";
+ when(originFs.getUri()).thenReturn(new URI(originFsUri));
+ when(originFs.getScheme()).thenReturn("hdfs");
+
+ FileSystem targetFs = mock(FileSystem.class);
+ String targetFsUri = "file:///";
+ String destinationPath = "/data/databases/destination/profile";
+ when(targetFs.getUri()).thenReturn(new URI(targetFsUri));
+ when(targetFs.getScheme()).thenReturn("file");
+
+ // Test when source file is not a directory: dataset names are expected to be the parent folders
+ FileStatus origin = new FileStatus(0l, false, 0, 0l, 0l, new Path(originPath));
+ CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
+ PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+ copyableFile.setFsDatasets(originFs, targetFs);
+ DatasetDescriptor source = copyableFile.getSourceDataset();
+ Assert.assertEquals(source.getName(), "/data/databases/source");
+ Assert.assertEquals(source.getPlatform(), "hdfs");
+ Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
+ DatasetDescriptor destination = copyableFile.getDestinationDataset();
+ Assert.assertEquals(destination.getName(), "/data/databases/destination");
+ Assert.assertEquals(destination.getPlatform(), "file");
+ Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
+
+ // Test when source file is a directory and paths are fully qualified: names are the paths without scheme and authority
+ originPath = originFsUri + originPath;
+ destinationPath = targetFsUri + destinationPath;
+ origin = new FileStatus(0l, true, 0, 0l, 0l, new Path(originPath));
+ copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
+ PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+ copyableFile.setFsDatasets(originFs, targetFs);
+ source = copyableFile.getSourceDataset();
+ Assert.assertEquals(source.getName(), "/data/databases/source/profile");
+ Assert.assertEquals(source.getPlatform(), "hdfs");
+ Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
+ destination = copyableFile.getDestinationDataset();
+ Assert.assertEquals(destination.getName(), "/data/databases/destination/profile");
+ Assert.assertEquals(destination.getPlatform(), "file");
+ Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
+ }
+
@Test
public void testCopyableFileBuilderMinimumConfiguration()
@@ -200,7 +251,7 @@ public class CopyableFileTest {
FileStatus fileStatus = new FileStatus(1, false, 0, 0, 0, 0, FsPermission.getDefault(), "owner", "group", path);
- FileSystem fs = Mockito.mock(FileSystem.class);
+ FileSystem fs = mock(FileSystem.class);
Mockito.doReturn(fileStatus).when(fs).getFileStatus(path);
Mockito.doReturn(path).when(fs).makeQualified(path);
Mockito.doReturn(new URI("hdfs://uri")).when(fs).getUri();
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 498ad58..a733d6a 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -125,7 +125,7 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
}
try {
if (!this.reportingQueue.offer(sanitizeEvent(event), 10, TimeUnit.SECONDS)) {
- log.error("Enqueuing of event %s at reporter with class %s timed out. Sending of events is probably stuck.",
+ log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
event, this.getClass().getCanonicalName());
}
} catch (InterruptedException ie) {