Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/07/01 22:01:13 UTC

[GitHub] [incubator-gobblin] aryavaibhav93 opened a new pull request #3054: Making dataset name as part of staging directory in gobblin-distcp (f…

aryavaibhav93 opened a new pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054


   …ile-based)
   
   Dear Gobblin maintainers,
   
   Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!
   
   
   ### JIRA
   Making dataset name as part of staging directory in gobblin-distcp (file-based)
   
   
   ### Description
   Currently, embedded distcp creates its own temporary staging directory or uses a user-defined one. This PR adds the option to create the staging directory inside each dataset instead.
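A minimal sketch of the resulting precedence, assuming the user-defined static directory still wins and the dataset-rooted directory is used only when its flag is set (mirroring the `if / else if` in the diff further down). The property keys and the `resolveStagingDir` helper are hypothetical, and `java.nio.file` stands in for Hadoop's `Path` so the sketch runs on its own:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical sketch: the keys below are illustrative, not Gobblin's real ConfigurationKeys values.
class StagingDirPrecedence {
  static final String USER_DEFINED_FLAG = "user.defined.staging.dir.flag";
  static final String USER_DEFINED_DIR = "user.defined.static.staging.dir";
  static final String DATASET_DEFINED_FLAG = "dataset.defined.staging.dir.flag";
  static final String DEFAULT_STAGING_DIR = "writer.staging.dir";
  // A bare name (no leading slash) so resolve() appends it to the dataset root.
  static final String DATASET_STAGING_NAME = "staging";

  static Path resolveStagingDir(Properties props, Path datasetRoot) {
    if (Boolean.parseBoolean(props.getProperty(USER_DEFINED_FLAG, "false"))) {
      // 1. An explicit, user-defined static staging directory wins.
      return Paths.get(props.getProperty(USER_DEFINED_DIR));
    }
    if (Boolean.parseBoolean(props.getProperty(DATASET_DEFINED_FLAG, "false"))) {
      // 2. Otherwise, stage inside the dataset's own directory.
      return datasetRoot.resolve(DATASET_STAGING_NAME);
    }
    // 3. Fall back to the job's default writer staging directory.
    return Paths.get(props.getProperty(DEFAULT_STAGING_DIR));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(DEFAULT_STAGING_DIR, "/tmp/job/staging");
    props.setProperty(DATASET_DEFINED_FLAG, "true");
    System.out.println(resolveStagingDir(props, Paths.get("/data/db1")));
  }
}
```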
   
   
   ### Tests
   My PR incorporates changes to the following unit test:
   FileAwareInputStreamDataWriterTest.java
   
   
   ### Commits
   - [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
       1. Subject is separated from body by a blank line
       2. Subject is limited to 50 characters
       3. Subject does not end with a period
       4. Subject uses the imperative mood ("add", not "adding")
       5. Body wraps at 72 characters
       6. Body explains "what" and "why", not "how"
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-gobblin] aplex commented on a change in pull request #3054: Making dataset name as part of staging directory in gobblin-distcp (f…

Posted by GitBox <gi...@apache.org>.
aplex commented on a change in pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r449186127



##########
File path: gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
##########
@@ -78,49 +82,101 @@ public void setup() throws Exception {
   public void testWrite() throws Exception {
     String streamString1 = "testContents1";
     String streamString2 = "testContents2";
+
     String userDefStagingDir = System.getProperty("user.dir") + "/user_staging_dir";
+
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(),
+        getClass().getSimpleName());
+    localFs = FileSystem.getLocal(new Configuration());
+
+    Path ds1 = createDatasetPath("db1");
+    String dataSetDefinedStagingDir = testRootDir + "/db1/staging";
+
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
+
     WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString());
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state, cf);
     CopySource.serializeCopyableDataset(state, metadata);
+
     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);
+
     FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build();
+
     Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString1);
 
     //testing user defined staging directory
     WorkUnitState state2 = TestUtils.createTestWorkUnitState();
+    state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true);
     state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
     state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output2").toString());
     state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir);
     state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state2, cf);
     CopySource.serializeCopyableDataset(state2, metadata);
+
     dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0);
+
     fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build();
+
     Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     writtenFilePath = new Path(new Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString2);
+
+    //testing dataset defined staging directory

Review comment:
       Can we split this large test into several smaller unit tests, each focusing on a specific use case? Then it will be easier to understand what we are testing and expecting. The common logic can be moved to separate methods that we call explicitly. TestNG can also call them automatically if we annotate them with @BeforeMethod (runs before each test method) or @BeforeClass (runs once before the first test method in the class).
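One way the suggested split could look, sketched without TestNG so it runs standalone: each case sets only the properties it cares about, and a shared `setUp()` plays the role of a `@BeforeMethod` fixture. The property names and the `stagingDir()` stand-in are hypothetical, not the real `FileAwareInputStreamDataWriter` logic:

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: in the real test class these would be TestNG @Test methods.
class SplitStagingDirTests {
  private Map<String, String> state;

  // Shared fixture; in TestNG this could carry @BeforeMethod so it runs before every test method.
  void setUp() {
    state = new HashMap<>();
    state.put("writer.staging.dir", "/tmp/test/staging");
    state.put("dataset.root", "/tmp/test/db1");
  }

  // Stand-in for the staging-dir choice made in the writer's constructor.
  private Path stagingDir() {
    if (Boolean.parseBoolean(state.get("user.defined.flag"))) {
      return Paths.get(state.get("user.defined.dir"));
    }
    if (Boolean.parseBoolean(state.get("dataset.defined.flag"))) {
      return Paths.get(state.get("dataset.root"), "staging");
    }
    return Paths.get(state.get("writer.staging.dir"));
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new AssertionError(message);
    }
  }

  // Each case configures only what it needs and asserts one behavior.
  void testDefaultStagingDir() {
    check(stagingDir().equals(Paths.get("/tmp/test/staging")), "default staging dir");
  }

  void testUserDefinedStagingDir() {
    state.put("user.defined.flag", "true");
    state.put("user.defined.dir", "/tmp/user_staging_dir");
    check(stagingDir().equals(Paths.get("/tmp/user_staging_dir")), "user-defined staging dir");
  }

  void testDatasetDefinedStagingDir() {
    state.put("dataset.defined.flag", "true");
    check(stagingDir().equals(Paths.get("/tmp/test/db1", "staging")), "dataset-defined staging dir");
  }

  public static void main(String[] args) {
    SplitStagingDirTests tests = new SplitStagingDirTests();
    // Re-running setUp() before each case imitates @BeforeMethod: no state leaks between cases.
    tests.setUp();
    tests.testDefaultStagingDir();
    tests.setUp();
    tests.testUserDefinedStagingDir();
    tests.setUp();
    tests.testDatasetDefinedStagingDir();
    System.out.println("all cases passed");
  }
}
```

Because every case starts from a fresh fixture, a failure points at exactly one configuration instead of somewhere inside a 100-line method.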

##########
File path: gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
##########
@@ -78,49 +82,101 @@ public void setup() throws Exception {
   public void testWrite() throws Exception {
     String streamString1 = "testContents1";
     String streamString2 = "testContents2";
+
     String userDefStagingDir = System.getProperty("user.dir") + "/user_staging_dir";
+
+    testRootDir = new Path(Paths.get("").toAbsolutePath().toString(),
+        getClass().getSimpleName());
+    localFs = FileSystem.getLocal(new Configuration());
+
+    Path ds1 = createDatasetPath("db1");
+    String dataSetDefinedStagingDir = testRootDir + "/db1/staging";
+
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
     CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
+
     WorkUnitState state = TestUtils.createTestWorkUnitState();
+    state.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
     state.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
     state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output").toString());
     state.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state, cf);
     CopySource.serializeCopyableDataset(state, metadata);
+
     FileAwareInputStreamDataWriter dataWriter = new FileAwareInputStreamDataWriter(state, 1, 0);
+
     FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString1))).build();
+
     Assert.assertNotEquals(dataWriter.stagingDir,userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     Path writtenFilePath = new Path(new Path(state.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString1);
 
     //testing user defined staging directory
     WorkUnitState state2 = TestUtils.createTestWorkUnitState();
+    state2.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,false);
     state2.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,true);
     state2.setProp(ConfigurationKeys.WRITER_STAGING_DIR, new Path(testTempPath, "staging").toString());
     state2.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, new Path(testTempPath, "output2").toString());
     state2.setProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR,userDefStagingDir);
     state2.setProp(ConfigurationKeys.WRITER_FILE_PATH, RandomStringUtils.randomAlphabetic(5));
     CopySource.serializeCopyEntity(state2, cf);
     CopySource.serializeCopyableDataset(state2, metadata);
+
     dataWriter = new FileAwareInputStreamDataWriter(state2, 1, 0);
+
     fileAwareInputStream = FileAwareInputStream.builder().file(cf)
         .inputStream(StreamUtils.convertStream(IOUtils.toInputStream(streamString2))).build();
+
     Assert.assertEquals(dataWriter.stagingDir.toUri().toString(),userDefStagingDir);
+    Assert.assertNotEquals(dataWriter.stagingDir,dataSetDefinedStagingDir);
+
     dataWriter.write(fileAwareInputStream);
     dataWriter.commit();
     writtenFilePath = new Path(new Path(state2.getProp(ConfigurationKeys.WRITER_OUTPUT_DIR),
         cf.getDatasetAndPartition(metadata).identifier()), cf.getDestination());
+
     Assert.assertEquals(IOUtils.toString(new FileInputStream(writtenFilePath.toString())), streamString2);
+
+    //testing dataset defined staging directory
+    CopyableDatasetMetadata metadata2 = new CopyableDatasetMetadata(new TestCopyableDataset(new Path(testRootDir + "/db1")));
+    WorkUnitState state3 = TestUtils.createTestWorkUnitState();
+    state3.setProp(DATASET_DEFINED_STAGING_DIR_FLAG,true);

Review comment:
       There is a missing space after the comma here. You can run the Reformat Code command in IntelliJ to fix all code style issues. If you only want to fix your own code, select it in the editor first and then run the command.

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -88,6 +89,7 @@
   public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
+  public static final String STAGING_DIR_SUFFIX = "/staging";

Review comment:
       There are a few things we need to keep in mind regarding this folder:
   
   1. Since we are going to put it in every dataset folder, we need a way to tell apart folders that are part of the dataset from our temporary storage locations.
   2. If the folder name starts with a dot ("."), it will be hidden by default in UI and CLI tools, so users won't be distracted by it.
   3. We'll need a way to find all such folders and delete them periodically. If a job gets interrupted and does not clean up after itself, some automation will need to delete the old temporary files; otherwise those abandoned files will eventually consume all of the storage space.
   
   I think we can go with ".tmp" to ensure that this folder is hidden and clearly marked as temporary.
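A rough sketch of the clean-up automation from point 3, assuming staging folders are named `.tmp` and that a folder whose modification time is older than some threshold has been abandoned. It walks a local tree with `java.nio.file`; a real job would presumably go through Hadoop's `FileSystem` API instead (for example `globStatus`, `FileStatus#getModificationTime` and `delete(path, true)`). `StaleStagingCleaner` and its methods are hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical clean-up helper for abandoned dataset-level staging folders.
class StaleStagingCleaner {
  static final String STAGING_DIR_NAME = ".tmp";

  /** Returns every ".tmp" directory under root last modified before (now - maxAge). */
  static List<Path> findStale(Path root, Duration maxAge, Instant now) {
    Instant cutoff = now.minus(maxAge);
    List<Path> stale = new ArrayList<>();
    try {
      Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
        @Override
        public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
          Path name = dir.getFileName();
          if (name != null && STAGING_DIR_NAME.equals(name.toString())) {
            // A directory's mtime changes only when entries directly inside it are
            // added or removed, so this is an approximation of "abandoned".
            if (attrs.lastModifiedTime().toInstant().isBefore(cutoff)) {
              stale.add(dir);
            }
            return FileVisitResult.SKIP_SUBTREE; // never descend into staged data
          }
          return FileVisitResult.CONTINUE;
        }
      });
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return stale;
  }

  /** Deletes a directory tree, deepest entries first. */
  static void deleteRecursively(Path dir) {
    try (Stream<Path> paths = Files.walk(dir)) {
      List<Path> deepestFirst = paths.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
      for (Path p : deepestFirst) {
        Files.delete(p);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

The fixed, dot-prefixed name is what makes this search cheap and safe: the cleaner never has to guess which folders belong to the dataset.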

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) {
       this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX);

Review comment:
       You can pass the parent and the child as separate arguments to the Path constructor to join them. It's better to avoid concatenating path strings, because you can end up with multiple slashes or none in the path. For example, depending on user input, you can end up with "/first/second", "/firstsecond" or "/first//second".
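The difference is easy to see with `java.nio.file.Paths`, used here as a stand-in for Hadoop's `Path` so the snippet runs without Hadoop (outputs shown for a Unix-like system). The last line shows a related pitfall: resolving an absolute child such as `"/staging"` discards the parent entirely. We believe Hadoop's `Path(Path parent, String child)` behaves the same way because it resolves the child URI against the parent, so a suffix constant like `STAGING_DIR_SUFFIX` should be a bare name; this is worth verifying against the Hadoop version in use. `PathJoinDemo` is a hypothetical name:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

class PathJoinDemo {
  // Naive string concatenation: the result depends on whether the inputs carry slashes.
  static String concat(String parent, String child) {
    return parent + child;
  }

  // Passing the parts separately lets the path library insert exactly one separator.
  static Path join(String parent, String child) {
    return Paths.get(parent, child);
  }

  public static void main(String[] args) {
    System.out.println(concat("/first", "second"));   // /firstsecond (separator missing)
    System.out.println(concat("/first/", "/second")); // /first//second (separator doubled)
    System.out.println(join("/first", "second"));     // /first/second
    System.out.println(join("/first/", "second"));    // /first/second
    // Pitfall: an absolute child replaces the parent instead of being appended.
    System.out.println(Paths.get("/data/db1").resolve("/staging")); // /staging
  }
}
```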

Review comment:
       URN is a standard with a special format, like "urn:linkedin:data". It looks like we'll have a path here, not a URN. I wonder whether there are other methods that return the dataset location, or whether we can rename this method to something like getDatasetURI() or getDatasetURL(), depending on what we get here. See also the discussion of the differences between URI, URL and URN: https://stackoverflow.com/a/28865728
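A small illustration of the distinction using `java.net.URI`: a URN such as `urn:linkedin:data` parses as an opaque URI with no path component, while a filesystem location is hierarchical. The `isLocation` guard is hypothetical, something the writer could check before deriving a staging directory from whatever `getDatasetURN()` returns:

```java
import java.net.URI;

class DatasetIdentifierCheck {
  /**
   * Hypothetical guard: true if the identifier is hierarchical with a non-empty path
   * (a plain path or a URI such as hdfs://...), false for opaque identifiers like URNs.
   */
  static boolean isLocation(String identifier) {
    URI uri = URI.create(identifier);
    // Opaque URIs (e.g. "urn:...") have no path; getPath() returns null for them.
    return !uri.isOpaque() && uri.getPath() != null && !uri.getPath().isEmpty();
  }

  public static void main(String[] args) {
    System.out.println(isLocation("urn:linkedin:data"));             // false
    System.out.println(isLocation("/data/db1"));                     // true
    System.out.println(isLocation("hdfs://namenode:8020/data/db1")); // true
  }
}
```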







[GitHub] [incubator-gobblin] autumnust commented on a change in pull request #3054: Making dataset name as part of staging directory in gobblin-distcp (f…

Posted by GitBox <gi...@apache.org>.
autumnust commented on a change in pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054#discussion_r448641128



##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) {
       this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) {
+      this.stagingDir = new Path(this.copyableDatasetMetadata.getDatasetURN() + STAGING_DIR_SUFFIX);
     } else {
       this.stagingDir = this.writerAttemptIdOptional.isPresent() ? WriterUtils.getWriterStagingDir(state, numBranches, branchId, this.writerAttemptIdOptional.get())
           : WriterUtils.getWriterStagingDir(state, numBranches, branchId);
     }
 
     this.outputDir = getOutputDir(state);
-    this.copyableDatasetMetadata =
-        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
+

Review comment:
       same as above

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -139,17 +142,20 @@ public FileAwareInputStreamDataWriter(State state, int numBranches, int branchId
     URI uri = URI.create(uriStr);
     this.fs = FileSystem.get(uri, conf);
     this.fileContext = FileContext.getFileContext(uri, conf);
+    this.copyableDatasetMetadata =
+        CopyableDatasetMetadata.deserialize(state.getProp(CopySource.SERIALIZED_COPYABLE_DATASET));
 
     if (state.getPropAsBoolean(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false)) {
       this.stagingDir = new Path(state.getProp(ConfigurationKeys.USER_DEFINED_STATIC_STAGING_DIR));
+    } else if (state.getPropAsBoolean(ConfigurationKeys.DATASET_DEFINED_STAGING_DIR_FLAG,false)) {

Review comment:
       Hmm I think there's something missing here.
   
   Please take a look at the `org.apache.gobblin.util.WriterUtils#getWriterStagingDir(org.apache.gobblin.configuration.State, int, int)` method, which is called below. The staging directory is not determined purely by user-defined configuration, but also by some runtime behavior.
   
   Specifically, within `getWriterStagingDir`, the root of the staging dir comes from `org.apache.gobblin.configuration.ConfigurationKeys#WRITER_STAGING_DIR`, which is set dynamically in the Gobblin job. The method also contains logic to use a different staging directory for each fork. Your current change would lose that feature.
   
   We can sync offline on this if that could help. 
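If the dataset-rooted option is kept, one hypothetical way to preserve the per-fork separation is to nest branch and attempt directories under the dataset's staging folder. The `branch_N` naming and the helper below are invented for illustration; Gobblin's actual layout is whatever `WriterUtils#getWriterStagingDir` produces:

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper; the "branch_N" naming is invented for this sketch.
class BranchAwareStagingDir {
  static Path stagingDir(Path datasetRoot, String stagingName, int numBranches, int branchId, String attemptId) {
    Path dir = datasetRoot.resolve(stagingName);
    if (numBranches > 1) {
      // Separate forks so they never stage files into the same directory.
      dir = dir.resolve("branch_" + branchId);
    }
    // Separate writer attempts (e.g. retries) when an attempt id is available.
    return attemptId == null ? dir : dir.resolve(attemptId);
  }

  public static void main(String[] args) {
    Path root = Paths.get("/data/db1");
    System.out.println(stagingDir(root, ".tmp", 1, 0, null));
    System.out.println(stagingDir(root, ".tmp", 2, 1, "attempt_1"));
  }
}
```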

##########
File path: gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
##########
@@ -104,6 +106,7 @@
   private final Options.Rename renameOptions;
   private final FileContext fileContext;
 
+

Review comment:
       Can you remove this empty line if it was added accidentally?







[GitHub] [incubator-gobblin] aryavaibhav93 closed pull request #3054: Making dataset name as part of staging directory in gobblin-distcp (f…

Posted by GitBox <gi...@apache.org>.
aryavaibhav93 closed pull request #3054:
URL: https://github.com/apache/incubator-gobblin/pull/3054


   

