You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2018/01/31 01:26:01 UTC

incubator-gobblin git commit: [GOBBLIN-395] Add lineage for copying config based dataset

Repository: incubator-gobblin
Updated Branches:
  refs/heads/master c35f76e4e -> 161bef09d


[GOBBLIN-395] Add lineage for copying config based dataset

Closes #2269 from zxcware/c2


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/161bef09
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/161bef09
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/161bef09

Branch: refs/heads/master
Commit: 161bef09dd5cbbbb65f9f6965008c57b632fb075
Parents: c35f76e
Author: zhchen <zh...@linkedin.com>
Authored: Tue Jan 30 17:26:03 2018 -0800
Committer: Hung Tran <hu...@linkedin.com>
Committed: Tue Jan 30 17:26:03 2018 -0800

----------------------------------------------------------------------
 .../data/management/copy/CopyableFile.java      | 25 +++++++++
 .../copy/RecursiveCopyableDataset.java          | 21 +-------
 .../copy/replication/ConfigBasedDataset.java    | 11 ++--
 .../data/management/copy/CopyableFileTest.java  | 53 +++++++++++++++++++-
 .../gobblin/metrics/reporter/EventReporter.java |  2 +-
 5 files changed, 84 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
index 843a7e3..d2547b4 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopyableFile.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.copy;
 
 import org.apache.gobblin.data.management.partition.File;
 import org.apache.gobblin.data.management.copy.PreserveAttributes.Option;
+import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.guid.Guid;
@@ -116,6 +117,30 @@ public class CopyableFile extends CopyEntity implements File {
   }
 
   /**
+   * Set file system based source and destination datasets for this {@link CopyableFile}
+   *
+   * @param originFs {@link FileSystem} where this {@link CopyableFile} originates
+   * @param targetFs {@link FileSystem} where this {@link CopyableFile} is copied to
+   */
+  public void setFsDatasets(FileSystem originFs, FileSystem targetFs) {
+    /*
+     * By default, the raw Gobblin dataset used for CopyableFile lineage is the file's
+     * parent folder, unless the file is itself a folder
+     */
+    boolean isDir = origin.isDirectory();
+
+    Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(origin.getPath());
+    String sourceDatasetName = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
+    sourceDataset = new DatasetDescriptor(originFs.getScheme(), sourceDatasetName);
+    sourceDataset.addMetadata(DatasetConstants.FS_URI, originFs.getUri().toString());
+
+    Path fullDestinationPath = Path.getPathWithoutSchemeAndAuthority(destination);
+    String destinationDatasetName = isDir ? fullDestinationPath.toString() : fullDestinationPath.getParent().toString();
+    destinationDataset = new DatasetDescriptor(targetFs.getScheme(), destinationDatasetName);
+    destinationDataset.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
+  }
+
+  /**
    * Get a {@link CopyableFile.Builder}.
    *
    * @param originFs {@link FileSystem} where original file exists.

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
index 252dafa..2d1f740 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/RecursiveCopyableDataset.java
@@ -20,9 +20,7 @@ package org.apache.gobblin.data.management.copy;
 import org.apache.gobblin.commit.CommitStep;
 import org.apache.gobblin.data.management.copy.entities.PrePublishStep;
 import org.apache.gobblin.data.management.dataset.DatasetUtils;
-import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.FileSystemDataset;
-import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
 import org.apache.gobblin.util.FileListUtils;
 import org.apache.gobblin.util.commit.DeleteFileCommitStep;
@@ -148,24 +146,7 @@ public class RecursiveCopyableDataset implements CopyableDataset, FileSystemData
               .datasetOutputPath(thisTargetPath.toString()).ancestorsOwnerAndPermission(CopyableFile
               .resolveReplicatedOwnerAndPermissionsRecursively(this.fs, file.getPath().getParent(), nonGlobSearchPath,
                   configuration)).build();
-
-      /*
-       * By default, the raw Gobblin dataset for CopyableFile lineage is its parent folder
-       * if itself is not a folder
-       */
-      boolean isDir = file.isDirectory();
-
-      Path fullSourcePath = Path.getPathWithoutSchemeAndAuthority(file.getPath());
-      String sourceDataset = isDir ? fullSourcePath.toString() : fullSourcePath.getParent().toString();
-      DatasetDescriptor source = new DatasetDescriptor(this.fs.getScheme(), sourceDataset);
-      source.addMetadata(DatasetConstants.FS_URI, this.fs.getUri().toString());
-      copyableFile.setSourceDataset(source);
-
-      String destinationDataset = isDir ? thisTargetPath.toString() : thisTargetPath.getParent().toString();
-      DatasetDescriptor destination = new DatasetDescriptor(targetFs.getScheme(), destinationDataset);
-      destination.addMetadata(DatasetConstants.FS_URI, targetFs.getUri().toString());
-      copyableFile.setDestinationDataset(destination);
-
+      copyableFile.setFsDatasets(this.fs, targetFs);
       copyableFiles.add(copyableFile);
     }
     copyEntities.addAll(this.copyableFileFilter.filter(this.fs, targetFs, copyableFiles));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
index 293034b..cc893a6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/replication/ConfigBasedDataset.java
@@ -187,12 +187,11 @@ public class ConfigBasedDataset implements CopyableDataset {
         if (copyToFileMap.containsKey(newPath)) {
           deletedPaths.add(newPath);
         }
-
-        copyableFiles.add(
-            CopyableFile.fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath),
-                copyConfiguration)
-                .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString())
-                .build());
+        CopyableFile copyableFile = CopyableFile
+            .fromOriginAndDestination(copyFromFs, originFileStatus, copyToFs.makeQualified(newPath), copyConfiguration)
+            .fileSet(PathUtils.getPathWithoutSchemeAndAuthority(copyTo.getDatasetPath()).toString()).build();
+        copyableFile.setFsDatasets(copyFromFs, copyToFs);
+        copyableFiles.add(copyableFile);
       }
 
       // clean up already checked paths

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
index 30ba0af..986efeb 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileTest.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.List;
 import java.util.Properties;
 
@@ -36,8 +37,13 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.dataset.DatasetDescriptor;
 import org.apache.gobblin.util.PathUtils;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
 public class CopyableFileTest {
 
   @Test
@@ -86,6 +92,51 @@ public class CopyableFileTest {
 
   }
 
+  @Test
+  public void testSetFsDatasets() throws URISyntaxException {
+    FileSystem originFs = mock(FileSystem.class);
+    String originFsUri = "hdfs://source.company.biz:2000";
+    String originPath = "/data/databases/source/profile";
+    when(originFs.getUri()).thenReturn(new URI(originFsUri));
+    when(originFs.getScheme()).thenReturn("hdfs");
+
+    FileSystem targetFs = mock(FileSystem.class);
+    String targetFsUri = "file:///";
+    String destinationPath = "/data/databases/destination/profile";
+    when(targetFs.getUri()).thenReturn(new URI(targetFsUri));
+    when(targetFs.getScheme()).thenReturn("file");
+
+    // Test when source file is not a directory
+    FileStatus origin = new FileStatus(0l, false, 0, 0l, 0l, new Path(originPath));
+    CopyableFile copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+    copyableFile.setFsDatasets(originFs, targetFs);
+    DatasetDescriptor source = copyableFile.getSourceDataset();
+    Assert.assertEquals(source.getName(), "/data/databases/source");
+    Assert.assertEquals(source.getPlatform(), "hdfs");
+    Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
+    DatasetDescriptor destination = copyableFile.getDestinationDataset();
+    Assert.assertEquals(destination.getName(), "/data/databases/destination");
+    Assert.assertEquals(destination.getPlatform(), "file");
+    Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
+
+    // Test when source file is a directory
+    originPath = originFsUri + originPath;
+    destinationPath = targetFsUri + destinationPath;
+    origin = new FileStatus(0l, true, 0, 0l, 0l, new Path(originPath));
+    copyableFile = new CopyableFile(origin, new Path(destinationPath), null, null, null,
+        PreserveAttributes.fromMnemonicString(""), "", 0, 0, Maps.<String, String>newHashMap(), "");
+    copyableFile.setFsDatasets(originFs, targetFs);
+    source = copyableFile.getSourceDataset();
+    Assert.assertEquals(source.getName(), "/data/databases/source/profile");
+    Assert.assertEquals(source.getPlatform(), "hdfs");
+    Assert.assertEquals(source.getMetadata().get("fsUri"), originFsUri);
+    destination = copyableFile.getDestinationDataset();
+    Assert.assertEquals(destination.getName(), "/data/databases/destination/profile");
+    Assert.assertEquals(destination.getPlatform(), "file");
+    Assert.assertEquals(destination.getMetadata().get("fsUri"), targetFsUri);
+  }
+
 
   @Test
   public void testCopyableFileBuilderMinimumConfiguration()
@@ -200,7 +251,7 @@ public class CopyableFileTest {
 
     FileStatus fileStatus = new FileStatus(1, false, 0, 0, 0, 0, FsPermission.getDefault(), "owner", "group", path);
 
-    FileSystem fs = Mockito.mock(FileSystem.class);
+    FileSystem fs = mock(FileSystem.class);
     Mockito.doReturn(fileStatus).when(fs).getFileStatus(path);
     Mockito.doReturn(path).when(fs).makeQualified(path);
     Mockito.doReturn(new URI("hdfs://uri")).when(fs).getUri();

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/161bef09/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
----------------------------------------------------------------------
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
index 498ad58..a733d6a 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java
@@ -125,7 +125,7 @@ public abstract class EventReporter extends ScheduledReporter implements Closeab
     }
     try {
       if (!this.reportingQueue.offer(sanitizeEvent(event), 10, TimeUnit.SECONDS)) {
-        log.error("Enqueuing of event %s at reporter with class %s timed out. Sending of events is probably stuck.",
+        log.error("Enqueuing of event {} at reporter with class {} timed out. Sending of events is probably stuck.",
             event, this.getClass().getCanonicalName());
       }
     } catch (InterruptedException ie) {