You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink in the original page) is not preserved in this copy.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/08 19:11:20 UTC

[gobblin] branch master updated: [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9aa7c  [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)
9d9aa7c is described below

commit 9d9aa7c93583b01cff082afb8664e8727d3dc61b
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Feb 8 11:11:14 2022 -0800

    [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)
    
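    * change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value from false to true
    * do not reuse the bytes-copied meter (gobblin.copy.bytesCopiedMeter); remove any existing instance first, because a meter left over from a retry can report an incorrect value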
    
    * fix unit tests
    
    * address review comments
---
 .../writer/FileAwareInputStreamDataWriter.java     |  8 ++++-
 .../data/management/copy/CopyableFileUtils.java    | 20 ++++++-----
 .../copy/converter/UnGzipConverterTest.java        |  2 +-
 .../writer/FileAwareInputStreamDataWriterTest.java | 41 ++++++++++++----------
 .../TarArchiveInputStreamDataWriterTest.java       | 28 +++++++--------
 5 files changed, 56 insertions(+), 43 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 96e6333..1b1fec3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -85,7 +85,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
 
   public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
   public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
-  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
+  public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = true;
   public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
   public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
 
@@ -161,6 +161,11 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
     this.recoveryHelper = new RecoveryHelper(this.fs, state);
     this.actualProcessedCopyableFile = Optional.absent();
 
+    // remove the old metric which counts how many bytes are copied, because in case of retries, this can give incorrect value
+    if (getMetricContext().getMetrics().containsKey(GOBBLIN_COPY_BYTES_COPIED_METER)) {
+      getMetricContext().remove(GOBBLIN_COPY_BYTES_COPIED_METER);
+    }
+
     this.copySpeedMeter = getMetricContext().meter(GOBBLIN_COPY_BYTES_COPIED_METER);
 
     this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, StreamCopier.DEFAULT_BUFFER_SIZE);
@@ -295,6 +300,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
         log.warn("Broker error. Some features of stream copier may not be available.", nce);
       } finally {
         os.close();
+        log.info("OutputStream for file {} is closed.", writeAt);
         inputStream.close();
       }
     }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
index 4e85d5a..11418a2 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
@@ -43,37 +43,41 @@ public class CopyableFileUtils {
   }
 
   public static CopyableFile getTestCopyableFile() {
-    return getTestCopyableFile(null, null);
+    return getTestCopyableFile(0L, null);
   }
 
   public static CopyableFile getTestCopyableFile(String resourcePath) {
     return getTestCopyableFile(resourcePath, null);
   }
 
+  public static CopyableFile getTestCopyableFile(Long size, OwnerAndPermission ownerAndPermission) {
+    return getTestCopyableFile(null, null, size, ownerAndPermission);
+  }
+
   public static CopyableFile getTestCopyableFile(OwnerAndPermission ownerAndPermission) {
-    return getTestCopyableFile(null, null, ownerAndPermission);
+    return getTestCopyableFile(null, null, 0L, ownerAndPermission);
   }
 
   public static CopyableFile getTestCopyableFile(String resourcePath, OwnerAndPermission ownerAndPermission) {
-    return getTestCopyableFile(resourcePath, null, ownerAndPermission);
+    return getTestCopyableFile(resourcePath, null, 0L, ownerAndPermission);
   }
 
-  public static CopyableFile getTestCopyableFile(String resourcePath, String relativePath,
+  public static CopyableFile getTestCopyableFile(String resourcePath, String relativePath, Long size,
       OwnerAndPermission ownerAndPermission) {
-    return getTestCopyableFile(resourcePath, getRandomPath(), relativePath, ownerAndPermission);
+    return getTestCopyableFile(resourcePath, getRandomPath(), relativePath, size, ownerAndPermission);
   }
 
   public static CopyableFile getTestCopyableFile(String resourcePath, String destinationPath, String relativePath,
-      OwnerAndPermission ownerAndPermission) {
+      Long size, OwnerAndPermission ownerAndPermission) {
 
     FileStatus status = null;
 
     if (resourcePath == null) {
       resourcePath = getRandomPath();
-      status = new FileStatus(0l, false, 0, 0l, 0l, new Path(resourcePath));
+      status = new FileStatus(size, false, 0, 0l, 0l, new Path(resourcePath));
     } else {
       String filePath = CopyableFileUtils.class.getClassLoader().getResource(resourcePath).getFile();
-      status = new FileStatus(0l, false, 0, 0l, 0l, new Path(filePath));
+      status = new FileStatus(size, false, 0, 0l, 0l, new Path(filePath));
     }
 
     if (relativePath == null) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
index 07db779..fae26a8 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
@@ -81,7 +81,7 @@ public class UnGzipConverterTest {
       String fullPath = getClass().getClassLoader().getResource(filePath).getFile();
 
       FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
-          .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, null))
+          .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, 0L, null))
           .inputStream(fs.open(new Path(fullPath))).build();
 
       Iterable<FileAwareInputStream> iterable = converter.convertRecord("outputSchema", fileAwareInputStream, new WorkUnitState());
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index f9d7896..8bc2895 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -16,9 +16,6 @@
  */
 package org.apache.gobblin.data.management.copy.writer;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -28,9 +25,26 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.List;
 import java.util.Properties;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.crypto.EncryptionConfigParser;
@@ -50,17 +64,6 @@ import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
 import org.apache.gobblin.util.TestUtils;
 import org.apache.gobblin.util.WriterUtils;
 import org.apache.gobblin.util.io.StreamUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
 
@@ -85,7 +88,7 @@ public class FileAwareInputStreamDataWriterTest {
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString1.length(), ownerAndPermission);
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
     WorkUnitState state = TestUtils.createTestWorkUnitState();
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
@@ -174,7 +177,7 @@ public class FileAwareInputStreamDataWriterTest {
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
 
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
 
@@ -208,7 +211,7 @@ public class FileAwareInputStreamDataWriterTest {
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
 
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
 
@@ -254,7 +257,7 @@ public class FileAwareInputStreamDataWriterTest {
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission =
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
 
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
 
@@ -429,7 +432,7 @@ public class FileAwareInputStreamDataWriterTest {
     FileStatus status = fs.getFileStatus(testTempPath);
     OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(),
         new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-    CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+    CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString1.length(), ownerAndPermission);
     CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
     WorkUnitState state = TestUtils.createTestWorkUnitState();
     state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG, false);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
index baa1c61..ec2c7a9 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
@@ -16,19 +16,6 @@
  */
 package org.apache.gobblin.data.management.copy.writer;
 
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.copy.CopySource;
-import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
-import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.CopyableFileUtils;
-import org.apache.gobblin.data.management.copy.FileAwareInputStream;
-import org.apache.gobblin.data.management.copy.OwnerAndPermission;
-import org.apache.gobblin.data.management.copy.TestCopyableDataset;
-import org.apache.gobblin.data.management.copy.converter.UnGzipConverter;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.TestUtils;
-
 import java.io.FileInputStream;
 import java.io.IOException;
 
@@ -49,6 +36,19 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyableFileUtils;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.data.management.copy.converter.UnGzipConverter;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.TestUtils;
+
 
 public class TarArchiveInputStreamDataWriterTest {
 
@@ -118,7 +118,7 @@ public class TarArchiveInputStreamDataWriterTest {
         new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL,
             FsAction.ALL));
     CopyableFile cf =
-        CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, newFileName).toString(), newFileName,
+        CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, newFileName).toString(), newFileName, 0L,
             ownerAndPermission);
 
     FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)