You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/02/17 22:07:55 UTC
[hadoop] branch trunk updated: HADOOP-15961. S3A committers: make
sure there's regular progress() calls.
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new c77fc69 HADOOP-15961. S3A committers: make sure there's regular progress() calls.
c77fc69 is described below
commit c77fc6971b5194c9dae184703caa87da271a85eb
Author: lqjacklee <lq...@126.com>
AuthorDate: Mon Feb 17 22:06:34 2020 +0000
HADOOP-15961. S3A committers: make sure there's regular progress() calls.
Contributed by lqjacklee.
Change-Id: I13ca153e1e32b21dbe64d6fb25e260e0ff66154d
---
.../hadoop/fs/s3a/commit/CommitOperations.java | 6 ++-
.../fs/s3a/commit/staging/StagingCommitter.java | 3 +-
.../apache/hadoop/fs/s3a/auth/ITestAssumeRole.java | 9 ++++-
.../apache/hadoop/fs/s3a/auth/ProgressCounter.java | 43 ++++++++++++++++++++++
.../fs/s3a/commit/ITestCommitOperations.java | 31 +++++++++++-----
5 files changed, 79 insertions(+), 13 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
index 7d7be95..8592ad4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
@@ -437,13 +438,15 @@ public class CommitOperations {
* @param destPath destination path
* @param partition partition/subdir. Not used
* @param uploadPartSize size of upload
+ * @param progress progress callback
* @return a pending upload entry
* @throws IOException failure
*/
public SinglePendingCommit uploadFileToPendingCommit(File localFile,
Path destPath,
String partition,
- long uploadPartSize)
+ long uploadPartSize,
+ Progressable progress)
throws IOException {
LOG.debug("Initiating multipart upload from {} to {}",
@@ -502,6 +505,7 @@ public class CommitOperations {
commitData.bindCommitData(parts);
statistics.commitUploaded(length);
+ progress.progress();
threw = false;
return commitData;
} finally {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 6cc9e48..7eca1b4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -712,7 +712,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
localFile,
destPath,
partition,
- uploadPartSize);
+ uploadPartSize,
+ context);
LOG.debug("{}: adding pending commit {}", getRole(), commit);
commits.add(commit);
});
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
index 82589fa..cf935d2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java
@@ -550,6 +550,8 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
final int uploadPartSize = 5 * 1024 * 1024;
+ ProgressCounter progress = new ProgressCounter();
+ progress.assertCount("Progress counter should be zero", 0);
Path basePath = methodPath();
Path readOnlyDir = new Path(basePath, "readOnlyDir");
Path writeableDir = new Path(basePath, "writeableDir");
@@ -577,8 +579,9 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
forbidden("initiate MultiPartUpload",
() -> {
return operations.uploadFileToPendingCommit(localSrc,
- uploadDest, "", uploadPartSize);
+ uploadDest, "", uploadPartSize, progress);
});
+ progress.assertCount("progress counter not expected.", 0);
// delete the file
localSrc.delete();
// create a directory there
@@ -596,11 +599,13 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
writeCSVData(src);
SinglePendingCommit pending =
fullOperations.uploadFileToPendingCommit(src, dest, "",
- uploadPartSize);
+ uploadPartSize, progress);
pending.save(fs, new Path(readOnlyDir,
name + CommitConstants.PENDING_SUFFIX), true);
assertTrue(src.delete());
}));
+ progress.assertCount("Process counter is not expected",
+ range);
try {
// we expect to be able to list all the files here
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java
new file mode 100644
index 0000000..15a5715
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ProgressCounter.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.auth;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.util.Progressable;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A progress callback for testing: it counts how many times
+ * {@link #progress()} has been invoked.
+ *
+ * The count is held in an {@link AtomicLong}, so increments made from
+ * several threads sharing one instance are not lost.
+ */
+public class ProgressCounter implements Progressable {
+
+  /** Number of {@link #progress()} calls received so far. */
+  private final AtomicLong count = new AtomicLong();
+
+  /**
+   * Record one progress notification.
+   */
+  @Override
+  public void progress() {
+    count.incrementAndGet();
+  }
+
+  /**
+   * Get the number of progress notifications received.
+   * @return the current count
+   */
+  public long getCount() {
+    return count.get();
+  }
+
+  /**
+   * Assert that the number of progress notifications received so far
+   * equals the expected value.
+   * @param message text to include in the assertion failure
+   * @param expected expected number of {@link #progress()} calls
+   */
+  public void assertCount(String message, int expected) {
+    assertEquals(message, expected, getCount());
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
index 74fe45d..d199337 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -26,7 +26,6 @@ import java.util.List;
import com.amazonaws.services.s3.model.PartETag;
import com.google.common.collect.Lists;
-import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
@@ -69,6 +69,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
private static final byte[] DATASET = dataset(1000, 'a', 32);
private static final String S3A_FACTORY_KEY = String.format(
COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
+ private ProgressCounter progress;
/**
* A compile time flag which allows you to disable failure reset before
@@ -105,6 +106,8 @@ public class ITestCommitOperations extends AbstractCommitITest {
verifyIsMagicCommitFS(getFileSystem());
// abort,; rethrow on failure
setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT);
+ progress = new ProgressCounter();
+ progress.assertCount("progress", 0);
}
@Test
@@ -366,7 +369,7 @@ public class ITestCommitOperations extends AbstractCommitITest {
private void validateIntermediateAndFinalPaths(Path magicFilePath,
Path destFile)
throws IOException {
- assertPathDoesNotExist("dest file was found", destFile);
+ assertPathDoesNotExist("dest file was created", destFile);
}
/**
@@ -452,8 +455,10 @@ public class ITestCommitOperations extends AbstractCommitITest {
SinglePendingCommit pendingCommit =
actions.uploadFileToPendingCommit(tempFile,
- dest, null,
- DEFAULT_MULTIPART_SIZE);
+ dest,
+ null,
+ DEFAULT_MULTIPART_SIZE,
+ progress);
resetFailures();
assertPathDoesNotExist("pending commit", dest);
fullThrottle();
@@ -461,6 +466,8 @@ public class ITestCommitOperations extends AbstractCommitITest {
resetFailures();
FileStatus status = verifyPathExists(fs,
"uploaded file commit", dest);
+ progress.assertCount("Progress counter should be 1.",
+ 1);
assertEquals("File length in " + status, 0, status.getLen());
}
@@ -477,10 +484,11 @@ public class ITestCommitOperations extends AbstractCommitITest {
assertPathDoesNotExist("test setup", dest);
SinglePendingCommit pendingCommit =
actions.uploadFileToPendingCommit(tempFile,
- dest, null,
- DEFAULT_MULTIPART_SIZE);
+ dest,
+ null,
+ DEFAULT_MULTIPART_SIZE,
+ progress);
resetFailures();
- LOG.debug("Precommit validation");
assertPathDoesNotExist("pending commit", dest);
fullThrottle();
LOG.debug("Postcommit validation");
@@ -488,6 +496,8 @@ public class ITestCommitOperations extends AbstractCommitITest {
resetFailures();
String s = readUTF8(fs, dest, -1);
assertEquals(text, s);
+ progress.assertCount("Progress counter should be 1.",
+ 1);
}
@Test(expected = FileNotFoundException.class)
@@ -498,7 +508,9 @@ public class ITestCommitOperations extends AbstractCommitITest {
Path dest = methodPath("testUploadMissingile");
fullThrottle();
actions.uploadFileToPendingCommit(tempFile, dest, null,
- DEFAULT_MULTIPART_SIZE);
+ DEFAULT_MULTIPART_SIZE, progress);
+ progress.assertCount("Progress counter should be 1.",
+ 1);
}
@Test
@@ -598,7 +610,8 @@ public class ITestCommitOperations extends AbstractCommitITest {
SinglePendingCommit commit1 =
actions.uploadFileToPendingCommit(localFile,
destination, null,
- DEFAULT_MULTIPART_SIZE);
+ DEFAULT_MULTIPART_SIZE,
+ progress);
commits.add(commit1);
}
resetFailures();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org