Posted to common-commits@hadoop.apache.org by st...@apache.org on 2016/10/18 20:16:55 UTC
[09/12] hadoop git commit: HADOOP-13560. S3ABlockOutputStream to
support huge (many GB) file writes. Contributed by Steve Loughran
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
new file mode 100644
index 0000000..a60d084
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+/**
+ * Scale test which creates a huge file.
+ *
+ * <b>Important:</b> the order in which these tests execute is fixed to
+ * alphabetical order. Test cases are numbered {@code test_123_} to impose
+ * an ordering based on the numbers.
+ *
+ * Having this ordering allows the tests to assume that the huge file
+ * exists. Even so: they should all have a {@link #assumeHugeFileExists()}
+ * check at the start, in case an individual test is executed.
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractSTestS3AHugeFiles.class);
+ public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
+ public static final String DEFAULT_PARTITION_SIZE = "8M";
+ private Path scaleTestDir;
+ private Path hugefile;
+ private Path hugefileRenamed;
+
+ private int uploadBlockSize = DEFAULT_UPLOAD_BLOCKSIZE;
+ private int partitionSize;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ final Path testPath = getTestPath();
+ scaleTestDir = new Path(testPath, "scale");
+ hugefile = new Path(scaleTestDir, "hugefile");
+ hugefileRenamed = new Path(scaleTestDir, "hugefileRenamed");
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ // do nothing. Specifically: do not delete the test dir
+ }
+
+ /**
+ * Note that this can get called before test setup.
+ * @return the configuration to use.
+ */
+ @Override
+ protected Configuration createConfiguration() {
+ Configuration conf = super.createConfiguration();
+ partitionSize = (int)getTestPropertyBytes(conf,
+ KEY_HUGE_PARTITION_SIZE,
+ DEFAULT_PARTITION_SIZE);
+ assertTrue("Partition size too small: " + partitionSize,
+ partitionSize > MULTIPART_MIN_SIZE);
+ conf.setLong(SOCKET_SEND_BUFFER, _1MB);
+ conf.setLong(SOCKET_RECV_BUFFER, _1MB);
+ conf.setLong(MIN_MULTIPART_THRESHOLD, partitionSize);
+ conf.setInt(MULTIPART_SIZE, partitionSize);
+ conf.set(USER_AGENT_PREFIX, "STestS3AHugeFileCreate");
+ conf.setBoolean(FAST_UPLOAD, true);
+ conf.set(FAST_UPLOAD_BUFFER, getBlockOutputBufferName());
+ return conf;
+ }
+
+ /**
+ * The name of the buffering mechanism to use.
+ * @return a buffering mechanism
+ */
+ protected abstract String getBlockOutputBufferName();
+
+ @Test
+ public void test_010_CreateHugeFile() throws IOException {
+ assertFalse("Please run this test sequentially to avoid timeouts" +
+ " and bandwidth problems", isParallelExecution());
+ long filesize = getTestPropertyBytes(getConf(), KEY_HUGE_FILESIZE,
+ DEFAULT_HUGE_FILESIZE);
+ long filesizeMB = filesize / _1MB;
+
+ // clean up from any previous attempts
+ deleteHugeFile();
+
+ describe("Creating file %s of size %d MB" +
+ " with partition size %d buffered by %s",
+ hugefile, filesizeMB, partitionSize, getBlockOutputBufferName());
+
+ // now do a check of available upload time, with a pessimistic bandwidth
+ // (that of remote upload tests). If the test times out then not only is
+ // the test outcome lost; because the follow-on tests continue, they will
+ // overlap with the still-ongoing upload, causing much confusion.
+ int timeout = getTestTimeoutSeconds();
+ // assume 1 MB/s upload bandwidth
+ int bandwidth = _1MB;
+ long uploadTime = filesize / bandwidth;
+ assertTrue(String.format("Timeout set at %d seconds is too low;" +
+ " estimating upload time of %d seconds at 1 MB/s." +
+ " Rerun tests with -D%s=%d",
+ timeout, uploadTime, KEY_TEST_TIMEOUT, uploadTime * 2),
+ uploadTime < timeout);
+ assertEquals("File size set in " + KEY_HUGE_FILESIZE + " = " + filesize
+ + " is not a multiple of " + uploadBlockSize,
+ 0, filesize % uploadBlockSize);
+
+ byte[] data = new byte[uploadBlockSize];
+ for (int i = 0; i < uploadBlockSize; i++) {
+ data[i] = (byte) (i % 256);
+ }
+
+ long blocks = filesize / uploadBlockSize;
+ long blocksPerMB = _1MB / uploadBlockSize;
+
+ // perform the upload.
+ // there's lots of logging here, so that a tail -f on the output log
+ // can give a view of what is happening.
+ StorageStatistics storageStatistics = fs.getStorageStatistics();
+ String putRequests = Statistic.OBJECT_PUT_REQUESTS.getSymbol();
+ String putBytes = Statistic.OBJECT_PUT_BYTES.getSymbol();
+ Statistic putRequestsActive = Statistic.OBJECT_PUT_REQUESTS_ACTIVE;
+ Statistic putBytesPending = Statistic.OBJECT_PUT_BYTES_PENDING;
+
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+
+ long blocksPer10MB = blocksPerMB * 10;
+ ProgressCallback progress = new ProgressCallback(timer);
+ try (FSDataOutputStream out = fs.create(hugefile,
+ true,
+ uploadBlockSize,
+ progress)) {
+
+ for (long block = 1; block <= blocks; block++) {
+ out.write(data);
+ long written = block * uploadBlockSize;
+ // every 10 MB and on file upload @ 100%, print some stats
+ if (block % blocksPer10MB == 0 || written == filesize) {
+ long percentage = written * 100 / filesize;
+ double elapsedTime = timer.elapsedTime() / 1.0e9;
+ double writtenMB = 1.0 * written / _1MB;
+ LOG.info(String.format("[%02d%%] Buffered %.2f MB out of %d MB;" +
+ " PUT %d bytes (%d pending) in %d operations (%d active);" +
+ " elapsedTime=%.2fs; write to buffer bandwidth=%.2f MB/s",
+ percentage,
+ writtenMB,
+ filesizeMB,
+ storageStatistics.getLong(putBytes),
+ gaugeValue(putBytesPending),
+ storageStatistics.getLong(putRequests),
+ gaugeValue(putRequestsActive),
+ elapsedTime,
+ writtenMB / elapsedTime));
+ }
+ }
+ // now close the file
+ LOG.info("Closing file and completing write operation");
+ ContractTestUtils.NanoTimer closeTimer
+ = new ContractTestUtils.NanoTimer();
+ out.close();
+ closeTimer.end("time to close() output stream");
+ }
+
+ timer.end("time to write %d MB in blocks of %d",
+ filesizeMB, uploadBlockSize);
+ logFSState();
+ bandwidth(timer, filesize);
+ long putRequestCount = storageStatistics.getLong(putRequests);
+ Long putByteCount = storageStatistics.getLong(putBytes);
+ LOG.info("PUT {} bytes in {} operations; {} MB/operation",
+ putByteCount, putRequestCount,
+ putByteCount / (putRequestCount * _1MB));
+ LOG.info("Time per PUT {} nS",
+ toHuman(timer.nanosPerOperation(putRequestCount)));
+ assertEquals("active put requests in \n" + fs,
+ 0, gaugeValue(putRequestsActive));
+ ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
+ S3AFileStatus status = fs.getFileStatus(hugefile);
+ ContractTestUtils.assertIsFile(hugefile, status);
+ assertEquals("File size in " + status, filesize, status.getLen());
+ progress.verifyNoFailures("Put file " + hugefile + " of size " + filesize);
+ }
+
+ /**
+ * Progress callback from AWS. Likely to come in on a different thread.
+ */
+ private final class ProgressCallback implements Progressable,
+ ProgressListener {
+ private AtomicLong bytesTransferred = new AtomicLong(0);
+ private AtomicInteger failures = new AtomicInteger(0);
+ private final ContractTestUtils.NanoTimer timer;
+
+ private ProgressCallback(NanoTimer timer) {
+ this.timer = timer;
+ }
+
+ @Override
+ public void progress() {
+ }
+
+ @Override
+ public void progressChanged(ProgressEvent progressEvent) {
+ ProgressEventType eventType = progressEvent.getEventType();
+ if (eventType.isByteCountEvent()) {
+ bytesTransferred.addAndGet(progressEvent.getBytesTransferred());
+ }
+ switch (eventType) {
+ case TRANSFER_PART_FAILED_EVENT:
+ // failure
+ failures.incrementAndGet();
+ LOG.warn("Transfer failure");
+ break;
+ case TRANSFER_PART_COMPLETED_EVENT:
+ // completion
+ long elapsedTime = timer.elapsedTime();
+ double elapsedTimeS = elapsedTime / 1.0e9;
+ long written = bytesTransferred.get();
+ long writtenMB = written / _1MB;
+ LOG.info(String.format(
+ "Event %s; total uploaded=%d MB in %.1fs;" +
+ " effective upload bandwidth = %.2f MB/s",
+ progressEvent,
+ writtenMB, elapsedTimeS, writtenMB / elapsedTimeS));
+ break;
+ default:
+ if (eventType.isByteCountEvent()) {
+ LOG.debug("Event {}", progressEvent);
+ } else {
+ LOG.info("Event {}", progressEvent);
+ }
+ break;
+ }
+ }
+
+ @Override
+ public String toString() {
+ // build the string directly; no intermediate variable needed
+ return "ProgressCallback{"
+ + "bytesTransferred=" + bytesTransferred
+ + ", failures=" + failures
+ + '}';
+ }
+
+ private void verifyNoFailures(String operation) {
+ assertEquals("Failures in " + operation +": " + this, 0, failures.get());
+ }
+ }
+
+ void assumeHugeFileExists() throws IOException {
+ ContractTestUtils.assertPathExists(fs, "huge file not created", hugefile);
+ ContractTestUtils.assertIsFile(fs, hugefile);
+ }
+
+ private void logFSState() {
+ LOG.info("File System state after operation:\n{}", fs);
+ }
+
+ @Test
+ public void test_040_PositionedReadHugeFile() throws Throwable {
+ assumeHugeFileExists();
+ final String encryption = getConf().getTrimmed(
+ SERVER_SIDE_ENCRYPTION_ALGORITHM);
+ boolean encrypted = encryption != null;
+ if (encrypted) {
+ LOG.info("File is encrypted with algorithm {}", encryption);
+ }
+ String filetype = encrypted ? "encrypted file" : "file";
+ describe("Positioned reads of %s %s", filetype, hugefile);
+ S3AFileStatus status = fs.getFileStatus(hugefile);
+ long filesize = status.getLen();
+ int ops = 0;
+ final int bufferSize = 8192;
+ byte[] buffer = new byte[bufferSize];
+ long eof = filesize - 1;
+
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ ContractTestUtils.NanoTimer readAtByte0, readAtByte0Again, readAtEOF;
+ try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+ readAtByte0 = new ContractTestUtils.NanoTimer();
+ in.readFully(0, buffer);
+ readAtByte0.end("time to read data at start of file");
+ ops++;
+
+ readAtEOF = new ContractTestUtils.NanoTimer();
+ in.readFully(eof - bufferSize, buffer);
+ readAtEOF.end("time to read data at end of file");
+ ops++;
+
+ readAtByte0Again = new ContractTestUtils.NanoTimer();
+ in.readFully(0, buffer);
+ readAtByte0Again.end("time to read data at start of file again");
+ ops++;
+ LOG.info("Final stream state: {}", in);
+ }
+ long mb = Math.max(filesize / _1MB, 1);
+
+ logFSState();
+ timer.end("time to performed positioned reads of %s of %d MB ",
+ filetype, mb);
+ LOG.info("Time per positioned read = {} nS",
+ toHuman(timer.nanosPerOperation(ops)));
+ }
+
+ @Test
+ public void test_050_readHugeFile() throws Throwable {
+ assumeHugeFileExists();
+ describe("Reading %s", hugefile);
+ S3AFileStatus status = fs.getFileStatus(hugefile);
+ long filesize = status.getLen();
+ long blocks = filesize / uploadBlockSize;
+ byte[] data = new byte[uploadBlockSize];
+
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ try (FSDataInputStream in = fs.open(hugefile, uploadBlockSize)) {
+ for (long block = 0; block < blocks; block++) {
+ in.readFully(data);
+ }
+ LOG.info("Final stream state: {}", in);
+ }
+
+ long mb = Math.max(filesize / _1MB, 1);
+ timer.end("time to read file of %d MB ", mb);
+ LOG.info("Time per MB to read = {} nS",
+ toHuman(timer.nanosPerOperation(mb)));
+ bandwidth(timer, filesize);
+ logFSState();
+ }
+
+ @Test
+ public void test_100_renameHugeFile() throws Throwable {
+ assumeHugeFileExists();
+ describe("renaming %s to %s", hugefile, hugefileRenamed);
+ S3AFileStatus status = fs.getFileStatus(hugefile);
+ long filesize = status.getLen();
+ fs.delete(hugefileRenamed, false);
+ ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+ fs.rename(hugefile, hugefileRenamed);
+ long mb = Math.max(filesize / _1MB, 1);
+ timer.end("time to rename file of %d MB", mb);
+ LOG.info("Time per MB to rename = {} nS",
+ toHuman(timer.nanosPerOperation(mb)));
+ bandwidth(timer, filesize);
+ logFSState();
+ S3AFileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
+ assertEquals(filesize, destFileStatus.getLen());
+
+ // rename back
+ ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+ fs.rename(hugefileRenamed, hugefile);
+ timer2.end("Renaming back");
+ LOG.info("Time per MB to rename = {} nS",
+ toHuman(timer2.nanosPerOperation(mb)));
+ bandwidth(timer2, filesize);
+ }
+
+ @Test
+ public void test_999_DeleteHugeFiles() throws IOException {
+ deleteHugeFile();
+ ContractTestUtils.NanoTimer timer2 = new ContractTestUtils.NanoTimer();
+
+ fs.delete(hugefileRenamed, false);
+ timer2.end("time to delete %s", hugefileRenamed);
+ ContractTestUtils.rm(fs, getTestPath(), true, true);
+ }
+
+ protected void deleteHugeFile() throws IOException {
+ describe("Deleting %s", hugefile);
+ NanoTimer timer = new NanoTimer();
+ fs.delete(hugefile, false);
+ timer.end("time to delete %s", hugefile);
+ }
+
+}
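Aside: the ProgressCallback above relies on the AWS SDK delivering ProgressEvents asynchronously, usually on transfer-manager threads, which is why its counters are atomic. A stripped-down sketch of the same listener pattern, using only the SDK calls already present in this patch (the class and accessor names here are illustrative, not part of the change):

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;

/** Minimal sketch: count bytes and part failures from asynchronous SDK events. */
class CountingProgressListener implements ProgressListener {
  private final AtomicLong bytes = new AtomicLong();
  private final AtomicInteger partFailures = new AtomicInteger();

  @Override
  public void progressChanged(ProgressEvent event) {
    // byte-count events carry an increment, not a running total,
    // hence addAndGet rather than set
    if (event.getEventType().isByteCountEvent()) {
      bytes.addAndGet(event.getBytesTransferred());
    }
    // a failed multipart part upload surfaces as a distinct event type
    if (event.getEventType() == ProgressEventType.TRANSFER_PART_FAILED_EVENT) {
      partFailures.incrementAndGet();
    }
  }

  long bytesTransferred() { return bytes.get(); }
  int failures() { return partFailures.get(); }
}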
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
index 208c491..4e1a734 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java
@@ -116,20 +116,9 @@ public class ITestS3ADeleteManyFiles extends S3AScaleTestBase {
@Test
public void testOpenCreate() throws IOException {
- Path dir = new Path("/tests3a");
- ContractTestUtils.createAndVerifyFile(fs, dir, 1024);
- ContractTestUtils.createAndVerifyFile(fs, dir, 5 * 1024 * 1024);
- ContractTestUtils.createAndVerifyFile(fs, dir, 20 * 1024 * 1024);
-
-
- /*
- Enable to test the multipart upload
- try {
- ContractTestUtils.createAndVerifyFile(fs, dir,
- (long)6 * 1024 * 1024 * 1024);
- } catch (IOException e) {
- fail(e.getMessage());
- }
- */
+ final Path scaleTestDir = getTestPath();
+ final Path srcDir = new Path(scaleTestDir, "opencreate");
+ ContractTestUtils.createAndVerifyFile(fs, srcDir, 1024);
+ ContractTestUtils.createAndVerifyFile(fs, srcDir, 50 * 1024);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
new file mode 100644
index 0000000..d6f15c8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesArrayBlocks.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BUFFER_ARRAY} for buffering.
+ */
+public class ITestS3AHugeFilesArrayBlocks extends AbstractSTestS3AHugeFiles {
+
+ protected String getBlockOutputBufferName() {
+ return Constants.FAST_UPLOAD_BUFFER_ARRAY;
+ }
+}
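Each of these subclasses exists only to pick one buffering mechanism via getBlockOutputBufferName(). Outside the tests, the same choice is made through configuration; a minimal sketch, assuming the Constants keys introduced by this patch (FAST_UPLOAD, FAST_UPLOAD_BUFFER and the FAST_UPLOAD_BUFFER_ARRAY / FAST_UPLOAD_BYTEBUFFER / FAST_UPLOAD_BUFFER_DISK values) and an illustrative class name:

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.*;

/** Minimal sketch: select the S3A block output buffer mechanism. */
public class BlockBufferConfigSketch {

  public static Configuration configure(String bufferMechanism) {
    Configuration conf = new Configuration();
    conf.setBoolean(FAST_UPLOAD, true);            // enable the block output stream
    conf.set(FAST_UPLOAD_BUFFER, bufferMechanism); // array, byte buffer or disk
    return conf;
  }

  public static void main(String[] args) {
    // e.g. buffer in on-heap arrays, as ITestS3AHugeFilesArrayBlocks does
    Configuration conf = configure(FAST_UPLOAD_BUFFER_ARRAY);
    System.out.println(conf.get(FAST_UPLOAD_BUFFER));
  }
}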
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
new file mode 100644
index 0000000..b1323c4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesByteBufferBlocks.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.s3a.Constants.FAST_UPLOAD_BYTEBUFFER;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BYTEBUFFER} for buffering.
+ */
+public class ITestS3AHugeFilesByteBufferBlocks
+ extends AbstractSTestS3AHugeFiles {
+
+ protected String getBlockOutputBufferName() {
+ return FAST_UPLOAD_BYTEBUFFER;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
new file mode 100644
index 0000000..45eef24
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesClassicOutput.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use classic output for writing things; tweaks the configuration to do
+ * this after it has been set up in the superclass.
+ */
+public class ITestS3AHugeFilesClassicOutput extends AbstractSTestS3AHugeFiles {
+
+ @Override
+ protected Configuration createConfiguration() {
+ final Configuration conf = super.createConfiguration();
+ conf.setBoolean(Constants.FAST_UPLOAD, false);
+ return conf;
+ }
+
+ protected String getBlockOutputBufferName() {
+ return "classic";
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
new file mode 100644
index 0000000..2be5769
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesDiskBlocks.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.s3a.Constants;
+
+/**
+ * Use {@link Constants#FAST_UPLOAD_BUFFER_DISK} for buffering.
+ */
+public class ITestS3AHugeFilesDiskBlocks extends AbstractSTestS3AHugeFiles {
+
+ protected String getBlockOutputBufferName() {
+ return Constants.FAST_UPLOAD_BUFFER_DISK;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6c348c56/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index d861a16..af6d468 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -20,18 +20,18 @@ package org.apache.hadoop.fs.s3a.scale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.Path;
-
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3ATestConstants;
-import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
@@ -40,6 +40,8 @@ import org.slf4j.LoggerFactory;
import java.io.InputStream;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
/**
* Base class for scale tests; here is where the common scale configuration
* keys are defined.
@@ -47,71 +49,18 @@ import java.io.InputStream;
public class S3AScaleTestBase extends Assert implements S3ATestConstants {
@Rule
- public TestName methodName = new TestName();
+ public final TestName methodName = new TestName();
@Rule
- public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+ public Timeout testTimeout = createTestTimeout();
- @BeforeClass
- public static void nameThread() {
+ @Before
+ public void nameThread() {
Thread.currentThread().setName("JUnit");
}
- /**
- * The number of operations to perform: {@value}.
- */
- public static final String KEY_OPERATION_COUNT =
- SCALE_TEST + "operation.count";
-
- /**
- * The number of directory operations to perform: {@value}.
- */
- public static final String KEY_DIRECTORY_COUNT =
- SCALE_TEST + "directory.count";
-
- /**
- * The readahead buffer: {@value}.
- */
- public static final String KEY_READ_BUFFER_SIZE =
- S3A_SCALE_TEST + "read.buffer.size";
-
- public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
-
- /**
- * Key for a multi MB test file: {@value}.
- */
- public static final String KEY_CSVTEST_FILE =
- S3A_SCALE_TEST + "csvfile";
- /**
- * Default path for the multi MB test file: {@value}.
- */
- public static final String DEFAULT_CSVTEST_FILE
- = "s3a://landsat-pds/scene_list.gz";
-
- /**
- * Endpoint for the S3 CSV/scale tests. This defaults to
- * being us-east.
- */
- public static final String KEY_CSVTEST_ENDPOINT =
- S3A_SCALE_TEST + "csvfile.endpoint";
-
- /**
- * Endpoint for the S3 CSV/scale tests. This defaults to
- * being us-east.
- */
- public static final String DEFAULT_CSVTEST_ENDPOINT =
- "s3.amazonaws.com";
-
- /**
- * The default number of operations to perform: {@value}.
- */
- public static final long DEFAULT_OPERATION_COUNT = 2005;
-
- /**
- * Default number of directories to create when performing
- * directory performance/scale tests.
- */
- public static final int DEFAULT_DIRECTORY_COUNT = 2;
+ public static final int _1KB = 1024;
+ public static final int _1MB = _1KB * _1KB;
protected S3AFileSystem fs;
@@ -120,6 +69,8 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
private Configuration conf;
+ private boolean enabled;
+
/**
* Configuration generator. May be overridden to inject
* some custom options.
@@ -137,11 +88,33 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
return conf;
}
+ /**
+ * Setup. This triggers creation of the configuration.
+ */
@Before
public void setUp() throws Exception {
- conf = createConfiguration();
+ demandCreateConfiguration();
LOG.debug("Scale test operation count = {}", getOperationCount());
- fs = S3ATestUtils.createTestFileSystem(conf);
+ // multipart purges are disabled on the scale tests
+ fs = createTestFileSystem(conf, false);
+ // check for the test being enabled
+ enabled = getTestPropertyBool(
+ getConf(),
+ KEY_SCALE_TESTS_ENABLED,
+ DEFAULT_SCALE_TESTS_ENABLED);
+ Assume.assumeTrue("Scale test disabled: to enable set property " +
+ KEY_SCALE_TESTS_ENABLED, enabled);
+ }
+
+ /**
+ * Create the configuration if it is not already set up.
+ * @return the configuration.
+ */
+ private synchronized Configuration demandCreateConfiguration() {
+ if (conf == null) {
+ conf = createConfiguration();
+ }
+ return conf;
}
@After
@@ -160,7 +133,27 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
}
/**
- * Describe a test in the logs
+ * Create the timeout for tests. Some large tests may need a larger value.
+ * @return the test timeout to use
+ */
+ protected Timeout createTestTimeout() {
+ demandCreateConfiguration();
+ return new Timeout(
+ getTestTimeoutSeconds() * 1000);
+ }
+
+ /**
+ * Get the test timeout in seconds.
+ * @return the test timeout as set in system properties or the default.
+ */
+ protected static int getTestTimeoutSeconds() {
+ return getTestPropertyInt(null,
+ KEY_TEST_TIMEOUT,
+ DEFAULT_TEST_TIMEOUT);
+ }
+
+ /**
+ * Describe a test in the logs.
* @param text text to print
* @param args arguments to format in the printing
*/
@@ -189,4 +182,30 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
}
}
+ /**
+ * Get the gauge value of a statistic. Raises an assertion if
+ * there is no such gauge.
+ * @param statistic statistic to look up
+ * @return the value.
+ */
+ public long gaugeValue(Statistic statistic) {
+ S3AInstrumentation instrumentation = fs.getInstrumentation();
+ MutableGaugeLong gauge = instrumentation.lookupGauge(statistic.getSymbol());
+ assertNotNull("No gauge " + statistic
+ + " in " + instrumentation.dump("", " = ", "\n", true), gauge);
+ return gauge.value();
+ }
+
+ protected boolean isEnabled() {
+ return enabled;
+ }
+
+ /**
+ * Flag to indicate that this test run is executing in parallel. This
+ * is used by some of the scale tests to validate test time expectations.
+ * @return true if the build indicates this test is being run in parallel.
+ */
+ protected boolean isParallelExecution() {
+ return Boolean.getBoolean(S3ATestConstants.KEY_PARALLEL_TEST_EXECUTION);
+ }
}
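One subtlety in the base class above: JUnit 4 initializes @Rule fields before any @Before method runs, so the testTimeout rule, whose value depends on the configuration, must be able to trigger configuration creation itself; that is what demandCreateConfiguration() is for. A self-contained sketch of the pattern, with an illustrative class name and an assumed "test.timeout.seconds" property that is not part of this patch:

import org.apache.hadoop.conf.Configuration;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.Timeout;

/** Minimal sketch: a timeout rule derived from a lazily created configuration. */
public abstract class LazyConfTestBaseSketch {

  private Configuration conf;

  // Rule fields are initialized before @Before runs, so the timeout
  // calculation must be able to create the configuration on demand.
  @Rule
  public Timeout testTimeout = new Timeout(
      demandCreateConfiguration().getInt("test.timeout.seconds", 600) * 1000);

  private synchronized Configuration demandCreateConfiguration() {
    if (conf == null) {
      conf = createConfiguration();
    }
    return conf;
  }

  /** Subclasses may override to inject custom options, as the scale tests do. */
  protected Configuration createConfiguration() {
    return new Configuration();
  }

  @Before
  public void setUp() {
    demandCreateConfiguration(); // no-op if the rule initializer already ran
  }
}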