You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/02/08 19:11:20 UTC
[gobblin] branch master updated: [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9aa7c [GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)
9d9aa7c is described below
commit 9d9aa7c93583b01cff082afb8664e8727d3dc61b
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Tue Feb 8 11:11:14 2022 -0800
[GOBBLIN-1606] change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value (#3464)
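* change DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE value from false to true
* do not reuse the bytes-copied metric (remove any existing gobblin.copy.bytesCopiedMeter before registering it again)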
* fix unit tests
* address review comments
---
.../writer/FileAwareInputStreamDataWriter.java | 8 ++++-
.../data/management/copy/CopyableFileUtils.java | 20 ++++++-----
.../copy/converter/UnGzipConverterTest.java | 2 +-
.../writer/FileAwareInputStreamDataWriterTest.java | 41 ++++++++++++----------
.../TarArchiveInputStreamDataWriterTest.java | 28 +++++++--------
5 files changed, 56 insertions(+), 43 deletions(-)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
index 96e6333..1b1fec3 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriter.java
@@ -85,7 +85,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
public static final String GOBBLIN_COPY_BYTES_COPIED_METER = "gobblin.copy.bytesCopiedMeter";
public static final String GOBBLIN_COPY_CHECK_FILESIZE = "gobblin.copy.checkFileSize";
- public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = false;
+ public static final boolean DEFAULT_GOBBLIN_COPY_CHECK_FILESIZE = true;
public static final String GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = "gobblin.copy.task.overwrite.on.commit";
public static final boolean DEFAULT_GOBBLIN_COPY_TASK_OVERWRITE_ON_COMMIT = false;
@@ -161,6 +161,11 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
this.recoveryHelper = new RecoveryHelper(this.fs, state);
this.actualProcessedCopyableFile = Optional.absent();
+ // remove the old metric which counts how many bytes are copied, because in case of retries, this can give incorrect value
+ if (getMetricContext().getMetrics().containsKey(GOBBLIN_COPY_BYTES_COPIED_METER)) {
+ getMetricContext().remove(GOBBLIN_COPY_BYTES_COPIED_METER);
+ }
+
this.copySpeedMeter = getMetricContext().meter(GOBBLIN_COPY_BYTES_COPIED_METER);
this.bufferSize = state.getPropAsInt(CopyConfiguration.BUFFER_SIZE, StreamCopier.DEFAULT_BUFFER_SIZE);
@@ -295,6 +300,7 @@ public class FileAwareInputStreamDataWriter extends InstrumentedDataWriter<FileA
log.warn("Broker error. Some features of stream copier may not be available.", nce);
} finally {
os.close();
+ log.info("OutputStream for file {} is closed.", writeAt);
inputStream.close();
}
}
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
index 4e85d5a..11418a2 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/CopyableFileUtils.java
@@ -43,37 +43,41 @@ public class CopyableFileUtils {
}
public static CopyableFile getTestCopyableFile() {
- return getTestCopyableFile(null, null);
+ return getTestCopyableFile(0L, null);
}
public static CopyableFile getTestCopyableFile(String resourcePath) {
return getTestCopyableFile(resourcePath, null);
}
+ public static CopyableFile getTestCopyableFile(Long size, OwnerAndPermission ownerAndPermission) {
+ return getTestCopyableFile(null, null, size, ownerAndPermission);
+ }
+
public static CopyableFile getTestCopyableFile(OwnerAndPermission ownerAndPermission) {
- return getTestCopyableFile(null, null, ownerAndPermission);
+ return getTestCopyableFile(null, null, 0L, ownerAndPermission);
}
public static CopyableFile getTestCopyableFile(String resourcePath, OwnerAndPermission ownerAndPermission) {
- return getTestCopyableFile(resourcePath, null, ownerAndPermission);
+ return getTestCopyableFile(resourcePath, null, 0L, ownerAndPermission);
}
- public static CopyableFile getTestCopyableFile(String resourcePath, String relativePath,
+ public static CopyableFile getTestCopyableFile(String resourcePath, String relativePath, Long size,
OwnerAndPermission ownerAndPermission) {
- return getTestCopyableFile(resourcePath, getRandomPath(), relativePath, ownerAndPermission);
+ return getTestCopyableFile(resourcePath, getRandomPath(), relativePath, size, ownerAndPermission);
}
public static CopyableFile getTestCopyableFile(String resourcePath, String destinationPath, String relativePath,
- OwnerAndPermission ownerAndPermission) {
+ Long size, OwnerAndPermission ownerAndPermission) {
FileStatus status = null;
if (resourcePath == null) {
resourcePath = getRandomPath();
- status = new FileStatus(0l, false, 0, 0l, 0l, new Path(resourcePath));
+ status = new FileStatus(size, false, 0, 0l, 0l, new Path(resourcePath));
} else {
String filePath = CopyableFileUtils.class.getClassLoader().getResource(resourcePath).getFile();
- status = new FileStatus(0l, false, 0, 0l, 0l, new Path(filePath));
+ status = new FileStatus(size, false, 0, 0l, 0l, new Path(filePath));
}
if (relativePath == null) {
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
index 07db779..fae26a8 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/converter/UnGzipConverterTest.java
@@ -81,7 +81,7 @@ public class UnGzipConverterTest {
String fullPath = getClass().getClassLoader().getResource(filePath).getFile();
FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder()
- .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, null))
+ .file(CopyableFileUtils.getTestCopyableFile(filePath, "/tmp/" + fileName, null, 0L, null))
.inputStream(fs.open(new Path(fullPath))).build();
Iterable<FileAwareInputStream> iterable = converter.convertRecord("outputSchema", fileAwareInputStream, new WorkUnitState());
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
index f9d7896..8bc2895 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/FileAwareInputStreamDataWriterTest.java
@@ -16,9 +16,6 @@
*/
package org.apache.gobblin.data.management.copy.writer;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
@@ -28,9 +25,26 @@ import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.List;
import java.util.Properties;
+
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.crypto.EncryptionConfigParser;
@@ -50,17 +64,6 @@ import org.apache.gobblin.data.management.copy.splitter.DistcpFileSplitter;
import org.apache.gobblin.util.TestUtils;
import org.apache.gobblin.util.WriterUtils;
import org.apache.gobblin.util.io.StreamUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
@@ -85,7 +88,7 @@ public class FileAwareInputStreamDataWriterTest {
FileStatus status = fs.getFileStatus(testTempPath);
OwnerAndPermission ownerAndPermission =
new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString1.length(), ownerAndPermission);
CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
WorkUnitState state = TestUtils.createTestWorkUnitState();
state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG,false);
@@ -174,7 +177,7 @@ public class FileAwareInputStreamDataWriterTest {
FileStatus status = fs.getFileStatus(testTempPath);
OwnerAndPermission ownerAndPermission =
new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
@@ -208,7 +211,7 @@ public class FileAwareInputStreamDataWriterTest {
FileStatus status = fs.getFileStatus(testTempPath);
OwnerAndPermission ownerAndPermission =
new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
@@ -254,7 +257,7 @@ public class FileAwareInputStreamDataWriterTest {
FileStatus status = fs.getFileStatus(testTempPath);
OwnerAndPermission ownerAndPermission =
new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString.length, ownerAndPermission);
CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
@@ -429,7 +432,7 @@ public class FileAwareInputStreamDataWriterTest {
FileStatus status = fs.getFileStatus(testTempPath);
OwnerAndPermission ownerAndPermission = new OwnerAndPermission(status.getOwner(), status.getGroup(),
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
- CopyableFile cf = CopyableFileUtils.getTestCopyableFile(ownerAndPermission);
+ CopyableFile cf = CopyableFileUtils.getTestCopyableFile((long) streamString1.length(), ownerAndPermission);
CopyableDatasetMetadata metadata = new CopyableDatasetMetadata(new TestCopyableDataset(new Path("/source")));
WorkUnitState state = TestUtils.createTestWorkUnitState();
state.setProp(ConfigurationKeys.USER_DEFINED_STAGING_DIR_FLAG, false);
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
index baa1c61..ec2c7a9 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/writer/TarArchiveInputStreamDataWriterTest.java
@@ -16,19 +16,6 @@
*/
package org.apache.gobblin.data.management.copy.writer;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.configuration.WorkUnitState;
-import org.apache.gobblin.data.management.copy.CopySource;
-import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
-import org.apache.gobblin.data.management.copy.CopyableFile;
-import org.apache.gobblin.data.management.copy.CopyableFileUtils;
-import org.apache.gobblin.data.management.copy.FileAwareInputStream;
-import org.apache.gobblin.data.management.copy.OwnerAndPermission;
-import org.apache.gobblin.data.management.copy.TestCopyableDataset;
-import org.apache.gobblin.data.management.copy.converter.UnGzipConverter;
-import org.apache.gobblin.util.PathUtils;
-import org.apache.gobblin.util.TestUtils;
-
import java.io.FileInputStream;
import java.io.IOException;
@@ -49,6 +36,19 @@ import org.testng.annotations.Test;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.copy.CopySource;
+import org.apache.gobblin.data.management.copy.CopyableDatasetMetadata;
+import org.apache.gobblin.data.management.copy.CopyableFile;
+import org.apache.gobblin.data.management.copy.CopyableFileUtils;
+import org.apache.gobblin.data.management.copy.FileAwareInputStream;
+import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+import org.apache.gobblin.data.management.copy.TestCopyableDataset;
+import org.apache.gobblin.data.management.copy.converter.UnGzipConverter;
+import org.apache.gobblin.util.PathUtils;
+import org.apache.gobblin.util.TestUtils;
+
public class TarArchiveInputStreamDataWriterTest {
@@ -118,7 +118,7 @@ public class TarArchiveInputStreamDataWriterTest {
new OwnerAndPermission(status.getOwner(), status.getGroup(), new FsPermission(FsAction.ALL, FsAction.ALL,
FsAction.ALL));
CopyableFile cf =
- CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, newFileName).toString(), newFileName,
+ CopyableFileUtils.getTestCopyableFile(filePath, new Path(testTempPath, newFileName).toString(), newFileName, 0L,
ownerAndPermission);
FileAwareInputStream fileAwareInputStream = FileAwareInputStream.builder().file(cf)