You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tm...@apache.org on 2018/09/01 22:44:10 UTC
hadoop git commit: HADOOP-15664. ABFS: Reduce test run time via
parallelization and grouping. Contributed by Da Zhou.
Repository: hadoop
Updated Branches:
refs/heads/HADOOP-15407 d58ea7c96 -> 1ba76bda0
HADOOP-15664. ABFS: Reduce test run time via parallelization and grouping.
Contributed by Da Zhou.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ba76bda
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ba76bda
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ba76bda
Branch: refs/heads/HADOOP-15407
Commit: 1ba76bda027d6fa201338a9c01e21666c6f94a5d
Parents: d58ea7c
Author: Thomas Marquardt <tm...@microsoft.com>
Authored: Sat Sep 1 20:39:34 2018 +0000
Committer: Thomas Marquardt <tm...@microsoft.com>
Committed: Sat Sep 1 21:21:45 2018 +0000
----------------------------------------------------------------------
hadoop-tools/hadoop-azure/pom.xml | 350 ++++++++++++++++++-
.../hadoop/fs/azurebfs/AzureBlobFileSystem.java | 8 +-
.../fs/azurebfs/services/AbfsOutputStream.java | 6 +
.../azure/ITestNativeFileSystemStatistics.java | 99 ++++++
.../fs/azure/NativeAzureFileSystemBaseTest.java | 80 +----
.../fs/azure/integration/AzureTestUtils.java | 53 ++-
.../ITestAzureBlobFileSystemE2EScale.java | 11 +-
.../ITestAzureBlobFileSystemFileStatus.java | 3 +
.../azurebfs/ITestAzureBlobFileSystemFlush.java | 167 +++++----
.../fs/azurebfs/ITestWasbAbfsCompatibility.java | 2 +-
10 files changed, 631 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml
index 7152f638..42f4d05 100644
--- a/hadoop-tools/hadoop-azure/pom.xml
+++ b/hadoop-tools/hadoop-azure/pom.xml
@@ -253,6 +253,351 @@
<profiles>
<profile>
+ <id>parallel-tests-wasb</id>
+ <activation>
+ <property>
+ <name>parallel-tests-wasb</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-parallel-tests-dirs</id>
+ <phase>test-compile</phase>
+ <configuration>
+ <target>
+ <script language="javascript"><![CDATA[
+ var baseDirs = [
+ project.getProperty("test.build.data"),
+ project.getProperty("test.build.dir"),
+ project.getProperty("hadoop.tmp.dir")
+ ];
+ for (var i in baseDirs) {
+ for (var j = 1; j <= ${testsThreadCount}; ++j) {
+ var mkdir = project.createTask("mkdir");
+ mkdir.setDir(new java.io.File(baseDirs[i], j));
+ mkdir.perform();
+ }
+ }
+ ]]></script>
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <forkCount>1</forkCount>
+ <forkCount>${testsThreadCount}</forkCount>
+ <reuseForks>false</reuseForks>
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <systemPropertyVariables>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+ <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/azure/Test*.java</include>
+ <include>**/azure/**/Test*.java</include>
+ </includes>
+ <excludes>
+ <exclude>**/azure/**/TestRollingWindowAverage*.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>serialized-test-wasb</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <forkCount>1</forkCount>
+ <reuseForks>false</reuseForks>
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <systemPropertyVariables>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+ <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/azure/**/TestRollingWindowAverage*.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-integration-test-wasb</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <forkCount>1</forkCount>
+ <forkCount>${testsThreadCount}</forkCount>
+ <reuseForks>false</reuseForks>
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <trimStackTrace>false</trimStackTrace>
+ <systemPropertyVariables>
+ <!-- Tell tests that they are being executed in parallel -->
+ <test.parallel.execution>true</test.parallel.execution>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+
+ <!-- Due to a Maven quirk, setting this to just -->
+ <!-- surefire.forkNumber won't do the parameter -->
+ <!-- substitution. Putting a prefix in front of it like -->
+ <!-- "fork-" makes it work. -->
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <!-- Propagate scale parameters -->
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+ <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+ </systemPropertyVariables>
+ <!-- Some tests cannot run in parallel-->
+ <includes>
+ <include>**/azure/ITest*.java</include>
+ <include>**/azure/**/ITest*.java</include>
+ </includes>
+ <excludes>
+ <exclude>**/azure/ITestNativeFileSystemStatistics.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <!-- Do a sequential run for tests that cannot handle -->
+ <!-- parallel execution. -->
+ <execution>
+ <id>sequential-integration-tests-wasb</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <trimStackTrace>false</trimStackTrace>
+ <systemPropertyVariables>
+ <test.parallel.execution>false</test.parallel.execution>
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+ <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/azure/ITestNativeFileSystemStatistics.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
+ <id>parallel-tests-abfs</id>
+ <activation>
+ <property>
+ <name>parallel-tests-abfs</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>create-parallel-tests-dirs</id>
+ <phase>test-compile</phase>
+ <configuration>
+ <target>
+ <script language="javascript"><![CDATA[
+ var baseDirs = [
+ project.getProperty("test.build.data"),
+ project.getProperty("test.build.dir"),
+ project.getProperty("hadoop.tmp.dir")
+ ];
+ for (var i in baseDirs) {
+ for (var j = 1; j <= ${testsThreadCount}; ++j) {
+ var mkdir = project.createTask("mkdir");
+ mkdir.setDir(new java.io.File(baseDirs[i], j));
+ mkdir.perform();
+ }
+ }
+ ]]></script>
+ </target>
+ </configuration>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>default-test</id>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <forkCount>${testsThreadCount}</forkCount>
+ <reuseForks>false</reuseForks>
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <systemPropertyVariables>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.huge.filesize>${fs.azure.scale.test.huge.filesize}</fs.azure.scale.test.huge.filesize>
+ <fs.azure.scale.test.huge.huge.partitionsize>${fs.azure.scale.test.huge.partitionsize}</fs.azure.scale.test.huge.huge.partitionsize>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ <fs.azure.scale.test.list.performance.threads>${fs.azure.scale.test.list.performance.threads}</fs.azure.scale.test.list.performance.threads>
+ <fs.azure.scale.test.list.performance.files>${fs.azure.scale.test.list.performance.files}</fs.azure.scale.test.list.performance.files>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/azurebfs/Test*.java</include>
+ <include>**/azurebfs/**/Test*.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>integration-test-abfs-parallel-classesAndMethods</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <forkCount>${testsThreadCount}</forkCount>
+ <reuseForks>true</reuseForks>
+ <parallel>both</parallel>
+ <threadCount>${testsThreadCount}</threadCount>
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <trimStackTrace>false</trimStackTrace>
+ <systemPropertyVariables>
+ <!-- Tell tests that they are being executed in parallel -->
+ <test.parallel.execution>true</test.parallel.execution>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+ <!-- Due to a Maven quirk, setting this to just -->
+ <!-- surefire.forkNumber won't do the parameter -->
+ <!-- substitution. Putting a prefix in front of it like -->
+ <!-- "fork-" makes it work. -->
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <!-- Propagate scale parameters -->
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ </systemPropertyVariables>
+
+ <includes>
+ <include>**/azurebfs/ITest*.java</include>
+ <include>**/azurebfs/**/ITest*.java</include>
+ </includes>
+ <excludes>
+ <exclude>**/azurebfs/contract/ITest*.java</exclude>
+ <exclude>**/azurebfs/ITestAzureBlobFileSystemE2EScale.java</exclude>
+ <exclude>**/azurebfs/ITestAbfsReadWriteAndSeek.java</exclude>
+ <exclude>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</exclude>
+ </excludes>
+
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-test-abfs-parallel-classes</id>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ <configuration>
+ <forkCount>${testsThreadCount}</forkCount>
+ <reuseForks>false</reuseForks>
+ <!--NOTICE: hadoop contract tests methods can not be ran in parallel-->
+ <argLine>${maven-surefire-plugin.argLine} -DminiClusterDedicatedDirs=true</argLine>
+ <forkedProcessTimeoutInSeconds>${fs.azure.scale.test.timeout}</forkedProcessTimeoutInSeconds>
+ <trimStackTrace>false</trimStackTrace>
+ <systemPropertyVariables>
+ <!-- Tell tests that they are being executed in parallel -->
+ <test.parallel.execution>true</test.parallel.execution>
+ <test.build.data>${test.build.data}/${surefire.forkNumber}</test.build.data>
+ <test.build.dir>${test.build.dir}/${surefire.forkNumber}</test.build.dir>
+ <hadoop.tmp.dir>${hadoop.tmp.dir}/${surefire.forkNumber}</hadoop.tmp.dir>
+
+ <!-- Due to a Maven quirk, setting this to just -->
+ <!-- surefire.forkNumber won't do the parameter -->
+ <!-- substitution. Putting a prefix in front of it like -->
+ <!-- "fork-" makes it work. -->
+ <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+ <!-- Propagate scale parameters -->
+ <fs.azure.scale.test.enabled>${fs.azure.scale.test.enabled}</fs.azure.scale.test.enabled>
+ <fs.azure.scale.test.timeout>${fs.azure.scale.test.timeout}</fs.azure.scale.test.timeout>
+ </systemPropertyVariables>
+ <includes>
+ <include>**/azurebfs/contract/ITest*.java</include>
+ <include>**/azurebfs/ITestAzureBlobFileSystemE2EScale.java</include>
+ <include>**/azurebfs/ITestAbfsReadWriteAndSeek.java</include>
+ <include>**/azurebfs/ITestAzureBlobFileSystemListStatus.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
+ <profile>
<id>parallel-tests</id>
<activation>
<property>
@@ -417,6 +762,7 @@
<exclude>**/ITestWasbRemoteCallHelper.java</exclude>
<exclude>**/ITestBlockBlobInputStream.java</exclude>
<exclude>**/ITestWasbAbfsCompatibility.java</exclude>
+ <exclude>**/ITestNativeFileSystemStatistics.java</exclude>
</excludes>
</configuration>
</execution>
@@ -452,6 +798,7 @@
<include>**/ITestAzureBlobFileSystemRandomRead.java</include>
<include>**/ITestWasbRemoteCallHelper.java</include>
<include>**/ITestBlockBlobInputStream.java</include>
+ <include>**/ITestNativeFileSystemStatistics.java</include>
</includes>
</configuration>
</execution>
@@ -460,11 +807,12 @@
</plugins>
</build>
</profile>
+
<profile>
<id>sequential-tests</id>
<activation>
<property>
- <name>!parallel-tests</name>
+ <name>sequential-tests</name>
</property>
</activation>
<build>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index 4bde9d8..b809192 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -107,7 +107,11 @@ public class AzureBlobFileSystem extends FileSystem {
if (abfsStore.getAbfsConfiguration().getCreateRemoteFileSystemDuringInitialization()) {
if (!this.fileSystemExists()) {
- this.createFileSystem();
+ try {
+ this.createFileSystem();
+ } catch (AzureBlobFileSystemException ex) {
+ checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS);
+ }
}
}
@@ -121,7 +125,7 @@ public class AzureBlobFileSystem extends FileSystem {
if (UserGroupInformation.isSecurityEnabled()) {
this.delegationTokenEnabled = abfsStore.getAbfsConfiguration().isDelegationTokenManagerEnabled();
- if(this.delegationTokenEnabled) {
+ if (this.delegationTokenEnabled) {
LOG.debug("Initializing DelegationTokenManager for {}", uri);
this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index 92e081e..7e43090 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.FSExceptionMessages;
@@ -369,4 +370,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
this.length = length;
}
}
+
+ @VisibleForTesting
+ public synchronized void waitForPendingUploads() throws IOException {
+ waitForTaskToComplete();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
new file mode 100644
index 0000000..cbb09dd
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/ITestNativeFileSystemStatistics.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azure;
+
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assume.assumeNotNull;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.cleanupTestAccount;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFromFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+/**
+ * Because FileSystem.Statistics is per FileSystem, so statistics can not be ran in
+ * parallel, hence in this test file, force them to run in sequential.
+ * */
+public class ITestNativeFileSystemStatistics extends AbstractWasbTestWithTimeout{
+
+ @Test
+ public void test_001_NativeAzureFileSystemMocked() throws Exception {
+ AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount.createMock();
+ assumeNotNull(testAccount);
+ testStatisticsWithAccount(testAccount);
+ }
+
+ @Test
+ public void test_002_NativeAzureFileSystemPageBlobLive() throws Exception {
+ Configuration conf = new Configuration();
+ // Configure the page blob directories key so every file created is a page blob.
+ conf.set(AzureNativeFileSystemStore.KEY_PAGE_BLOB_DIRECTORIES, "/");
+
+ // Configure the atomic rename directories key so every folder will have
+ // atomic rename applied.
+ conf.set(AzureNativeFileSystemStore.KEY_ATOMIC_RENAME_DIRECTORIES, "/");
+ AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount.create(conf);
+ assumeNotNull(testAccount);
+ testStatisticsWithAccount(testAccount);
+ }
+
+ @Test
+ public void test_003_NativeAzureFileSystem() throws Exception {
+ AzureBlobStorageTestAccount testAccount = AzureBlobStorageTestAccount.create();
+ assumeNotNull(testAccount);
+ testStatisticsWithAccount(testAccount);
+ }
+
+ private void testStatisticsWithAccount(AzureBlobStorageTestAccount testAccount) throws Exception {
+ assumeNotNull(testAccount);
+ NativeAzureFileSystem fs = testAccount.getFileSystem();
+ testStatistics(fs);
+ cleanupTestAccount(testAccount);
+ }
+
+ /**
+ * When tests are ran in parallel, this tests will fail because
+ * FileSystem.Statistics is per FileSystem class.
+ */
+ @SuppressWarnings("deprecation")
+ private void testStatistics(NativeAzureFileSystem fs) throws Exception {
+ FileSystem.clearStatistics();
+ FileSystem.Statistics stats = FileSystem.getStatistics("wasb",
+ NativeAzureFileSystem.class);
+ assertEquals(0, stats.getBytesRead());
+ assertEquals(0, stats.getBytesWritten());
+ Path newFile = new Path("testStats");
+ writeStringToFile(fs, newFile, "12345678");
+ assertEquals(8, stats.getBytesWritten());
+ assertEquals(0, stats.getBytesRead());
+ String readBack = readStringFromFile(fs, newFile);
+ assertEquals("12345678", readBack);
+ assertEquals(8, stats.getBytesRead());
+ assertEquals(8, stats.getBytesWritten());
+ assertTrue(fs.delete(newFile, true));
+ assertEquals(8, stats.getBytesRead());
+ assertEquals(8, stats.getBytesWritten());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
index 726b504..19d370e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/NativeAzureFileSystemBaseTest.java
@@ -18,14 +18,10 @@
package org.apache.hadoop.fs.azure;
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.InputStreamReader;
import java.io.OutputStream;
-import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
@@ -51,6 +47,9 @@ import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.readStringFromFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToFile;
+import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.writeStringToStream;
import static org.apache.hadoop.test.GenericTestUtils.*;
/*
@@ -329,12 +328,12 @@ public abstract class NativeAzureFileSystemBaseTest
FileSystem localFs = FileSystem.get(new Configuration());
localFs.delete(localFilePath, true);
try {
- writeString(localFs, localFilePath, "Testing");
+ writeStringToFile(localFs, localFilePath, "Testing");
Path dstPath = methodPath();
assertTrue(FileUtil.copy(localFs, localFilePath, fs, dstPath, false,
fs.getConf()));
assertPathExists("coied from local", dstPath);
- assertEquals("Testing", readString(fs, dstPath));
+ assertEquals("Testing", readStringFromFile(fs, dstPath));
fs.delete(dstPath, true);
} finally {
localFs.delete(localFilePath, true);
@@ -364,26 +363,6 @@ public abstract class NativeAzureFileSystemBaseTest
}
@Test
- public void testStatistics() throws Exception {
- FileSystem.clearStatistics();
- FileSystem.Statistics stats = FileSystem.getStatistics("wasb",
- NativeAzureFileSystem.class);
- assertEquals(0, stats.getBytesRead());
- assertEquals(0, stats.getBytesWritten());
- Path newFile = new Path("testStats");
- writeString(newFile, "12345678");
- assertEquals(8, stats.getBytesWritten());
- assertEquals(0, stats.getBytesRead());
- String readBack = readString(newFile);
- assertEquals("12345678", readBack);
- assertEquals(8, stats.getBytesRead());
- assertEquals(8, stats.getBytesWritten());
- assertTrue(fs.delete(newFile, true));
- assertEquals(8, stats.getBytesRead());
- assertEquals(8, stats.getBytesWritten());
- }
-
- @Test
public void testUriEncoding() throws Exception {
fs.create(new Path("p/t%5Fe")).close();
FileStatus[] listing = fs.listStatus(new Path("p"));
@@ -767,7 +746,7 @@ public abstract class NativeAzureFileSystemBaseTest
Path renamePendingFile = new Path(renamePendingStr);
FSDataOutputStream out = fs.create(renamePendingFile, true);
assertTrue(out != null);
- writeString(out, renameDescription);
+ writeStringToStream(out, renameDescription);
// Redo the rename operation based on the contents of the -RenamePending.json file.
// Trigger the redo by checking for existence of the original folder. It must appear
@@ -831,7 +810,7 @@ public abstract class NativeAzureFileSystemBaseTest
Path renamePendingFile = new Path(renamePendingStr);
FSDataOutputStream out = fs.create(renamePendingFile, true);
assertTrue(out != null);
- writeString(out, pending.makeRenamePendingFileContents());
+ writeStringToStream(out, pending.makeRenamePendingFileContents());
// Redo the rename operation based on the contents of the
// -RenamePending.json file. Trigger the redo by checking for existence of
@@ -886,7 +865,7 @@ public abstract class NativeAzureFileSystemBaseTest
Path renamePendingFile = new Path(renamePendingStr);
FSDataOutputStream out = fs.create(renamePendingFile, true);
assertTrue(out != null);
- writeString(out, pending.makeRenamePendingFileContents());
+ writeStringToStream(out, pending.makeRenamePendingFileContents());
// Rename inner folder to simulate the scenario where rename has started and
// only one directory has been renamed but not the files under it
@@ -1000,7 +979,7 @@ public abstract class NativeAzureFileSystemBaseTest
Path renamePendingFile = new Path(renamePendingStr);
FSDataOutputStream out = fs.create(renamePendingFile, true);
assertTrue(out != null);
- writeString(out, pending.makeRenamePendingFileContents());
+ writeStringToStream(out, pending.makeRenamePendingFileContents());
try {
pending.redo();
@@ -1228,7 +1207,7 @@ public abstract class NativeAzureFileSystemBaseTest
Path renamePendingFile = new Path(renamePendingStr);
FSDataOutputStream out = fs.create(renamePendingFile, true);
assertTrue(out != null);
- writeString(out, renameDescription);
+ writeStringToStream(out, renameDescription);
}
// set whether a child is present or not
@@ -1488,7 +1467,7 @@ public abstract class NativeAzureFileSystemBaseTest
Calendar utc = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
long currentUtcTime = utc.getTime().getTime();
FileStatus fileStatus = fs.getFileStatus(testPath);
- final long errorMargin = 10 * 1000; // Give it +/-10 seconds
+ final long errorMargin = 60 * 1000; // Give it +/-60 seconds
assertTrue("Modification time " +
new Date(fileStatus.getModificationTime()) + " is not close to now: " +
utc.getTime(),
@@ -1504,45 +1483,12 @@ public abstract class NativeAzureFileSystemBaseTest
}
private String readString(Path testFile) throws IOException {
- return readString(fs, testFile);
+ return readStringFromFile(fs, testFile);
}
- private String readString(FileSystem fs, Path testFile) throws IOException {
- FSDataInputStream inputStream = fs.open(testFile);
- String ret = readString(inputStream);
- inputStream.close();
- return ret;
- }
-
- private String readString(FSDataInputStream inputStream) throws IOException {
- BufferedReader reader = new BufferedReader(new InputStreamReader(
- inputStream));
- final int BUFFER_SIZE = 1024;
- char[] buffer = new char[BUFFER_SIZE];
- int count = reader.read(buffer, 0, BUFFER_SIZE);
- if (count > BUFFER_SIZE) {
- throw new IOException("Exceeded buffer size");
- }
- inputStream.close();
- return new String(buffer, 0, count);
- }
private void writeString(Path path, String value) throws IOException {
- writeString(fs, path, value);
- }
-
- private void writeString(FileSystem fs, Path path, String value)
- throws IOException {
- FSDataOutputStream outputStream = fs.create(path, true);
- writeString(outputStream, value);
- }
-
- private void writeString(FSDataOutputStream outputStream, String value)
- throws IOException {
- BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
- outputStream));
- writer.write(value);
- writer.close();
+ writeStringToFile(fs, path, value);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
index b438c8e..c46320a 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/integration/AzureTestUtils.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.fs.azure.integration;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
import java.net.URI;
import java.util.List;
@@ -30,12 +34,15 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
+import static org.junit.Assume.assumeTrue;
import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_ACCOUNT_NAME_DOMAIN_SUFFIX_REGEX;
import static org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount.WASB_TEST_ACCOUNT_NAME_WITH_DOMAIN;
@@ -43,7 +50,6 @@ import static org.apache.hadoop.fs.azure.integration.AzureTestConstants.*;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assume.assumeTrue;
/**
* Utilities for the Azure tests. Based on {@code S3ATestUtils}, so
@@ -494,4 +500,49 @@ public final class AzureTestUtils extends Assert {
return accountName;
}
+ /**
+ * Write string into a file.
+ */
+ public static void writeStringToFile(FileSystem fs, Path path, String value)
+ throws IOException {
+ FSDataOutputStream outputStream = fs.create(path, true);
+ writeStringToStream(outputStream, value);
+ }
+
+ /**
+ * Write string into a file.
+ */
+ public static void writeStringToStream(FSDataOutputStream outputStream, String value)
+ throws IOException {
+ BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ outputStream));
+ writer.write(value);
+ writer.close();
+ }
+
+ /**
+ * Read string from a file.
+ */
+ public static String readStringFromFile(FileSystem fs, Path testFile) throws IOException {
+ FSDataInputStream inputStream = fs.open(testFile);
+ String ret = readStringFromStream(inputStream);
+ inputStream.close();
+ return ret;
+ }
+
+ /**
+ * Read string from stream.
+ */
+ public static String readStringFromStream(FSDataInputStream inputStream) throws IOException {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(
+ inputStream));
+ final int BUFFER_SIZE = 1024;
+ char[] buffer = new char[BUFFER_SIZE];
+ int count = reader.read(buffer, 0, BUFFER_SIZE);
+ if (count > BUFFER_SIZE) {
+ throw new IOException("Exceeded buffer size");
+ }
+ inputStream.close();
+ return new String(buffer, 0, count);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
index 522b635..7ed9d42 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2EScale.java
@@ -44,7 +44,6 @@ public class ITestAzureBlobFileSystemE2EScale extends
private static final int BASE_SIZE = 1024;
private static final int ONE_MB = 1024 * 1024;
private static final int DEFAULT_WRITE_TIMES = 100;
- private static final Path TEST_FILE = new Path("ITestAzureBlobFileSystemE2EScale");
public ITestAzureBlobFileSystemE2EScale() {
}
@@ -52,7 +51,8 @@ public class ITestAzureBlobFileSystemE2EScale extends
@Test
public void testWriteHeavyBytesToFileAcrossThreads() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
- final FSDataOutputStream stream = fs.create(TEST_FILE);
+ final Path testFile = path(methodName.getMethodName());
+ final FSDataOutputStream stream = fs.create(testFile);
ExecutorService es = Executors.newFixedThreadPool(TEN);
int testWriteBufferSize = 2 * TEN * ONE_THOUSAND * BASE_SIZE;
@@ -81,7 +81,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
stream.close();
es.shutdownNow();
- FileStatus fileStatus = fs.getFileStatus(TEST_FILE);
+ FileStatus fileStatus = fs.getFileStatus(testFile);
assertEquals(testWriteBufferSize * operationCount, fileStatus.getLen());
}
@@ -89,9 +89,10 @@ public class ITestAzureBlobFileSystemE2EScale extends
public void testReadWriteHeavyBytesToFileWithStatistics() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final FileSystem.Statistics abfsStatistics;
+ final Path testFile = path(methodName.getMethodName());
int testBufferSize;
final byte[] sourceData;
- try (FSDataOutputStream stream = fs.create(TEST_FILE)) {
+ try (FSDataOutputStream stream = fs.create(testFile)) {
abfsStatistics = fs.getFsStatistics();
abfsStatistics.reset();
@@ -103,7 +104,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
final byte[] remoteData = new byte[testBufferSize];
int bytesRead;
- try (FSDataInputStream inputStream = fs.open(TEST_FILE, 4 * ONE_MB)) {
+ try (FSDataInputStream inputStream = fs.open(testFile, 4 * ONE_MB)) {
bytesRead = inputStream.read(remoteData);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
index 88f77b0..dba10f5 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.junit.Ignore;
import org.junit.Test;
import org.apache.hadoop.fs.FileStatus;
@@ -53,6 +54,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
assertEquals("root listing", 0, rootls.length);
}
+ @Ignore("When running against live abfs with Oauth account, this test will fail. Need to check the tenant.")
@Test
public void testFileStatusPermissionsAndOwnerAndGroup() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
@@ -86,6 +88,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
return fileStatus;
}
+ @Ignore("When running against live abfs with Oauth account, this test will fail. Need to check the tenant.")
@Test
public void testFolderStatusPermissionsAndOwnerAndGroup() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 7c6bbb5..337f95c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -18,20 +18,19 @@
package org.apache.hadoop.fs.azurebfs;
+import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
-import java.util.EnumSet;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.io.IOException;
-import com.microsoft.azure.storage.blob.BlockEntry;
-import com.microsoft.azure.storage.blob.BlockListingFilter;
-import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsNot;
import org.junit.Assume;
@@ -43,11 +42,12 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
/**
* Test flush operation.
+ * This class cannot be run in parallel test mode--check comments in
+ * testWriteHeavyBytesToFileSyncFlush().
*/
public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
private static final int BASE_SIZE = 1024;
@@ -55,11 +55,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
private static final int TEST_BUFFER_SIZE = 5 * ONE_THOUSAND * BASE_SIZE;
private static final int ONE_MB = 1024 * 1024;
private static final int FLUSH_TIMES = 200;
- private static final int THREAD_SLEEP_TIME = 6000;
+ private static final int THREAD_SLEEP_TIME = 1000;
- private static final Path TEST_FILE_PATH = new Path("/testfile");
private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
- private static final int WAITING_TIME = 4000;
+ private static final int WAITING_TIME = 1000;
public ITestAzureBlobFileSystemFlush() {
super();
@@ -68,8 +67,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
@Test
public void testAbfsOutputStreamAsyncFlushWithRetainUncommittedData() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
+ final Path testFilePath = path(methodName.getMethodName());
final byte[] b;
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b);
@@ -84,7 +84,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
}
final byte[] r = new byte[TEST_BUFFER_SIZE];
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+ try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
while (inputStream.available() != 0) {
int result = inputStream.read(r);
@@ -97,8 +97,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
@Test
public void testAbfsOutputStreamSyncFlush() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
+ final Path testFilePath = path(methodName.getMethodName());
+
final byte[] b;
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b);
stream.write(b);
@@ -111,7 +113,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
}
final byte[] r = new byte[TEST_BUFFER_SIZE];
- try (FSDataInputStream inputStream = fs.open(TEST_FILE_PATH, 4 * ONE_MB)) {
+ try (FSDataInputStream inputStream = fs.open(testFilePath, 4 * ONE_MB)) {
int result = inputStream.read(r);
assertNotEquals(-1, result);
@@ -123,12 +125,9 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
@Test
public void testWriteHeavyBytesToFileSyncFlush() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
- final FileSystem.Statistics abfsStatistics;
+ final Path testFilePath = path(methodName.getMethodName());
ExecutorService es;
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
- abfsStatistics = fs.getFsStatistics();
- abfsStatistics.reset();
-
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
es = Executors.newFixedThreadPool(10);
final byte[] b = new byte[TEST_BUFFER_SIZE];
@@ -163,18 +162,18 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
}
es.shutdownNow();
- FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ FileStatus fileStatus = fs.getFileStatus(testFilePath);
long expectedWrites = (long) TEST_BUFFER_SIZE * FLUSH_TIMES;
- assertEquals("Wrong file length in " + fileStatus, expectedWrites, fileStatus.getLen());
- assertEquals("wrong bytes Written count in " + abfsStatistics,
- expectedWrites, abfsStatistics.getBytesWritten());
+ assertEquals("Wrong file length in " + testFilePath, expectedWrites, fileStatus.getLen());
}
@Test
public void testWriteHeavyBytesToFileAsyncFlush() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
ExecutorService es = Executors.newFixedThreadPool(10);
- try (FSDataOutputStream stream = fs.create(TEST_FILE_PATH)) {
+
+ final Path testFilePath = path(methodName.getMethodName());
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
final byte[] b = new byte[TEST_BUFFER_SIZE];
new Random().nextBytes(b);
@@ -207,54 +206,50 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
}
es.shutdownNow();
- FileStatus fileStatus = fs.getFileStatus(TEST_FILE_PATH);
+ FileStatus fileStatus = fs.getFileStatus(testFilePath);
assertEquals((long) TEST_BUFFER_SIZE * FLUSH_TIMES, fileStatus.getLen());
}
@Test
public void testFlushWithFlushEnabled() throws Exception {
- Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
-
- AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
- String wasbUrl = testAccount.getFileSystem().getName();
- String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
- final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
- // test only valid for non-namespace enabled account
- Assume.assumeFalse(fs.getIsNamespaceEnabeld());
-
- byte[] buffer = getRandomBytesArray();
- CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
- // Wait for write request to be executed
- Thread.sleep(WAITING_TIME);
- stream.flush();
- ArrayList<BlockEntry> blockList = blob.downloadBlockList(
- BlockListingFilter.COMMITTED, null, null, null);
- // verify block has been committed
- assertEquals(1, blockList.size());
- }
+ testFlush(true);
}
@Test
public void testFlushWithFlushDisabled() throws Exception {
+ testFlush(false);
+ }
+
+ private void testFlush(boolean flushEnabled) throws Exception {
Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
- AzureBlobStorageTestAccount testAccount = createWasbTestAccount();
- String wasbUrl = testAccount.getFileSystem().getName();
- String abfsUrl = wasbUrlToAbfsUrl(wasbUrl);
- final AzureBlobFileSystem fs = this.getFileSystem(abfsUrl);
- // test only valid for non-namespace enabled account
- Assume.assumeFalse(fs.getIsNamespaceEnabeld());
+ final AzureBlobFileSystem fs = (AzureBlobFileSystem) getFileSystem();
+
+ // Simulate setting "fs.azure.enable.flush" to true or false
+ fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(flushEnabled);
+
+ final Path testFilePath = path(methodName.getMethodName());
byte[] buffer = getRandomBytesArray();
- CloudBlockBlob blob = testAccount.getBlobReference(TEST_FILE_PATH.toString().substring(1));
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
- // Wait for write request to be executed
- Thread.sleep(WAITING_TIME);
+
+ // The test case must write "fs.azure.write.request.size" bytes
+ // to the stream in order for the data to be uploaded to storage.
+ assertEquals(
+ fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize(),
+ buffer.length);
+
+ try (FSDataOutputStream stream = fs.create(testFilePath)) {
+ stream.write(buffer);
+
+ // Write asynchronously uploads data, so we must wait for completion
+ AbfsOutputStream abfsStream = (AbfsOutputStream) stream.getWrappedStream();
+ abfsStream.waitForPendingUploads();
+
+ // Flush commits the data so it can be read.
stream.flush();
- ArrayList<BlockEntry> blockList = blob.downloadBlockList(
- BlockListingFilter.COMMITTED, null, null, null);
- // verify block has not been committed
- assertEquals(0, blockList.size());
+
+ // Verify that the data can be read if flushEnabled is true; and otherwise
+ // cannot be read.
+ validate(fs.open(testFilePath), buffer, flushEnabled);
}
}
@@ -262,9 +257,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testHflushWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+ String fileName = UUID.randomUUID().toString();
+ final Path testFilePath = path(fileName);
+
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
stream.hflush();
- validate(fs, TEST_FILE_PATH, buffer, true);
+ validate(fs, testFilePath, buffer, true);
}
}
@@ -272,9 +270,11 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testHflushWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+ final Path testFilePath = path(methodName.getMethodName());
+
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
stream.hflush();
- validate(fs, TEST_FILE_PATH, buffer, false);
+ validate(fs, testFilePath, buffer, false);
}
}
@@ -282,9 +282,12 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testHsyncWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+
+ final Path testFilePath = path(methodName.getMethodName());
+
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
stream.hsync();
- validate(fs, TEST_FILE_PATH, buffer, true);
+ validate(fs, testFilePath, buffer, true);
}
}
@@ -292,7 +295,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+
+ final Path testFilePath = path(methodName.getMethodName());
+
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
assertFalse(stream.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
@@ -305,7 +311,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, true)) {
+ final Path testFilePath = path(methodName.getMethodName());
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
assertTrue(stream.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(stream.hasCapability(StreamCapabilities.HSYNC));
assertFalse(stream.hasCapability(StreamCapabilities.DROPBEHIND));
@@ -318,9 +325,10 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
byte[] buffer = getRandomBytesArray();
- try (FSDataOutputStream stream = getStreamAfterWrite(fs, TEST_FILE_PATH, buffer, false)) {
+ final Path testFilePath = path(methodName.getMethodName());
+ try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, false)) {
stream.hsync();
- validate(fs, TEST_FILE_PATH, buffer, false);
+ validate(fs, testFilePath, buffer, false);
}
}
@@ -337,11 +345,28 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
return stream;
}
- private AzureBlobStorageTestAccount createWasbTestAccount() throws Exception {
- return AzureBlobStorageTestAccount.create("", EnumSet.of(AzureBlobStorageTestAccount.CreateOptions.CreateContainer),
- this.getConfiguration());
- }
+ private void validate(InputStream stream, byte[] writeBuffer, boolean isEqual)
+ throws IOException {
+ try {
+ byte[] readBuffer = new byte[writeBuffer.length];
+ int numBytesRead = stream.read(readBuffer, 0, readBuffer.length);
+
+ if (isEqual) {
+ assertArrayEquals(
+ "Bytes read do not match bytes written.",
+ writeBuffer,
+ readBuffer);
+ } else {
+ assertThat(
+ "Bytes read unexpectedly match bytes written.",
+ readBuffer,
+ IsNot.not(IsEqual.equalTo(writeBuffer)));
+ }
+ } finally {
+ stream.close();
+ }
+ }
private void validate(FileSystem fs, Path path, byte[] writeBuffer, boolean isEqual) throws IOException {
String filePath = path.toUri().toString();
try (FSDataInputStream inputStream = fs.open(path)) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba76bda/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
index c4bfee2..33a5805 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java
@@ -98,7 +98,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest {
NativeAzureFileSystem wasb = getWasbFileSystem();
for (int i = 0; i< 4; i++) {
- Path path = new Path("/testfiles/~12/!008/testfile" + i);
+ Path path = new Path("/testReadFile/~12/!008/testfile" + i);
final FileSystem createFs = createFileWithAbfs[i] ? abfs : wasb;
// Write
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org