Posted to common-commits@hadoop.apache.org by in...@apache.org on 2016/10/03 22:58:41 UTC
[28/57] [abbrv] hadoop git commit: Revert "HADOOP-13584. hadoop-aliyun: merge HADOOP-12756 branch back"

This reverts commit 5707f88d8550346f167e45c2f8c4161eb3957e3a.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1443988
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1443988
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1443988
Branch: refs/heads/HDFS-10467
Commit: d1443988f809fe6656f60dfed4ee4e0f4844ee5c
Parents: 9a44a83
Author: Kai Zheng <ka...@intel.com>
Authored: Thu Sep 29 09:18:27 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Thu Sep 29 09:18:27 2016 +0800
----------------------------------------------------------------------
.gitignore | 2 -
hadoop-project/pom.xml | 22 -
.../dev-support/findbugs-exclude.xml | 18 -
hadoop-tools/hadoop-aliyun/pom.xml | 154 ------
.../aliyun/oss/AliyunCredentialsProvider.java | 87 ---
.../fs/aliyun/oss/AliyunOSSFileSystem.java | 543 -------------------
.../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 516 ------------------
.../fs/aliyun/oss/AliyunOSSInputStream.java | 260 ---------
.../fs/aliyun/oss/AliyunOSSOutputStream.java | 111 ----
.../hadoop/fs/aliyun/oss/AliyunOSSUtils.java | 167 ------
.../apache/hadoop/fs/aliyun/oss/Constants.java | 113 ----
.../hadoop/fs/aliyun/oss/package-info.java | 22 -
.../site/markdown/tools/hadoop-aliyun/index.md | 294 ----------
.../fs/aliyun/oss/AliyunOSSTestUtils.java | 77 ---
.../fs/aliyun/oss/TestAliyunCredentials.java | 78 ---
.../oss/TestAliyunOSSFileSystemContract.java | 239 --------
.../oss/TestAliyunOSSFileSystemStore.java | 125 -----
.../fs/aliyun/oss/TestAliyunOSSInputStream.java | 145 -----
.../aliyun/oss/TestAliyunOSSOutputStream.java | 91 ----
.../aliyun/oss/contract/AliyunOSSContract.java | 49 --
.../contract/TestAliyunOSSContractCreate.java | 35 --
.../contract/TestAliyunOSSContractDelete.java | 34 --
.../contract/TestAliyunOSSContractDistCp.java | 44 --
.../TestAliyunOSSContractGetFileStatus.java | 35 --
.../contract/TestAliyunOSSContractMkdir.java | 34 --
.../oss/contract/TestAliyunOSSContractOpen.java | 34 --
.../contract/TestAliyunOSSContractRename.java | 35 --
.../contract/TestAliyunOSSContractRootDir.java | 69 ---
.../oss/contract/TestAliyunOSSContractSeek.java | 34 --
.../src/test/resources/contract/aliyun-oss.xml | 115 ----
.../src/test/resources/core-site.xml | 46 --
.../src/test/resources/log4j.properties | 23 -
hadoop-tools/hadoop-tools-dist/pom.xml | 6 -
hadoop-tools/pom.xml | 1 -
34 files changed, 3658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 194862b..a5d69d0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -31,5 +31,3 @@ hadoop-tools/hadoop-aws/src/test/resources/auth-keys.xml
hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml
hadoop-tools/hadoop-azure/src/test/resources/azure-auth-keys.xml
patchprocess/
-hadoop-tools/hadoop-aliyun/src/test/resources/auth-keys.xml
-hadoop-tools/hadoop-aliyun/src/test/resources/contract-test-options.xml
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 49ea40f..d9a01a0 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -439,12 +439,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-aliyun</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-kms</artifactId>
<version>${project.version}</version>
<classifier>classes</classifier>
@@ -1011,22 +1005,6 @@
<version>4.2.0</version>
</dependency>
- <dependency>
- <groupId>com.aliyun.oss</groupId>
- <artifactId>aliyun-sdk-oss</artifactId>
- <version>2.2.1</version>
- <exclusions>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>xerces</groupId>
<artifactId>xercesImpl</artifactId>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
deleted file mode 100644
index 40d78d0..0000000
--- a/hadoop-tools/hadoop-aliyun/dev-support/findbugs-exclude.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<FindBugsFilter>
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/pom.xml b/hadoop-tools/hadoop-aliyun/pom.xml
deleted file mode 100644
index 358b18b..0000000
--- a/hadoop-tools/hadoop-aliyun/pom.xml
+++ /dev/null
@@ -1,154 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-project</artifactId>
- <version>3.0.0-alpha2-SNAPSHOT</version>
- <relativePath>../../hadoop-project</relativePath>
- </parent>
- <artifactId>hadoop-aliyun</artifactId>
- <name>Apache Hadoop Aliyun OSS support</name>
- <packaging>jar</packaging>
-
- <properties>
- <file.encoding>UTF-8</file.encoding>
- <downloadSources>true</downloadSources>
- </properties>
-
- <profiles>
- <profile>
- <id>tests-off</id>
- <activation>
- <file>
- <missing>src/test/resources/auth-keys.xml</missing>
- </file>
- </activation>
- <properties>
- <maven.test.skip>true</maven.test.skip>
- </properties>
- </profile>
- <profile>
- <id>tests-on</id>
- <activation>
- <file>
- <exists>src/test/resources/auth-keys.xml</exists>
- </file>
- </activation>
- <properties>
- <maven.test.skip>false</maven.test.skip>
- </properties>
- </profile>
- </profiles>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <findbugsXmlOutput>true</findbugsXmlOutput>
- <xmlOutput>true</xmlOutput>
- <excludeFilterFile>${basedir}/dev-support/findbugs-exclude.xml
- </excludeFilterFile>
- <effort>Max</effort>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
- <configuration>
- <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
- <dependencyLocationsEnabled>false</dependencyLocationsEnabled>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <forkedProcessTimeoutInSeconds>3600</forkedProcessTimeoutInSeconds>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>deplist</id>
- <phase>compile</phase>
- <goals>
- <goal>list</goal>
- </goals>
- <configuration>
- <!-- build a shellprofile -->
- <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.aliyun.oss</groupId>
- <artifactId>aliyun-sdk-oss</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>compile</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-distcp</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-distcp</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-server-tests</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
deleted file mode 100644
index b46c67a..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunCredentialsProvider.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import com.aliyun.oss.common.auth.Credentials;
-import com.aliyun.oss.common.auth.CredentialsProvider;
-import com.aliyun.oss.common.auth.DefaultCredentials;
-import com.aliyun.oss.common.auth.InvalidCredentialsException;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * Support session credentials for authenticating with Aliyun.
- */
-public class AliyunCredentialsProvider implements CredentialsProvider {
- private Credentials credentials = null;
-
- public AliyunCredentialsProvider(Configuration conf)
- throws IOException {
- String accessKeyId;
- String accessKeySecret;
- String securityToken;
- try {
- accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
- accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
- } catch (IOException e) {
- throw new InvalidCredentialsException(e);
- }
-
- try {
- securityToken = AliyunOSSUtils.getValueWithKey(conf, SECURITY_TOKEN);
- } catch (IOException e) {
- securityToken = null;
- }
-
- if (StringUtils.isEmpty(accessKeyId)
- || StringUtils.isEmpty(accessKeySecret)) {
- throw new InvalidCredentialsException(
- "AccessKeyId and AccessKeySecret should not be null or empty.");
- }
-
- if (StringUtils.isNotEmpty(securityToken)) {
- credentials = new DefaultCredentials(accessKeyId, accessKeySecret,
- securityToken);
- } else {
- credentials = new DefaultCredentials(accessKeyId, accessKeySecret);
- }
- }
-
- @Override
- public void setCredentials(Credentials creds) {
- if (creds == null) {
- throw new InvalidCredentialsException("Credentials should not be null.");
- }
-
- credentials = creds;
- }
-
- @Override
- public Credentials getCredentials() {
- if (credentials == null) {
- throw new InvalidCredentialsException("Invalid credentials");
- }
-
- return credentials;
- }
-}
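
For reference, a minimal usage sketch of the provider deleted above. The
configuration property names ("fs.oss.accessKeyId", "fs.oss.accessKeySecret")
are assumptions mirroring the module's Constants class:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider;
    import com.aliyun.oss.common.auth.Credentials;

    public class CredentialsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed key names; both values must be non-empty or the
        // constructor throws InvalidCredentialsException.
        conf.set("fs.oss.accessKeyId", "<access-key-id>");
        conf.set("fs.oss.accessKeySecret", "<access-key-secret>");
        AliyunCredentialsProvider provider = new AliyunCredentialsProvider(conf);
        Credentials creds = provider.getCredentials();
        System.out.println("Loaded access key id: " + creds.getAccessKeyId());
      }
    }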
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
deleted file mode 100644
index 81e038d..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java
+++ /dev/null
@@ -1,543 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileAlreadyExistsException;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-
-import com.aliyun.oss.model.OSSObjectSummary;
-import com.aliyun.oss.model.ObjectListing;
-import com.aliyun.oss.model.ObjectMetadata;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * Implementation of {@link FileSystem} for <a href="https://oss.aliyun.com">
- * Aliyun OSS</a>, used to access OSS blob system in a filesystem style.
- */
-public class AliyunOSSFileSystem extends FileSystem {
- private static final Logger LOG =
- LoggerFactory.getLogger(AliyunOSSFileSystem.class);
- private URI uri;
- private Path workingDir;
- private AliyunOSSFileSystemStore store;
- private int maxKeys;
-
- @Override
- public FSDataOutputStream append(Path path, int bufferSize,
- Progressable progress) throws IOException {
- throw new IOException("Append is not supported!");
- }
-
- @Override
- public void close() throws IOException {
- try {
- store.close();
- } finally {
- super.close();
- }
- }
-
- @Override
- public FSDataOutputStream create(Path path, FsPermission permission,
- boolean overwrite, int bufferSize, short replication, long blockSize,
- Progressable progress) throws IOException {
- String key = pathToKey(path);
- FileStatus status = null;
-
- try {
- // get the status or throw a FNFE
- status = getFileStatus(path);
-
- // if the thread reaches here, there is something at the path
- if (status.isDirectory()) {
- // path references a directory
- throw new FileAlreadyExistsException(path + " is a directory");
- }
- if (!overwrite) {
- // path references a file and overwrite is disabled
- throw new FileAlreadyExistsException(path + " already exists");
- }
- LOG.debug("Overwriting file {}", path);
- } catch (FileNotFoundException e) {
- // this means the file is not found
- }
-
- return new FSDataOutputStream(new AliyunOSSOutputStream(getConf(),
- store, key, progress, statistics), (Statistics)(null));
- }
-
- @Override
- public boolean delete(Path path, boolean recursive) throws IOException {
- try {
- return innerDelete(getFileStatus(path), recursive);
- } catch (FileNotFoundException e) {
- LOG.debug("Couldn't delete {} - does not exist", path);
- return false;
- }
- }
-
- /**
- * Delete an object. See {@link #delete(Path, boolean)}.
- *
- * @param status fileStatus object
- * @param recursive if path is a directory and set to
- * true, the directory is deleted else throws an exception. In
- * case of a file the recursive can be set to either true or false.
- * @return true if delete is successful else false.
- * @throws IOException due to inability to delete a directory or file.
- */
- private boolean innerDelete(FileStatus status, boolean recursive)
- throws IOException {
- Path f = status.getPath();
- String key = pathToKey(f);
- if (status.isDirectory()) {
- if (!recursive) {
- FileStatus[] statuses = listStatus(status.getPath());
- // Check whether it is an empty directory or not
- if (statuses.length > 0) {
- throw new IOException("Cannot remove directory " + f +
- ": It is not empty!");
- } else {
- // Delete empty directory without '-r'
- key = AliyunOSSUtils.maybeAddTrailingSlash(key);
- store.deleteObject(key);
- }
- } else {
- store.deleteDirs(key);
- }
- } else {
- store.deleteObject(key);
- }
-
- createFakeDirectoryIfNecessary(f);
- return true;
- }
-
- private void createFakeDirectoryIfNecessary(Path f) throws IOException {
- String key = pathToKey(f);
- if (StringUtils.isNotEmpty(key) && !exists(f)) {
- LOG.debug("Creating new fake directory at {}", f);
- mkdir(pathToKey(f.getParent()));
- }
- }
-
- @Override
- public FileStatus getFileStatus(Path path) throws IOException {
- Path qualifiedPath = path.makeQualified(uri, workingDir);
- String key = pathToKey(qualifiedPath);
-
- // Root always exists
- if (key.length() == 0) {
- return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
- }
-
- ObjectMetadata meta = store.getObjectMetadata(key);
- // If key not found and key does not end with "/"
- if (meta == null && !key.endsWith("/")) {
- // In case of 'dir + "/"'
- key += "/";
- meta = store.getObjectMetadata(key);
- }
- if (meta == null) {
- ObjectListing listing = store.listObjects(key, 1, null, false);
- if (CollectionUtils.isNotEmpty(listing.getObjectSummaries()) ||
- CollectionUtils.isNotEmpty(listing.getCommonPrefixes())) {
- return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
- } else {
- throw new FileNotFoundException(path + ": No such file or directory!");
- }
- } else if (objectRepresentsDirectory(key, meta.getContentLength())) {
- return new FileStatus(0, true, 1, 0, 0, qualifiedPath);
- } else {
- return new FileStatus(meta.getContentLength(), false, 1,
- getDefaultBlockSize(path), meta.getLastModified().getTime(),
- qualifiedPath);
- }
- }
-
- @Override
- public String getScheme() {
- return "oss";
- }
-
- @Override
- public URI getUri() {
- return uri;
- }
-
- @Override
- public Path getWorkingDirectory() {
- return workingDir;
- }
-
- @Deprecated
- public long getDefaultBlockSize() {
- return getConf().getLong(FS_OSS_BLOCK_SIZE_KEY, FS_OSS_BLOCK_SIZE_DEFAULT);
- }
-
- @Override
- public String getCanonicalServiceName() {
- // Does not support Token
- return null;
- }
-
- /**
- * Initialize new FileSystem.
- *
- * @param name the uri of the file system, including host, port, etc.
- * @param conf configuration of the file system
- * @throws IOException IO problems
- */
- public void initialize(URI name, Configuration conf) throws IOException {
- super.initialize(name, conf);
-
- uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
- workingDir = new Path("/user",
- System.getProperty("user.name")).makeQualified(uri, null);
-
- store = new AliyunOSSFileSystemStore();
- store.initialize(name, conf, statistics);
- maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
- setConf(conf);
- }
-
- /**
- * Check if OSS object represents a directory.
- *
- * @param name object key
- * @param size object content length
- * @return true if object represents a directory
- */
- private boolean objectRepresentsDirectory(final String name,
- final long size) {
- return StringUtils.isNotEmpty(name) && name.endsWith("/") && size == 0L;
- }
-
- /**
- * Turn a path (relative or otherwise) into an OSS key.
- *
- * @param path the path of the file.
- * @return the key of the object that represents the file.
- */
- private String pathToKey(Path path) {
- if (!path.isAbsolute()) {
- path = new Path(workingDir, path);
- }
-
- return path.toUri().getPath().substring(1);
- }
-
- private Path keyToPath(String key) {
- return new Path("/" + key);
- }
-
- @Override
- public FileStatus[] listStatus(Path path) throws IOException {
- String key = pathToKey(path);
- if (LOG.isDebugEnabled()) {
- LOG.debug("List status for path: " + path);
- }
-
- final List<FileStatus> result = new ArrayList<FileStatus>();
- final FileStatus fileStatus = getFileStatus(path);
-
- if (fileStatus.isDirectory()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("listStatus: doing listObjects for directory " + key);
- }
-
- ObjectListing objects = store.listObjects(key, maxKeys, null, false);
- while (true) {
- statistics.incrementReadOps(1);
- for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
- String objKey = objectSummary.getKey();
- if (objKey.equals(key + "/")) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring: " + objKey);
- }
- continue;
- } else {
- Path keyPath = keyToPath(objectSummary.getKey())
- .makeQualified(uri, workingDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding: fi: " + keyPath);
- }
- result.add(new FileStatus(objectSummary.getSize(), false, 1,
- getDefaultBlockSize(keyPath),
- objectSummary.getLastModified().getTime(), keyPath));
- }
- }
-
- for (String prefix : objects.getCommonPrefixes()) {
- if (prefix.equals(key + "/")) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Ignoring: " + prefix);
- }
- continue;
- } else {
- Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding: rd: " + keyPath);
- }
- result.add(new FileStatus(0, true, 1, 0, 0, keyPath));
- }
- }
-
- if (objects.isTruncated()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("listStatus: list truncated - getting next batch");
- }
- String nextMarker = objects.getNextMarker();
- objects = store.listObjects(key, maxKeys, nextMarker, false);
- statistics.incrementReadOps(1);
- } else {
- break;
- }
- }
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding: rd (not a dir): " + path);
- }
- result.add(fileStatus);
- }
-
- return result.toArray(new FileStatus[result.size()]);
- }
-
- /**
- * Used to create an empty file that represents an empty directory.
- *
- * @param key directory path
- * @return true if directory is successfully created
- * @throws IOException
- */
- private boolean mkdir(final String key) throws IOException {
- String dirName = key;
- if (StringUtils.isNotEmpty(key)) {
- if (!key.endsWith("/")) {
- dirName += "/";
- }
- store.storeEmptyFile(dirName);
- }
- return true;
- }
-
- @Override
- public boolean mkdirs(Path path, FsPermission permission)
- throws IOException {
- try {
- FileStatus fileStatus = getFileStatus(path);
-
- if (fileStatus.isDirectory()) {
- return true;
- } else {
- throw new FileAlreadyExistsException("Path is a file: " + path);
- }
- } catch (FileNotFoundException e) {
- validatePath(path);
- String key = pathToKey(path);
- return mkdir(key);
- }
- }
-
- /**
- * Check whether the path is a valid path.
- *
- * @param path the path to be checked.
- * @throws IOException
- */
- private void validatePath(Path path) throws IOException {
- Path fPart = path.getParent();
- do {
- try {
- FileStatus fileStatus = getFileStatus(fPart);
- if (fileStatus.isDirectory()) {
- // If path exists and a directory, exit
- break;
- } else {
- throw new FileAlreadyExistsException(String.format(
- "Can't make directory for path '%s', it is a file.", fPart));
- }
- } catch (FileNotFoundException fnfe) {
- }
- fPart = fPart.getParent();
- } while (fPart != null);
- }
-
- @Override
- public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- final FileStatus fileStatus = getFileStatus(path);
- if (fileStatus.isDirectory()) {
- throw new FileNotFoundException("Can't open " + path +
- " because it is a directory");
- }
-
- return new FSDataInputStream(new AliyunOSSInputStream(getConf(), store,
- pathToKey(path), fileStatus.getLen(), statistics));
- }
-
- @Override
- public boolean rename(Path srcPath, Path dstPath) throws IOException {
- if (srcPath.isRoot()) {
- // Cannot rename root of file system
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot rename the root of a filesystem");
- }
- return false;
- }
- Path parent = dstPath.getParent();
- while (parent != null && !srcPath.equals(parent)) {
- parent = parent.getParent();
- }
- if (parent != null) {
- return false;
- }
- FileStatus srcStatus = getFileStatus(srcPath);
- FileStatus dstStatus;
- try {
- dstStatus = getFileStatus(dstPath);
- } catch (FileNotFoundException fnde) {
- dstStatus = null;
- }
- if (dstStatus == null) {
- // If dst doesn't exist, check whether dst dir exists or not
- dstStatus = getFileStatus(dstPath.getParent());
- if (!dstStatus.isDirectory()) {
- throw new IOException(String.format(
- "Failed to rename %s to %s, %s is a file", srcPath, dstPath,
- dstPath.getParent()));
- }
- } else {
- if (srcStatus.getPath().equals(dstStatus.getPath())) {
- return !srcStatus.isDirectory();
- } else if (dstStatus.isDirectory()) {
- // If dst is a directory
- dstPath = new Path(dstPath, srcPath.getName());
- FileStatus[] statuses;
- try {
- statuses = listStatus(dstPath);
- } catch (FileNotFoundException fnde) {
- statuses = null;
- }
- if (statuses != null && statuses.length > 0) {
- // If dst exists and not a directory / not empty
- throw new FileAlreadyExistsException(String.format(
- "Failed to rename %s to %s, file already exists or not empty!",
- srcPath, dstPath));
- }
- } else {
- // If dst is not a directory
- throw new FileAlreadyExistsException(String.format(
- "Failed to rename %s to %s, file already exists!", srcPath,
- dstPath));
- }
- }
- if (srcStatus.isDirectory()) {
- copyDirectory(srcPath, dstPath);
- } else {
- copyFile(srcPath, dstPath);
- }
-
- return srcPath.equals(dstPath) || delete(srcPath, true);
- }
-
- /**
- * Copy file from source path to destination path.
- * (the caller should make sure srcPath is a file and dstPath is valid)
- *
- * @param srcPath source path.
- * @param dstPath destination path.
- * @return true if file is successfully copied.
- */
- private boolean copyFile(Path srcPath, Path dstPath) {
- String srcKey = pathToKey(srcPath);
- String dstKey = pathToKey(dstPath);
- return store.copyFile(srcKey, dstKey);
- }
-
- /**
- * Copy a directory from source path to destination path.
- * (the caller should make sure srcPath is a directory, and dstPath is valid)
- *
- * @param srcPath source path.
- * @param dstPath destination path.
- * @return true if directory is successfully copied.
- */
- private boolean copyDirectory(Path srcPath, Path dstPath) throws IOException {
- String srcKey = AliyunOSSUtils
- .maybeAddTrailingSlash(pathToKey(srcPath));
- String dstKey = AliyunOSSUtils
- .maybeAddTrailingSlash(pathToKey(dstPath));
-
- if (dstKey.startsWith(srcKey)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot rename a directory to a subdirectory of self");
- }
- return false;
- }
-
- store.storeEmptyFile(dstKey);
- ObjectListing objects = store.listObjects(srcKey, maxKeys, null, true);
- statistics.incrementReadOps(1);
- // Copy files from src folder to dst
- while (true) {
- for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
- String newKey =
- dstKey.concat(objectSummary.getKey().substring(srcKey.length()));
- store.copyFile(objectSummary.getKey(), newKey);
- }
- if (objects.isTruncated()) {
- String nextMarker = objects.getNextMarker();
- objects = store.listObjects(srcKey, maxKeys, nextMarker, true);
- statistics.incrementReadOps(1);
- } else {
- break;
- }
- }
- return true;
- }
-
- @Override
- public void setWorkingDirectory(Path dir) {
- this.workingDir = dir;
- }
-
- public AliyunOSSFileSystemStore getStore() {
- return store;
- }
-}
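
For reference, a sketch of typical client-side use of the filesystem deleted
above, through the standard Hadoop FileSystem API. The fs.oss.impl binding is
the stock Hadoop scheme-to-class mapping; "my-bucket" and the paths are
placeholders:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OssFsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.oss.impl",
            "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem");
        // endpoint and credentials would be set as in the provider example
        FileSystem fs = FileSystem.get(URI.create("oss://my-bucket/"), conf);
        try (FSDataOutputStream out = fs.create(new Path("/demo/hello.txt"))) {
          out.writeUTF("hello oss");
        }
        for (FileStatus st : fs.listStatus(new Path("/demo"))) {
          System.out.println(st.getPath() + " len=" + st.getLen());
        }
        fs.close();
      }
    }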
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
deleted file mode 100644
index 9792a78..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ /dev/null
@@ -1,516 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.fs.aliyun.oss;
-
-import com.aliyun.oss.ClientConfiguration;
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSSClient;
-import com.aliyun.oss.OSSException;
-import com.aliyun.oss.common.auth.CredentialsProvider;
-import com.aliyun.oss.common.comm.Protocol;
-import com.aliyun.oss.model.AbortMultipartUploadRequest;
-import com.aliyun.oss.model.CannedAccessControlList;
-import com.aliyun.oss.model.CompleteMultipartUploadRequest;
-import com.aliyun.oss.model.CompleteMultipartUploadResult;
-import com.aliyun.oss.model.CopyObjectResult;
-import com.aliyun.oss.model.DeleteObjectsRequest;
-import com.aliyun.oss.model.GetObjectRequest;
-import com.aliyun.oss.model.InitiateMultipartUploadRequest;
-import com.aliyun.oss.model.InitiateMultipartUploadResult;
-import com.aliyun.oss.model.ListObjectsRequest;
-import com.aliyun.oss.model.ObjectMetadata;
-import com.aliyun.oss.model.ObjectListing;
-import com.aliyun.oss.model.OSSObjectSummary;
-import com.aliyun.oss.model.PartETag;
-import com.aliyun.oss.model.PutObjectResult;
-import com.aliyun.oss.model.UploadPartCopyRequest;
-import com.aliyun.oss.model.UploadPartCopyResult;
-import com.aliyun.oss.model.UploadPartRequest;
-import com.aliyun.oss.model.UploadPartResult;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * Core implementation of Aliyun OSS Filesystem for Hadoop.
- * Provides the bridging logic between Hadoop's abstract filesystem and
- * Aliyun OSS.
- */
-public class AliyunOSSFileSystemStore {
- public static final Logger LOG =
- LoggerFactory.getLogger(AliyunOSSFileSystemStore.class);
- private FileSystem.Statistics statistics;
- private OSSClient ossClient;
- private String bucketName;
- private long uploadPartSize;
- private long multipartThreshold;
- private long partSize;
- private int maxKeys;
- private String serverSideEncryptionAlgorithm;
-
- public void initialize(URI uri, Configuration conf,
- FileSystem.Statistics stat) throws IOException {
- statistics = stat;
- ClientConfiguration clientConf = new ClientConfiguration();
- clientConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS_KEY,
- MAXIMUM_CONNECTIONS_DEFAULT));
- boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS_KEY,
- SECURE_CONNECTIONS_DEFAULT);
- clientConf.setProtocol(secureConnections ? Protocol.HTTPS : Protocol.HTTP);
- clientConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES_KEY,
- MAX_ERROR_RETRIES_DEFAULT));
- clientConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT_KEY,
- ESTABLISH_TIMEOUT_DEFAULT));
- clientConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT_KEY,
- SOCKET_TIMEOUT_DEFAULT));
-
- String proxyHost = conf.getTrimmed(PROXY_HOST_KEY, "");
- int proxyPort = conf.getInt(PROXY_PORT_KEY, -1);
- if (StringUtils.isNotEmpty(proxyHost)) {
- clientConf.setProxyHost(proxyHost);
- if (proxyPort >= 0) {
- clientConf.setProxyPort(proxyPort);
- } else {
- if (secureConnections) {
- LOG.warn("Proxy host set without port. Using HTTPS default 443");
- clientConf.setProxyPort(443);
- } else {
- LOG.warn("Proxy host set without port. Using HTTP default 80");
- clientConf.setProxyPort(80);
- }
- }
- String proxyUsername = conf.getTrimmed(PROXY_USERNAME_KEY);
- String proxyPassword = conf.getTrimmed(PROXY_PASSWORD_KEY);
- if ((proxyUsername == null) != (proxyPassword == null)) {
- String msg = "Proxy error: " + PROXY_USERNAME_KEY + " or " +
- PROXY_PASSWORD_KEY + " set without the other.";
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
- clientConf.setProxyUsername(proxyUsername);
- clientConf.setProxyPassword(proxyPassword);
- clientConf.setProxyDomain(conf.getTrimmed(PROXY_DOMAIN_KEY));
- clientConf.setProxyWorkstation(conf.getTrimmed(PROXY_WORKSTATION_KEY));
- } else if (proxyPort >= 0) {
- String msg = "Proxy error: " + PROXY_PORT_KEY + " set without " +
- PROXY_HOST_KEY;
- LOG.error(msg);
- throw new IllegalArgumentException(msg);
- }
-
- String endPoint = conf.getTrimmed(ENDPOINT_KEY, "");
- CredentialsProvider provider =
- AliyunOSSUtils.getCredentialsProvider(conf);
- ossClient = new OSSClient(endPoint, provider, clientConf);
- uploadPartSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
- MULTIPART_UPLOAD_SIZE_DEFAULT);
- multipartThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
- MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
- partSize = conf.getLong(MULTIPART_UPLOAD_SIZE_KEY,
- MULTIPART_UPLOAD_SIZE_DEFAULT);
- if (partSize < MIN_MULTIPART_UPLOAD_PART_SIZE) {
- partSize = MIN_MULTIPART_UPLOAD_PART_SIZE;
- }
- serverSideEncryptionAlgorithm =
- conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY, "");
-
- if (uploadPartSize < 5 * 1024 * 1024) {
- LOG.warn(MULTIPART_UPLOAD_SIZE_KEY + " must be at least 5 MB");
- uploadPartSize = 5 * 1024 * 1024;
- }
-
- if (multipartThreshold < 5 * 1024 * 1024) {
- LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be at least 5 MB");
- multipartThreshold = 5 * 1024 * 1024;
- }
-
- if (multipartThreshold > 1024 * 1024 * 1024) {
- LOG.warn(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY + " must be less than 1 GB");
- multipartThreshold = 1024 * 1024 * 1024;
- }
-
- String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
- if (StringUtils.isNotEmpty(cannedACLName)) {
- CannedAccessControlList cannedACL =
- CannedAccessControlList.valueOf(cannedACLName);
- ossClient.setBucketAcl(bucketName, cannedACL);
- }
-
- maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
- bucketName = uri.getHost();
- }
-
- /**
- * Delete an object, and update write operation statistics.
- *
- * @param key key to blob to delete.
- */
- public void deleteObject(String key) {
- ossClient.deleteObject(bucketName, key);
- statistics.incrementWriteOps(1);
- }
-
- /**
- * Delete a list of keys, and update write operation statistics.
- *
- * @param keysToDelete collection of keys to delete.
- */
- public void deleteObjects(List<String> keysToDelete) {
- if (CollectionUtils.isNotEmpty(keysToDelete)) {
- DeleteObjectsRequest deleteRequest =
- new DeleteObjectsRequest(bucketName);
- deleteRequest.setKeys(keysToDelete);
- ossClient.deleteObjects(deleteRequest);
- statistics.incrementWriteOps(keysToDelete.size());
- }
- }
-
- /**
- * Delete a directory from Aliyun OSS.
- *
- * @param key directory key to delete.
- */
- public void deleteDirs(String key) {
- key = AliyunOSSUtils.maybeAddTrailingSlash(key);
- ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
- listRequest.setPrefix(key);
- listRequest.setDelimiter(null);
- listRequest.setMaxKeys(maxKeys);
-
- while (true) {
- ObjectListing objects = ossClient.listObjects(listRequest);
- statistics.incrementReadOps(1);
- List<String> keysToDelete = new ArrayList<String>();
- for (OSSObjectSummary objectSummary : objects.getObjectSummaries()) {
- keysToDelete.add(objectSummary.getKey());
- }
- deleteObjects(keysToDelete);
- if (objects.isTruncated()) {
- listRequest.setMarker(objects.getNextMarker());
- } else {
- break;
- }
- }
- }
-
- /**
- * Return metadata of a given object key.
- *
- * @param key object key.
- * @return return null if key does not exist.
- */
- public ObjectMetadata getObjectMetadata(String key) {
- try {
- return ossClient.getObjectMetadata(bucketName, key);
- } catch (OSSException osse) {
- return null;
- } finally {
- statistics.incrementReadOps(1);
- }
- }
-
- /**
- * Upload an empty file as an OSS object, using single upload.
- *
- * @param key object key.
- * @throws IOException if failed to upload object.
- */
- public void storeEmptyFile(String key) throws IOException {
- ObjectMetadata dirMeta = new ObjectMetadata();
- byte[] buffer = new byte[0];
- ByteArrayInputStream in = new ByteArrayInputStream(buffer);
- dirMeta.setContentLength(0);
- try {
- ossClient.putObject(bucketName, key, in, dirMeta);
- } finally {
- in.close();
- }
- }
-
- /**
- * Copy an object from source key to destination key.
- *
- * @param srcKey source key.
- * @param dstKey destination key.
- * @return true if file is successfully copied.
- */
- public boolean copyFile(String srcKey, String dstKey) {
- ObjectMetadata objectMeta =
- ossClient.getObjectMetadata(bucketName, srcKey);
- long contentLength = objectMeta.getContentLength();
- if (contentLength <= multipartThreshold) {
- return singleCopy(srcKey, dstKey);
- } else {
- return multipartCopy(srcKey, contentLength, dstKey);
- }
- }
-
- /**
- * Use single copy to copy an OSS object.
- * (The caller should make sure srcPath is a file and dstPath is valid)
- *
- * @param srcKey source key.
- * @param dstKey destination key.
- * @return true if object is successfully copied.
- */
- private boolean singleCopy(String srcKey, String dstKey) {
- CopyObjectResult copyResult =
- ossClient.copyObject(bucketName, srcKey, bucketName, dstKey);
- LOG.debug(copyResult.getETag());
- return true;
- }
-
- /**
- * Use multipart copy to copy an OSS object.
- * (The caller should make sure srcPath is a file and dstPath is valid)
- *
- * @param srcKey source key.
- * @param contentLength data size of the object to copy.
- * @param dstKey destination key.
- * @return true if success, or false if upload is aborted.
- */
- private boolean multipartCopy(String srcKey, long contentLength,
- String dstKey) {
- long realPartSize =
- AliyunOSSUtils.calculatePartSize(contentLength, uploadPartSize);
- int partNum = (int) (contentLength / realPartSize);
- if (contentLength % realPartSize != 0) {
- partNum++;
- }
- InitiateMultipartUploadRequest initiateMultipartUploadRequest =
- new InitiateMultipartUploadRequest(bucketName, dstKey);
- ObjectMetadata meta = new ObjectMetadata();
- if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
- meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
- }
- initiateMultipartUploadRequest.setObjectMetadata(meta);
- InitiateMultipartUploadResult initiateMultipartUploadResult =
- ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
- String uploadId = initiateMultipartUploadResult.getUploadId();
- List<PartETag> partETags = new ArrayList<PartETag>();
- try {
- for (int i = 0; i < partNum; i++) {
- long skipBytes = realPartSize * i;
- long size = (realPartSize < contentLength - skipBytes) ?
- realPartSize : contentLength - skipBytes;
- UploadPartCopyRequest partCopyRequest = new UploadPartCopyRequest();
- partCopyRequest.setSourceBucketName(bucketName);
- partCopyRequest.setSourceKey(srcKey);
- partCopyRequest.setBucketName(bucketName);
- partCopyRequest.setKey(dstKey);
- partCopyRequest.setUploadId(uploadId);
- partCopyRequest.setPartSize(size);
- partCopyRequest.setBeginIndex(skipBytes);
- partCopyRequest.setPartNumber(i + 1);
- UploadPartCopyResult partCopyResult =
- ossClient.uploadPartCopy(partCopyRequest);
- statistics.incrementWriteOps(1);
- partETags.add(partCopyResult.getPartETag());
- }
- CompleteMultipartUploadRequest completeMultipartUploadRequest =
- new CompleteMultipartUploadRequest(bucketName, dstKey,
- uploadId, partETags);
- CompleteMultipartUploadResult completeMultipartUploadResult =
- ossClient.completeMultipartUpload(completeMultipartUploadRequest);
- LOG.debug(completeMultipartUploadResult.getETag());
- return true;
- } catch (OSSException | ClientException e) {
- AbortMultipartUploadRequest abortMultipartUploadRequest =
- new AbortMultipartUploadRequest(bucketName, dstKey, uploadId);
- ossClient.abortMultipartUpload(abortMultipartUploadRequest);
- return false;
- }
- }
-
- /**
- * Upload a file as an OSS object, using single upload.
- *
- * @param key object key.
- * @param file local file to upload.
- * @throws IOException if failed to upload object.
- */
- public void uploadObject(String key, File file) throws IOException {
- File object = file.getAbsoluteFile();
- FileInputStream fis = new FileInputStream(object);
- ObjectMetadata meta = new ObjectMetadata();
- meta.setContentLength(object.length());
- if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
- meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
- }
- try {
- PutObjectResult result = ossClient.putObject(bucketName, key, fis, meta);
- LOG.debug(result.getETag());
- statistics.incrementWriteOps(1);
- } finally {
- fis.close();
- }
- }
-
- /**
- * Upload a file as an OSS object, using multipart upload.
- *
- * @param key object key.
- * @param file local file to upload.
- * @throws IOException if failed to upload object.
- */
- public void multipartUploadObject(String key, File file) throws IOException {
- File object = file.getAbsoluteFile();
- long dataLen = object.length();
- long realPartSize = AliyunOSSUtils.calculatePartSize(dataLen, partSize);
- int partNum = (int) (dataLen / realPartSize);
- if (dataLen % realPartSize != 0) {
- partNum += 1;
- }
-
- InitiateMultipartUploadRequest initiateMultipartUploadRequest =
- new InitiateMultipartUploadRequest(bucketName, key);
- ObjectMetadata meta = new ObjectMetadata();
- if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
- meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
- }
- initiateMultipartUploadRequest.setObjectMetadata(meta);
- InitiateMultipartUploadResult initiateMultipartUploadResult =
- ossClient.initiateMultipartUpload(initiateMultipartUploadRequest);
- List<PartETag> partETags = new ArrayList<PartETag>();
- String uploadId = initiateMultipartUploadResult.getUploadId();
-
- try {
- for (int i = 0; i < partNum; i++) {
- // TODO: Optimize this, avoid opening the object multiple times
- FileInputStream fis = new FileInputStream(object);
- try {
- long skipBytes = realPartSize * i;
- AliyunOSSUtils.skipFully(fis, skipBytes);
- long size = (realPartSize < dataLen - skipBytes) ?
- realPartSize : dataLen - skipBytes;
- UploadPartRequest uploadPartRequest = new UploadPartRequest();
- uploadPartRequest.setBucketName(bucketName);
- uploadPartRequest.setKey(key);
- uploadPartRequest.setUploadId(uploadId);
- uploadPartRequest.setInputStream(fis);
- uploadPartRequest.setPartSize(size);
- uploadPartRequest.setPartNumber(i + 1);
- UploadPartResult uploadPartResult =
- ossClient.uploadPart(uploadPartRequest);
- statistics.incrementWriteOps(1);
- partETags.add(uploadPartResult.getPartETag());
- } finally {
- fis.close();
- }
- }
- CompleteMultipartUploadRequest completeMultipartUploadRequest =
- new CompleteMultipartUploadRequest(bucketName, key,
- uploadId, partETags);
- CompleteMultipartUploadResult completeMultipartUploadResult =
- ossClient.completeMultipartUpload(completeMultipartUploadRequest);
- LOG.debug(completeMultipartUploadResult.getETag());
- } catch (OSSException | ClientException e) {
- AbortMultipartUploadRequest abortMultipartUploadRequest =
- new AbortMultipartUploadRequest(bucketName, key, uploadId);
- ossClient.abortMultipartUpload(abortMultipartUploadRequest);
- }
- }
-
- /**
- * list objects.
- *
- * @param prefix prefix.
- * @param maxListingLength max no. of entries
- * @param marker last key in any previous search.
- * @param recursive whether to list directory recursively.
- * @return a list of matches.
- */
- public ObjectListing listObjects(String prefix, int maxListingLength,
- String marker, boolean recursive) {
- String delimiter = recursive ? null : "/";
- prefix = AliyunOSSUtils.maybeAddTrailingSlash(prefix);
- ListObjectsRequest listRequest = new ListObjectsRequest(bucketName);
- listRequest.setPrefix(prefix);
- listRequest.setDelimiter(delimiter);
- listRequest.setMaxKeys(maxListingLength);
- listRequest.setMarker(marker);
-
- ObjectListing listing = ossClient.listObjects(listRequest);
- statistics.incrementReadOps(1);
- return listing;
- }
-
- /**
- * Retrieve a part of an object.
- *
- * @param key the object name that is being retrieved from the Aliyun OSS.
- * @param byteStart start position.
- * @param byteEnd end position.
- * @return This method returns null if the key is not found.
- */
- public InputStream retrieve(String key, long byteStart, long byteEnd) {
- try {
- GetObjectRequest request = new GetObjectRequest(bucketName, key);
- request.setRange(byteStart, byteEnd);
- return ossClient.getObject(request).getObjectContent();
- } catch (OSSException | ClientException e) {
- return null;
- }
- }
-
- /**
- * Close OSS client properly.
- */
- public void close() {
- if (ossClient != null) {
- ossClient.shutdown();
- ossClient = null;
- }
- }
-
- /**
- * Clean up all objects matching the prefix.
- *
- * @param prefix Aliyun OSS object prefix.
- */
- public void purge(String prefix) {
- String key;
- try {
- ObjectListing objects = listObjects(prefix, maxKeys, null, true);
- for (OSSObjectSummary object : objects.getObjectSummaries()) {
- key = object.getKey();
- ossClient.deleteObject(bucketName, key);
- }
-
- for (String dir: objects.getCommonPrefixes()) {
- deleteDirs(dir);
- }
- } catch (OSSException | ClientException e) {
- LOG.error("Failed to purge " + prefix);
- }
- }
-}
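
The multipart copy and upload paths above both split an object of
contentLength bytes into fixed-size parts, with a shorter final part covering
any remainder. A standalone sketch of that arithmetic:

    public class PartMath {
      static int partCount(long contentLength, long partSize) {
        int partNum = (int) (contentLength / partSize);
        if (contentLength % partSize != 0) {
          partNum++; // final, shorter part covers the remainder
        }
        return partNum;
      }

      public static void main(String[] args) {
        // 26 MB object with 10 MB parts -> 3 parts (10 + 10 + 6 MB)
        System.out.println(partCount(26L << 20, 10L << 20));
      }
    }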
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
deleted file mode 100644
index b87a3a7..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSInputStream.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSExceptionMessages;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * The input stream for OSS blob system.
- * The class uses multi-part downloading to read data from the object content
- * stream.
- */
-public class AliyunOSSInputStream extends FSInputStream {
- public static final Log LOG = LogFactory.getLog(AliyunOSSInputStream.class);
- private final long downloadPartSize;
- private AliyunOSSFileSystemStore store;
- private final String key;
- private Statistics statistics;
- private boolean closed;
- private InputStream wrappedStream = null;
- private long contentLength;
- private long position;
- private long partRemaining;
-
- public AliyunOSSInputStream(Configuration conf,
- AliyunOSSFileSystemStore store, String key, Long contentLength,
- Statistics statistics) throws IOException {
- this.store = store;
- this.key = key;
- this.statistics = statistics;
- this.contentLength = contentLength;
- downloadPartSize = conf.getLong(MULTIPART_DOWNLOAD_SIZE_KEY,
- MULTIPART_DOWNLOAD_SIZE_DEFAULT);
- reopen(0);
- closed = false;
- }
-
- /**
- * Reopen the wrapped stream at the given position, by seeking for
- * data of a part length from object content stream.
- *
- * @param pos position from start of a file
- * @throws IOException if failed to reopen
- */
- private synchronized void reopen(long pos) throws IOException {
- long partSize;
-
- if (pos < 0) {
- throw new EOFException("Cannot seek at negative position:" + pos);
- } else if (pos > contentLength) {
- throw new EOFException("Cannot seek after EOF, contentLength:" +
- contentLength + " position:" + pos);
- } else if (pos + downloadPartSize > contentLength) {
- partSize = contentLength - pos;
- } else {
- partSize = downloadPartSize;
- }
-
- if (wrappedStream != null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Aborting old stream to open at pos " + pos);
- }
- wrappedStream.close();
- }
-
- wrappedStream = store.retrieve(key, pos, pos + partSize -1);
- if (wrappedStream == null) {
- throw new IOException("Null IO stream");
- }
- position = pos;
- partRemaining = partSize;
- }
-
- @Override
- public synchronized int read() throws IOException {
- checkNotClosed();
-
- if (partRemaining <= 0 && position < contentLength) {
- reopen(position);
- }
-
- int tries = MAX_RETRIES;
- boolean retry;
- int byteRead = -1;
- do {
- retry = false;
- try {
- byteRead = wrappedStream.read();
- } catch (Exception e) {
- handleReadException(e, --tries);
- retry = true;
- }
- } while (retry);
- if (byteRead >= 0) {
- position++;
- partRemaining--;
- }
-
- if (statistics != null && byteRead >= 0) {
- statistics.incrementBytesRead(1);
- }
- return byteRead;
- }
-
-
- /**
- * Verify that the input stream is open. Non blocking; this gives
- * the last state of the volatile {@link #closed} field.
- *
- * @throws IOException if the connection is closed.
- */
- private void checkNotClosed() throws IOException {
- if (closed) {
- throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
- }
- }
-
- @Override
- public synchronized int read(byte[] buf, int off, int len)
- throws IOException {
- checkNotClosed();
-
- if (buf == null) {
- throw new NullPointerException();
- } else if (off < 0 || len < 0 || len > buf.length - off) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- int bytesRead = 0;
- // Not EOF, and read not done
- while (position < contentLength && bytesRead < len) {
- if (partRemaining == 0) {
- reopen(position);
- }
-
- int tries = MAX_RETRIES;
- boolean retry;
- int bytes = -1;
- do {
- retry = false;
- try {
- bytes = wrappedStream.read(buf, off + bytesRead, len - bytesRead);
- } catch (Exception e) {
- handleReadException(e, --tries);
- retry = true;
- }
- } while (retry);
-
- if (bytes > 0) {
- bytesRead += bytes;
- position += bytes;
- partRemaining -= bytes;
- } else if (partRemaining != 0) {
- throw new IOException("Failed to read from stream. Remaining:" +
- partRemaining);
- }
- }
-
- if (statistics != null && bytesRead > 0) {
- statistics.incrementBytesRead(bytesRead);
- }
-
- // Read nothing, but attempt to read something
- if (bytesRead == 0 && len > 0) {
- return -1;
- } else {
- return bytesRead;
- }
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
- if (wrappedStream != null) {
- wrappedStream.close();
- }
- }
-
- @Override
- public synchronized int available() throws IOException {
- checkNotClosed();
-
- long remaining = contentLength - position;
- if (remaining > Integer.MAX_VALUE) {
- return Integer.MAX_VALUE;
- }
- return (int)remaining;
- }
-
- @Override
- public synchronized void seek(long pos) throws IOException {
- checkNotClosed();
- if (position == pos) {
- return;
- } else if (pos > position && pos < position + partRemaining) {
- AliyunOSSUtils.skipFully(wrappedStream, pos - position);
- position = pos;
- } else {
- reopen(pos);
- }
- }
-
- @Override
- public synchronized long getPos() throws IOException {
- checkNotClosed();
- return position;
- }
-
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- checkNotClosed();
- return false;
- }
-
- private void handleReadException(Exception e, int tries) throws IOException{
- if (tries == 0) {
- throw new IOException(e);
- }
-
- LOG.warn("Some exceptions occurred in oss connection, try to reopen oss" +
- " connection at position '" + position + "', " + e.getMessage());
- try {
- Thread.sleep(100);
- } catch (InterruptedException e2) {
- LOG.warn(e2.getMessage());
- }
- reopen(position);
- }
-}
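
Both read() overloads above wrap the underlying SDK read in a bounded retry
loop, reopening the part stream on each failure. A simplified sketch of that
pattern (the reopen step is stubbed out here, and the MAX_RETRIES value is
assumed for illustration; the real constant lives in the module's Constants
class):

    import java.io.IOException;
    import java.io.InputStream;

    public class RetryingRead {
      static final int MAX_RETRIES = 10; // assumed value for illustration

      static int readWithRetry(InputStream in) throws IOException {
        int tries = MAX_RETRIES;
        while (true) {
          try {
            return in.read();
          } catch (IOException e) {
            if (--tries == 0) {
              throw e; // retries exhausted, surface the failure
            }
            // the real stream sleeps briefly and calls reopen(position) here
          }
        }
      }
    }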
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
deleted file mode 100644
index c75ee18..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSOutputStream.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.util.Progressable;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * The output stream for the OSS blob store.
- * Data will be buffered on local disk, then uploaded to OSS in
- * {@link #close()} method.
- */
-public class AliyunOSSOutputStream extends OutputStream {
- public static final Log LOG = LogFactory.getLog(AliyunOSSOutputStream.class);
- private AliyunOSSFileSystemStore store;
- private final String key;
- private Statistics statistics;
- private Progressable progress;
- private long partSizeThreshold;
- private LocalDirAllocator dirAlloc;
- private boolean closed;
- private File tmpFile;
- private BufferedOutputStream backupStream;
-
- public AliyunOSSOutputStream(Configuration conf,
- AliyunOSSFileSystemStore store, String key, Progressable progress,
- Statistics statistics) throws IOException {
- this.store = store;
- this.key = key;
-    // The caller can't get any progress information
- this.progress = progress;
- this.statistics = statistics;
- partSizeThreshold = conf.getLong(MIN_MULTIPART_UPLOAD_THRESHOLD_KEY,
- MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT);
-
- if (conf.get(BUFFER_DIR_KEY) == null) {
- conf.set(BUFFER_DIR_KEY, conf.get("hadoop.tmp.dir") + "/oss");
- }
- dirAlloc = new LocalDirAllocator(BUFFER_DIR_KEY);
-
- tmpFile = dirAlloc.createTmpFileForWrite("output-",
- LocalDirAllocator.SIZE_UNKNOWN, conf);
- backupStream = new BufferedOutputStream(new FileOutputStream(tmpFile));
- closed = false;
- }
-
- @Override
- public synchronized void close() throws IOException {
- if (closed) {
- return;
- }
- closed = true;
- if (backupStream != null) {
- backupStream.close();
- }
- long dataLen = tmpFile.length();
- try {
- if (dataLen <= partSizeThreshold) {
- store.uploadObject(key, tmpFile);
- } else {
- store.multipartUploadObject(key, tmpFile);
- }
- } finally {
- if (!tmpFile.delete()) {
- LOG.warn("Can not delete file: " + tmpFile);
- }
- }
- }
-
-
-
- @Override
- public synchronized void flush() throws IOException {
- backupStream.flush();
- }
-
- @Override
- public synchronized void write(int b) throws IOException {
- backupStream.write(b);
- statistics.incrementBytesWritten(1);
- }
-
-}
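AliyunOSSOutputStream buffers every write to a temporary file under fs.oss.buffer.dir and uploads to OSS only in close(), choosing a plain object upload or a multipart upload depending on whether the buffered size exceeds fs.oss.multipart.upload.threshold. A hedged usage sketch against the standard Hadoop FileSystem API follows; it assumes the hadoop-aliyun module (removed by this revert) is on the classpath, and the bucket and path names are placeholders, not values from the patch.

// Usage sketch only: the bucket and path are placeholders.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssWriteExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);
    try (FSDataOutputStream out = fs.create(new Path("oss://example-bucket/demo/hello.txt"))) {
      out.writeBytes("hello, OSS");  // buffered to the local fs.oss.buffer.dir directory
    }                                // close() triggers the single or multipart upload
    fs.close();
  }
}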
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
deleted file mode 100644
index cae9749..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import com.aliyun.oss.common.auth.CredentialsProvider;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.ProviderUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.fs.aliyun.oss.Constants.*;
-
-/**
- * Utility methods for Aliyun OSS code.
- */
-final public class AliyunOSSUtils {
- private static final Logger LOG =
- LoggerFactory.getLogger(AliyunOSSUtils.class);
-
- private AliyunOSSUtils() {
- }
-
- /**
-   * Used to get a password from the configuration.
- *
- * @param conf configuration that contains password information
- * @param key the key of the password
- * @return the value for the key
- * @throws IOException if failed to get password from configuration
- */
- public static String getValueWithKey(Configuration conf, String key)
- throws IOException {
- try {
- final char[] pass = conf.getPassword(key);
- if (pass != null) {
- return (new String(pass)).trim();
- } else {
- return "";
- }
- } catch (IOException ioe) {
- throw new IOException("Cannot find password option " + key, ioe);
- }
- }
-
- /**
-   * Skip the requested number of bytes or fail if there are not enough bytes
- * left. This allows for the possibility that {@link InputStream#skip(long)}
- * may not skip as many bytes as requested (most likely because of reaching
- * EOF).
- *
- * @param is the input stream to skip.
- * @param n the number of bytes to skip.
-   * @throws IOException if fewer bytes were skipped than requested.
- */
- public static void skipFully(InputStream is, long n) throws IOException {
- long total = 0;
- long cur = 0;
-
- do {
- cur = is.skip(n - total);
- total += cur;
- } while((total < n) && (cur > 0));
-
- if (total < n) {
- throw new IOException("Failed to skip " + n + " bytes, possibly due " +
- "to EOF.");
- }
- }
-
- /**
-   * Calculate a proper size for each multipart piece. If <code>minPartSize</code>
- * is too small, the number of multipart pieces may exceed the limit of
- * {@link Constants#MULTIPART_UPLOAD_PART_NUM_LIMIT}.
- *
-   * @param contentLength the size of the file.
-   * @param minPartSize the minimum size of a multipart piece.
-   * @return a revised size for each multipart piece.
- */
- public static long calculatePartSize(long contentLength, long minPartSize) {
- long tmpPartSize = contentLength / MULTIPART_UPLOAD_PART_NUM_LIMIT + 1;
- return Math.max(minPartSize, tmpPartSize);
- }
-
- /**
-   * Create the credential provider specified by the configuration, or the
-   * default credential provider if none is specified.
- *
- * @param conf configuration
- * @return a credential provider
- * @throws IOException on any problem. Class construction issues may be
- * nested inside the IOE.
- */
- public static CredentialsProvider getCredentialsProvider(Configuration conf)
- throws IOException {
- CredentialsProvider credentials;
-
- String className = conf.getTrimmed(ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY);
- if (StringUtils.isEmpty(className)) {
- Configuration newConf =
- ProviderUtils.excludeIncompatibleCredentialProviders(conf,
- AliyunOSSFileSystem.class);
- credentials = new AliyunCredentialsProvider(newConf);
- } else {
- try {
- LOG.debug("Credential provider class is:" + className);
- Class<?> credClass = Class.forName(className);
- try {
- credentials =
- (CredentialsProvider)credClass.getDeclaredConstructor(
- Configuration.class).newInstance(conf);
- } catch (NoSuchMethodException | SecurityException e) {
- credentials =
- (CredentialsProvider)credClass.getDeclaredConstructor()
- .newInstance();
- }
- } catch (ClassNotFoundException e) {
- throw new IOException(className + " not found.", e);
- } catch (NoSuchMethodException | SecurityException e) {
- throw new IOException(String.format("%s constructor exception. A " +
- "class specified in %s must provide an accessible constructor " +
- "accepting URI and Configuration, or an accessible default " +
- "constructor.", className, ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY),
- e);
- } catch (ReflectiveOperationException | IllegalArgumentException e) {
- throw new IOException(className + " instantiation exception.", e);
- }
- }
-
- return credentials;
- }
-
- /**
- * Turns a path (relative or otherwise) into an OSS key, adding a trailing
- * "/" if the path is not the root <i>and</i> does not already have a "/"
- * at the end.
- *
- * @param key OSS key or ""
-   * @return the key with a trailing "/", or, if it is the root key, "".
- */
- public static String maybeAddTrailingSlash(String key) {
- if (StringUtils.isNotEmpty(key) && !key.endsWith("/")) {
- return key + '/';
- } else {
- return key;
- }
- }
-}
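calculatePartSize() keeps a multipart upload within the 10,000-part limit by dividing the content length by MULTIPART_UPLOAD_PART_NUM_LIMIT and never going below the configured minimum part size. The small sketch below works through that arithmetic with made-up object sizes; the class and constant names are invented for illustration.

// Sketch of the part-size arithmetic with example sizes; PART_NUM_LIMIT mirrors
// Constants.MULTIPART_UPLOAD_PART_NUM_LIMIT.
public class PartSizeExample {
  private static final long PART_NUM_LIMIT = 10000;

  static long calculatePartSize(long contentLength, long minPartSize) {
    long tmpPartSize = contentLength / PART_NUM_LIMIT + 1;
    return Math.max(minPartSize, tmpPartSize);
  }

  public static void main(String[] args) {
    long tenMiB = 10L * 1024 * 1024;
    // 50 GiB object: 50 GiB / 10000 + 1 is about 5.4 MB, below the 10 MiB minimum,
    // so the minimum wins and roughly 5120 parts are used.
    System.out.println(calculatePartSize(50L * 1024 * 1024 * 1024, tenMiB));
    // 200 GiB object: 200 GiB / 10000 + 1 is about 21.5 MB, above the minimum,
    // so the computed size wins and the part count stays just under the 10000 limit.
    System.out.println(calculatePartSize(200L * 1024 * 1024 * 1024, tenMiB));
  }
}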
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
deleted file mode 100644
index 04a2ccd..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.aliyun.oss;
-
-/**
- * All configuration constants for the OSS filesystem.
- */
-public final class Constants {
-
- private Constants() {
- }
-
- // Class of credential provider
- public static final String ALIYUN_OSS_CREDENTIALS_PROVIDER_KEY =
- "fs.oss.credentials.provider";
-
- // OSS access verification
- public static final String ACCESS_KEY_ID = "fs.oss.accessKeyId";
- public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
- public static final String SECURITY_TOKEN = "fs.oss.securityToken";
-
- // Number of simultaneous connections to oss
- public static final String MAXIMUM_CONNECTIONS_KEY =
- "fs.oss.connection.maximum";
- public static final int MAXIMUM_CONNECTIONS_DEFAULT = 32;
-
- // Connect to oss over ssl
- public static final String SECURE_CONNECTIONS_KEY =
- "fs.oss.connection.secure.enabled";
- public static final boolean SECURE_CONNECTIONS_DEFAULT = true;
-
- // Use a custom endpoint
- public static final String ENDPOINT_KEY = "fs.oss.endpoint";
-
- // Connect to oss through a proxy server
- public static final String PROXY_HOST_KEY = "fs.oss.proxy.host";
- public static final String PROXY_PORT_KEY = "fs.oss.proxy.port";
- public static final String PROXY_USERNAME_KEY = "fs.oss.proxy.username";
- public static final String PROXY_PASSWORD_KEY = "fs.oss.proxy.password";
- public static final String PROXY_DOMAIN_KEY = "fs.oss.proxy.domain";
- public static final String PROXY_WORKSTATION_KEY =
- "fs.oss.proxy.workstation";
-
- // Number of times we should retry errors
- public static final String MAX_ERROR_RETRIES_KEY = "fs.oss.attempts.maximum";
- public static final int MAX_ERROR_RETRIES_DEFAULT = 20;
-
- // Time until we give up trying to establish a connection to oss
- public static final String ESTABLISH_TIMEOUT_KEY =
- "fs.oss.connection.establish.timeout";
- public static final int ESTABLISH_TIMEOUT_DEFAULT = 50000;
-
- // Time until we give up on a connection to oss
- public static final String SOCKET_TIMEOUT_KEY = "fs.oss.connection.timeout";
- public static final int SOCKET_TIMEOUT_DEFAULT = 200000;
-
- // Number of records to get while paging through a directory listing
- public static final String MAX_PAGING_KEYS_KEY = "fs.oss.paging.maximum";
- public static final int MAX_PAGING_KEYS_DEFAULT = 1000;
-
-  // Size of each multipart piece in bytes
- public static final String MULTIPART_UPLOAD_SIZE_KEY =
- "fs.oss.multipart.upload.size";
-
- public static final long MULTIPART_UPLOAD_SIZE_DEFAULT = 10 * 1024 * 1024;
- public static final int MULTIPART_UPLOAD_PART_NUM_LIMIT = 10000;
-
-  // Minimum size in bytes before we start multipart uploads or copies
- public static final String MIN_MULTIPART_UPLOAD_THRESHOLD_KEY =
- "fs.oss.multipart.upload.threshold";
- public static final long MIN_MULTIPART_UPLOAD_THRESHOLD_DEFAULT =
- 20 * 1024 * 1024;
-
- public static final String MULTIPART_DOWNLOAD_SIZE_KEY =
- "fs.oss.multipart.download.size";
-
- public static final long MULTIPART_DOWNLOAD_SIZE_DEFAULT = 100 * 1024;
-
- // Comma separated list of directories
- public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
-
- // private | public-read | public-read-write
- public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
- public static final String CANNED_ACL_DEFAULT = "";
-
- // OSS server-side encryption
- public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM_KEY =
- "fs.oss.server-side-encryption-algorithm";
-
- public static final String FS_OSS_BLOCK_SIZE_KEY = "fs.oss.block.size";
- public static final int FS_OSS_BLOCK_SIZE_DEFAULT = 64 * 1024 * 1024;
- public static final String FS_OSS = "oss";
-
- public static final long MIN_MULTIPART_UPLOAD_PART_SIZE = 100 * 1024L;
- public static final int MAX_RETRIES = 10;
-
-}
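Constants.java defines the configuration keys the connector reads. As a hedged sketch only, the snippet below wires a few of those keys into a Hadoop Configuration before opening the filesystem; the endpoint, credentials and bucket are placeholder values, and in practice the access key id and secret would normally come from a credential provider rather than plain configuration.

// Configuration sketch; all values shown are placeholders.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class OssConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.oss.endpoint", "oss-example.aliyuncs.com");     // ENDPOINT_KEY
    conf.set("fs.oss.accessKeyId", "EXAMPLE_KEY_ID");            // ACCESS_KEY_ID
    conf.set("fs.oss.accessKeySecret", "EXAMPLE_KEY_SECRET");    // ACCESS_KEY_SECRET
    conf.setInt("fs.oss.connection.maximum", 32);                // MAXIMUM_CONNECTIONS_KEY
    conf.setLong("fs.oss.multipart.upload.threshold",
        20L * 1024 * 1024);                                      // MIN_MULTIPART_UPLOAD_THRESHOLD_KEY
    FileSystem fs = FileSystem.get(URI.create("oss://example-bucket/"), conf);
    System.out.println("Connected to: " + fs.getUri());
    fs.close();
  }
}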
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1443988/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
deleted file mode 100644
index 234567b..0000000
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/package-info.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Aliyun OSS Filesystem.
- */
-package org.apache.hadoop.fs.aliyun.oss;
\ No newline at end of file