Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2016/06/09 22:17:59 UTC
[2/2] hadoop git commit: Revert "HADOOP-12666. Support Microsoft Azure Data Lake - as a file system in Hadoop. Contributed by Vishwajeet Dusane."
Revert "HADOOP-12666. Support Microsoft Azure Data Lake - as a file system in Hadoop. Contributed by Vishwajeet Dusane."
This reverts commit a8f03ef7ea8163c00ce5d72a4e1c77284befe5aa.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19259422
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19259422
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19259422
Branch: refs/heads/branch-2
Commit: 19259422945b94dfc7a4cc752bf171debcfbad5f
Parents: a8f03ef
Author: Chris Nauroth <cn...@apache.org>
Authored: Thu Jun 9 15:17:18 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Thu Jun 9 15:17:18 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 60 -
.../conf/TestCommonConfigurationFields.java | 6 -
hadoop-project/src/site/site.xml | 2 -
.../dev-support/findbugs-exclude.xml | 24 -
hadoop-tools/hadoop-azure-datalake/pom.xml | 180 ---
.../main/java/org/apache/hadoop/fs/adl/Adl.java | 52 -
.../org/apache/hadoop/fs/adl/AdlFileSystem.java | 41 -
...hedRefreshTokenBasedAccessTokenProvider.java | 135 ---
.../hadoop/fs/adl/oauth2/package-info.java | 23 -
.../org/apache/hadoop/fs/adl/package-info.java | 23 -
.../org/apache/hadoop/hdfs/web/ADLConfKeys.java | 61 -
.../apache/hadoop/hdfs/web/BufferManager.java | 180 ---
.../web/PrivateAzureDataLakeFileSystem.java | 1108 ------------------
...hedRefreshTokenBasedAccessTokenProvider.java | 37 -
.../hadoop/hdfs/web/oauth2/package-info.java | 24 -
.../apache/hadoop/hdfs/web/package-info.java | 25 -
.../hadoop/hdfs/web/resources/ADLFlush.java | 49 -
.../hdfs/web/resources/ADLGetOpParam.java | 96 --
.../hdfs/web/resources/ADLPostOpParam.java | 97 --
.../hdfs/web/resources/ADLPutOpParam.java | 94 --
.../hdfs/web/resources/ADLVersionInfo.java | 51 -
.../web/resources/AppendADLNoRedirectParam.java | 45 -
.../web/resources/CreateADLNoRedirectParam.java | 44 -
.../hadoop/hdfs/web/resources/LeaseParam.java | 53 -
.../web/resources/ReadADLNoRedirectParam.java | 44 -
.../hadoop/hdfs/web/resources/package-info.java | 27 -
.../src/site/markdown/index.md | 219 ----
...hedRefreshTokenBasedAccessTokenProvider.java | 147 ---
hadoop-tools/hadoop-tools-dist/pom.xml | 6 -
hadoop-tools/pom.xml | 1 -
30 files changed, 2954 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 41bf6d8..490f1de 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2213,64 +2213,4 @@
needs to be specified in net.topology.script.file.name.
</description>
</property>
-
-
- <!-- Azure Data Lake File System Configurations -->
-
- <property>
- <name>adl.feature.override.readahead</name>
- <value>true</value>
- <description>
- Enables read-ahead in the ADL client; the feature is used to
- improve read throughput.
- It works in conjunction with the value set in
- adl.feature.override.readahead.max.buffersize.
- When set to false, the read-ahead feature is turned off.
- Default: true if not configured.
- </description>
- </property>
-
- <property>
- <name>adl.feature.override.readahead.max.buffersize</name>
- <value>8388608</value>
- <description>
- Defines the maximum buffer size, allocated per process, to
- cache read-ahead data. Applicable only when
- adl.feature.override.readahead is set to true.
- Default: 8388608 bytes (8 MB) if not configured.
- </description>
- </property>
-
- <property>
- <name>adl.feature.override.readahead.max.concurrent.connection</name>
- <value>2</value>
- <description>
- Defines the maximum number of concurrent connections that can be
- established for read-ahead. If the data size is less than 4MB, only
- 1 read network connection is used. If the data size is at least 4MB
- but less than 8MB, 2 read network connections are used. For data
- greater than 8MB, the value set under this property takes effect.
- Applicable only when adl.feature.override.readahead is set to true
- and the buffer size is greater than 8MB.
- It is recommended to adjust this property if
- adl.feature.override.readahead.max.buffersize is less than 8MB, to
- gain performance. The application also has to consider the
- throttling limit for the account before configuring a large buffer
- size.
- </description>
- </property>
-
- <property>
- <name>fs.adl.impl</name>
- <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
- </property>
-
- <property>
- <name>fs.AbstractFileSystem.adl.impl</name>
- <value>org.apache.hadoop.fs.adl.Adl</value>
- </property>
-
</configuration>
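
A note on usage: the reverted settings above would normally be picked up from core-default.xml or core-site.xml, but they can also be set through the Hadoop Configuration API. A minimal sketch, with property names and defaults taken verbatim from the deleted entries above (illustrative only, since this commit removes the feature):

import org.apache.hadoop.conf.Configuration;

public class AdlConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Enable client-side read-ahead (deleted default: true).
    conf.setBoolean("adl.feature.override.readahead", true);
    // Per-process read-ahead buffer (deleted default: 8388608 bytes = 8 MB).
    conf.setInt("adl.feature.override.readahead.max.buffersize",
        8 * 1024 * 1024);
    // Maximum concurrent read-ahead connections (deleted default: 2).
    conf.setInt("adl.feature.override.readahead.max.concurrent.connection", 2);
    // File system bindings removed by this revert.
    conf.set("fs.adl.impl", "org.apache.hadoop.fs.adl.AdlFileSystem");
    conf.set("fs.AbstractFileSystem.adl.impl", "org.apache.hadoop.fs.adl.Adl");
  }
}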
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 020474f..90f7514 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -102,12 +102,6 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
xmlPrefixToSkipCompare.add("s3.");
xmlPrefixToSkipCompare.add("s3native.");
- // ADL properties are in a different subtree
- // - org.apache.hadoop.hdfs.web.ADLConfKeys
- xmlPrefixToSkipCompare.add("adl.");
- xmlPropsToSkipCompare.add("fs.adl.impl");
- xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
-
// Deprecated properties. These should eventually be removed from the
// class.
configurationPropsToSkipCompare
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index dd9e3e9..0167f0c 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -144,8 +144,6 @@
<menu name="Hadoop Compatible File Systems" inherit="top">
<item name="Amazon S3" href="hadoop-aws/tools/hadoop-aws/index.html"/>
<item name="Azure Blob Storage" href="hadoop-azure/index.html"/>
- <item name="Azure Data Lake Storage"
- href="hadoop-azure-datalake/index.html"/>
<item name="OpenStack Swift" href="hadoop-openstack/index.html"/>
</menu>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
deleted file mode 100644
index 4fd36ef..0000000
--- a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
+++ /dev/null
@@ -1,24 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<FindBugsFilter>
- <!-- Buffer object is accessed within trusted code and intentionally assigned instead of array copy -->
- <Match>
- <Class name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/>
- <Bug pattern="EI_EXPOSE_REP2"/>
- <Priority value="2"/>
- </Match>
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml
deleted file mode 100644
index 66c874c..0000000
--- a/hadoop-tools/hadoop-azure-datalake/pom.xml
+++ /dev/null
@@ -1,180 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-project</artifactId>
- <version>2.9.0-SNAPSHOT</version>
- <relativePath>../../hadoop-project</relativePath>
- </parent>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-azure-datalake</artifactId>
- <name>Apache Hadoop Azure Data Lake support</name>
- <description>
- This module contains code to support integration with Azure Data Lake.
- </description>
- <packaging>jar</packaging>
- <properties>
- <okHttpVersion>2.4.0</okHttpVersion>
- <minimalJsonVersion>0.9.1</minimalJsonVersion>
- <file.encoding>UTF-8</file.encoding>
- <downloadSources>true</downloadSources>
- </properties>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>findbugs-maven-plugin</artifactId>
- <configuration>
- <findbugsXmlOutput>true</findbugsXmlOutput>
- <xmlOutput>true</xmlOutput>
- <excludeFilterFile>
- ${basedir}/dev-support/findbugs-exclude.xml
- </excludeFilterFile>
- <effort>Max</effort>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-project-info-reports-plugin</artifactId>
-
- <configuration>
- <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
- <dependencyLocationsEnabled>false
- </dependencyLocationsEnabled>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>test-jar</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <executions>
- <execution>
- <id>deplist</id>
- <phase>compile</phase>
- <goals>
- <goal>list</goal>
- </goals>
- <configuration>
- <!-- build a shellprofile -->
- <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
- </configuration>
- </execution>
- </executions>
- </plugin>
- </plugins>
-
-
- <!--
- The following is to suppress a m2e warning in eclipse
- (m2e doesn't know how to handle maven-enforcer:enforce, so we have to tell m2e to ignore it)
- see: http://stackoverflow.com/questions/13040788/how-to-elimate-the-maven-enforcer-plugin-goal-enforce-is-ignored-by-m2e-wa
- -->
- <pluginManagement>
- <plugins>
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.maven.plugins
- </groupId>
- <artifactId>maven-enforcer-plugin
- </artifactId>
- <versionRange>[1.0.0,)</versionRange>
- <goals>
- <goal>enforce</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore/>
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
-
- </build>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <exclusions>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.eclipsesource.minimal-json</groupId>
- <artifactId>minimal-json</artifactId>
- <version>0.9.1</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>com.squareup.okhttp</groupId>
- <artifactId>mockwebserver</artifactId>
- <version>2.4.0</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
deleted file mode 100644
index 4642d6b..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.fs.adl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DelegateToFileSystem;
-
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-
-/**
- * Expose adl:// scheme to access ADL file system.
- */
-public class Adl extends DelegateToFileSystem {
-
- Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException {
- super(theUri, createDataLakeFileSystem(conf), conf, AdlFileSystem.SCHEME,
- false);
- }
-
- private static AdlFileSystem createDataLakeFileSystem(Configuration conf) {
- AdlFileSystem fs = new AdlFileSystem();
- fs.setConf(conf);
- return fs;
- }
-
- /**
- * @return Default port for the ADL file system to communicate on
- */
- @Override
- public final int getUriDefaultPort() {
- return AdlFileSystem.DEFAULT_PORT;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
deleted file mode 100644
index 11e1e0b..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.fs.adl;
-
-import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem;
-
-/**
- * Expose adl:// scheme to access ADL file system.
- */
-public class AdlFileSystem extends PrivateAzureDataLakeFileSystem {
-
- public static final String SCHEME = "adl";
- public static final int DEFAULT_PORT = 443;
-
- @Override
- public String getScheme() {
- return SCHEME;
- }
-
- @Override
- public int getDefaultPort() {
- return DEFAULT_PORT;
- }
-}
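
The two classes above bind the adl:// scheme: AdlFileSystem for the FileSystem API and Adl for the FileContext API. A hedged usage sketch (the account host is a placeholder, and running it of course depends on the module this commit removes):

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class AdlUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Scheme "adl" and default port 443 come from the deleted AdlFileSystem.
    FileSystem fs = FileSystem.get(
        URI.create("adl://example.azuredatalakestore.net/"), conf);
    System.out.println(fs.getUri());
  }
}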
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
deleted file mode 100644
index b7f3b00..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.fs.adl.oauth2;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.LinkedHashMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
-import org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider;
-import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
-import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
-import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
-
-/**
- * Share refresh tokens across all ADLS instances with a common client ID. The
- * {@link AccessTokenProvider} can be shared across multiple instances,
- * amortizing the cost of refreshing tokens.
- */
-public class CachedRefreshTokenBasedAccessTokenProvider
- extends PrivateCachedRefreshTokenBasedAccessTokenProvider {
-
- public static final String FORCE_REFRESH = "adl.force.token.refresh";
-
- private static final Logger LOG =
- LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class);
-
- /** Limit size of provider cache. */
- static final int MAX_PROVIDERS = 10;
- @SuppressWarnings("serial")
- private static final Map<String, AccessTokenProvider> CACHE =
- new LinkedHashMap<String, AccessTokenProvider>() {
- @Override
- public boolean removeEldestEntry(
- Map.Entry<String, AccessTokenProvider> e) {
- return size() > MAX_PROVIDERS;
- }
- };
-
- private AccessTokenProvider instance = null;
-
- /**
- * Create handle for cached instance.
- */
- public CachedRefreshTokenBasedAccessTokenProvider() {
- }
-
- /**
- * Gets the access token from internally cached
- * ConfRefreshTokenBasedAccessTokenProvider instance.
- *
- * @return Valid OAuth2 access token for the user.
- * @throws IOException On a system error, internal server error or user error
- */
- @Override
- public synchronized String getAccessToken() throws IOException {
- return instance.getAccessToken();
- }
-
- /**
- * @return A cached Configuration consistent with the parameters of this
- * instance.
- */
- @Override
- public synchronized Configuration getConf() {
- return instance.getConf();
- }
-
- /**
- * Configure cached instance. Note that the Configuration instance returned
- * from subsequent calls to {@link #getConf() getConf} may be from a
- * previous, cached entry.
- * @param conf Configuration instance
- */
- @Override
- public synchronized void setConf(Configuration conf) {
- String id = conf.get(OAUTH_CLIENT_ID_KEY);
- if (null == id) {
- throw new IllegalArgumentException("Missing client ID");
- }
- synchronized (CACHE) {
- instance = CACHE.get(id);
- if (null == instance
- || conf.getBoolean(FORCE_REFRESH, false)
- || replace(instance, conf)) {
- instance = newInstance();
- // clone configuration
- instance.setConf(new Configuration(conf));
- CACHE.put(id, instance);
- LOG.debug("Created new client {}", id);
- }
- }
- }
-
- AccessTokenProvider newInstance() {
- return new ConfRefreshTokenBasedAccessTokenProvider();
- }
-
- private static boolean replace(AccessTokenProvider cached, Configuration c2) {
- // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null
- final Configuration c1 = cached.getConf();
- for (String key : new String[] {
- OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) {
- if (!c1.get(key).equals(c2.get(key))) {
- // replace cached instance for this clientID
- return true;
- }
- }
- return false;
- }
-
-}
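
The provider above bounds its per-client-ID cache with the standard LinkedHashMap eviction hook (removeEldestEntry). A minimal standalone sketch of that idiom, with illustrative names not taken from the commit:

import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedCacheSketch {
  static final int MAX_ENTRIES = 10; // mirrors MAX_PROVIDERS above

  public static void main(String[] args) {
    Map<String, String> cache = new LinkedHashMap<String, String>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
        // Evict the oldest entry once the map grows past the limit.
        return size() > MAX_ENTRIES;
      }
    };
    for (int i = 0; i < 15; i++) {
      cache.put("client-" + i, "provider-" + i);
    }
    System.out.println(cache.size()); // 10: the five oldest were evicted
  }
}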
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
deleted file mode 100644
index b444984..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/**
- * Public interface exposing OAuth2 authentication-related features.
- */
-package org.apache.hadoop.fs.adl.oauth2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
deleted file mode 100644
index 98e6a77..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/**
- * Classes exposing the adl:// scheme to access the ADL file system.
- */
-package org.apache.hadoop.fs.adl;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
deleted file mode 100644
index a7f932f..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-/**
- * Constants.
- */
-public final class ADLConfKeys {
- public static final String
- ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
- "adl.feature.override.readahead.max.concurrent.connection";
- public static final int
- ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
- public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
- static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
- "adl.debug.override.localuserasfileowner";
- static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
- static final String ADL_FEATURE_REDIRECT_OFF =
- "adl.feature.override.redirection.off";
- static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
- static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
- "adl.feature.override.getblocklocation.locally.bundled";
- static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
- = true;
- static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
- "adl.feature.override.readahead";
- static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
- true;
- static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
- "adl.feature.override.readahead.max.buffersize";
-
- static final int KB = 1024;
- static final int MB = KB * KB;
- static final int DEFAULT_BLOCK_SIZE = 4 * MB;
- static final int DEFAULT_EXTENT_SIZE = 256 * MB;
- static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
- static final int
- ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
- 8 * MB;
-
- private ADLConfKeys() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
deleted file mode 100644
index 350c6e7..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-/**
- * Responsible for holding buffered data in the process. Holds one and
- * only one buffer block in memory. The buffer block records the file it
- * belongs to and the offset from which the block was fetched. If the same
- * buffer block can be reused across webhdfs instances, a trip to the
- * backend is avoided. The buffer block matters because ADL fetches a
- * large amount of data from the backend per request (4MB by default,
- * configurable through core-site.xml).
- * One observation: for compressed formats such as ORC/Avro, the buffer
- * block does not avoid some backend calls across webhdfs instances.
- */
-final class BufferManager {
- private static final BufferManager BUFFER_MANAGER_INSTANCE = new
- BufferManager();
- private static Object lock = new Object();
- private Buffer buffer = null;
- private String fileName;
-
- /**
- * Constructor.
- */
- private BufferManager() {
- }
-
- public static Object getLock() {
- return lock;
- }
-
- public static BufferManager getInstance() {
- return BUFFER_MANAGER_INSTANCE;
- }
-
- /**
- * Check whether the current buffer block belongs to the given stream.
- *
- * @param path ADL stream path
- * @param offset Stream offset that the caller is interested in
- * @return True if the buffer block is available, otherwise false
- */
- boolean hasValidDataForOffset(String path, long offset) {
- if (this.fileName == null) {
- return false;
- }
-
- if (!this.fileName.equals(path)) {
- return false;
- }
-
- if (buffer == null) {
- return false;
- }
-
- if ((offset < buffer.offset) || (offset >= (buffer.offset
- + buffer.data.length))) {
- return false;
- }
-
- return true;
- }
-
- /**
- * Clean buffer block.
- */
- void clear() {
- buffer = null;
- }
-
- /**
- * Check whether the current buffer block belongs to the given stream and
- * covers the requested range. Partially available data is not supported;
- * the data must be available exactly within the range given by the offset
- * and size parameters.
- *
- * @param path Stream path
- * @param offset Offset of the stream
- * @param size Size of the data, starting from the offset, that the
- * caller is interested in
- * @return True if the data is available from the given offset and of the
- * size the caller is interested in.
- */
- boolean hasData(String path, long offset, int size) {
-
- if (!hasValidDataForOffset(path, offset)) {
- return false;
- }
-
- if ((size + offset) > (buffer.data.length + buffer.offset)) {
- return false;
- }
- return true;
- }
-
- /**
- * Copy data from the buffer block starting at the requested offset. It is
- * the caller's responsibility to check that the buffer block is of
- * interest and that the offset is valid.
- *
- * @param data Byte array to be filled from the buffer block
- * @param offset Offset from which data is to be fetched.
- */
- void get(byte[] data, long offset) {
- System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
- data.length);
- }
-
- /**
- * Create new empty buffer block of the given size.
- *
- * @param len Size of the buffer block.
- * @return Empty byte array.
- */
- byte[] getEmpty(int len) {
- return new byte[len];
- }
-
- /**
- * Allows the caller to install a new buffer block for the stream,
- * pulled from the backend.
- *
- * @param data Buffer
- * @param path Stream path to which the buffer belongs
- * @param offset Stream offset at which the buffer starts
- */
- void add(byte[] data, String path, long offset) {
- if (data == null) {
- return;
- }
-
- buffer = new Buffer();
- buffer.data = data;
- buffer.offset = offset;
- this.fileName = path;
- }
-
- /**
- * @return Size of the buffer.
- */
- int getBufferSize() {
- return buffer.data.length;
- }
-
- /**
- * @return Stream offset at which the buffer starts
- */
- long getBufferOffset() {
- return buffer.offset;
- }
-
- /**
- * Buffer container.
- */
- static class Buffer {
- private byte[] data;
- private long offset;
- }
-}
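
The hasValidDataForOffset/hasData checks above reduce to a range-containment test on the single cached window. A minimal sketch of that logic, with illustrative names and values:

public class BufferWindowSketch {
  public static void main(String[] args) {
    long bufferOffset = 4L * 1024 * 1024; // cached block starts at 4 MB
    int bufferLength = 4 * 1024 * 1024;   // cached block is 4 MB long

    long readOffset = 5L * 1024 * 1024;   // requested read position
    int readSize = 64 * 1024;             // requested read length

    // Mirrors hasData(path, offset, size): the request must fall entirely
    // inside the single cached [bufferOffset, bufferOffset + bufferLength)
    // window for the same file.
    boolean hit = readOffset >= bufferOffset
        && readOffset + readSize <= bufferOffset + bufferLength;
    System.out.println(hit); // true
  }
}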
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
deleted file mode 100644
index 89011d2..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
+++ /dev/null
@@ -1,1108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.web.resources.ADLFlush;
-import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam;
-import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo;
-import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
-import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
-import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
-import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
-import org.apache.hadoop.hdfs.web.resources.LeaseParam;
-import org.apache.hadoop.hdfs.web.resources.LengthParam;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
-import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
-import org.apache.hadoop.hdfs.web.resources.Param;
-import org.apache.hadoop.hdfs.web.resources.PermissionParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
-import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam;
-import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.VersionInfo;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.SocketException;
-import java.net.URI;
-import java.net.URL;
-import java.util.EnumSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Extends the {@link SWebHdfsFileSystem} API. This class contains Azure
- * Data Lake specific stability, reliability and performance improvements.
- * <p>
- * The motivation behind PrivateAzureDataLakeFileSystem is to encapsulate
- * the dependency on the org.apache.hadoop.hdfs.web package for configuring
- * query parameters, configuration of HTTP requests sent to the backend,
- * etc. This class should be refactored and moved under the package
- * org.apache.hadoop.fs.adl once the required dependent changes are made in
- * the ASF code.
- */
-public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
-
- public static final String SCHEME = "adl";
-
- // Feature configuration
- private boolean featureGetBlockLocationLocallyBundled = true;
- private boolean featureConcurrentReadWithReadAhead = true;
- private boolean featureRedirectOff = true;
- private boolean featureFlushWhenEOF = true;
- private boolean overrideOwner = false;
- private int maxConcurrentConnection;
- private int maxBufferSize;
- private String userName;
-
- /**
- * Constructor.
- */
- public PrivateAzureDataLakeFileSystem() {
- try {
- userName = UserGroupInformation.getCurrentUser().getShortUserName();
- } catch (IOException e) {
- userName = "hadoop";
- }
- }
-
- @Override
- public synchronized void initialize(URI uri, Configuration conf)
- throws IOException {
- super.initialize(uri, conf);
- overrideOwner = getConf()
- .getBoolean(ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
- ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
-
- featureRedirectOff = getConf()
- .getBoolean(ADLConfKeys.ADL_FEATURE_REDIRECT_OFF,
- ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT);
-
- featureGetBlockLocationLocallyBundled = getConf()
- .getBoolean(ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED,
- ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT);
-
- featureConcurrentReadWithReadAhead = getConf().
- getBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
- ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT);
-
- maxBufferSize = getConf().getInt(
- ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE,
- ADLConfKeys
- .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT);
-
- maxConcurrentConnection = getConf().getInt(
- ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN,
- ADLConfKeys
- .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT);
- }
-
- @VisibleForTesting
- protected boolean isFeatureGetBlockLocationLocallyBundled() {
- return featureGetBlockLocationLocallyBundled;
- }
-
- @VisibleForTesting
- protected boolean isFeatureConcurrentReadWithReadAhead() {
- return featureConcurrentReadWithReadAhead;
- }
-
- @VisibleForTesting
- protected boolean isFeatureRedirectOff() {
- return featureRedirectOff;
- }
-
- @VisibleForTesting
- protected boolean isOverrideOwnerFeatureOn() {
- return overrideOwner;
- }
-
- @VisibleForTesting
- protected int getMaxBufferSize() {
- return maxBufferSize;
- }
-
- @VisibleForTesting
- protected int getMaxConcurrentConnection() {
- return maxConcurrentConnection;
- }
-
- @Override
- public String getScheme() {
- return SCHEME;
- }
-
- /**
- * Constructing the home directory locally is fine as long as the
- * relationship between the Hadoop local user name and the ADL user name
- * is not fully worked out yet.
- *
- * @return Hadoop local user home directory.
- */
- @Override
- public final Path getHomeDirectory() {
- try {
- return makeQualified(new Path(
- "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
- } catch (IOException e) {
- }
-
- return new Path("/user/" + userName);
- }
-
- /**
- * Azure Data Lake does not support user configuration of data
- * replication, hence there is no need to query Azure Data Lake for it.
- *
- * Stub implementation.
- *
- * @param p Not honoured
- * @param replication Not honoured
- * @return True, hard coded, since the ADL file system does not support
- * replication configuration
- * @throws IOException Never thrown in this case; declared only to align
- * with the parent API definition.
- */
- @Override
- public final boolean setReplication(final Path p, final short replication)
- throws IOException {
- return true;
- }
-
- /**
- * @param f File/Folder path
- * @return FileStatus instance containing metadata information of f
- * @throws IOException For any system error
- */
- @Override
- public FileStatus getFileStatus(Path f) throws IOException {
- statistics.incrementReadOps(1);
- FileStatus status = super.getFileStatus(f);
-
- if (overrideOwner) {
- FileStatus proxiedStatus = new FileStatus(status.getLen(),
- status.isDirectory(), status.getReplication(), status.getBlockSize(),
- status.getModificationTime(), status.getAccessTime(),
- status.getPermission(), userName, "hdfs", status.getPath());
- return proxiedStatus;
- } else {
- return status;
- }
- }
-
- /**
- * The create call semantics are handled differently for ADL: create is
- * translated to create/append semantics.
- * 1. No dedicated connection to the server.
- * 2. Buffering is done locally. Once the buffer is full, or flush is
- * invoked by the caller, all pending data is pushed to ADL as an APPEND
- * operation.
- * 3. On close, an additional call is sent to the server to close the
- * stream and release the lock on the stream.
- *
- * The create/append semantics are necessary because:
- * 1. The ADL backend server does not allow idle connections for longer
- * durations; in slow-writer scenarios, connection timeouts/resets were
- * observed, causing occasional job failures.
- * 2. They give a performance boost to slow-writer jobs by avoiding
- * network latency.
- * 3. ADL performs equally well with appends in multiples of 4MB chunks.
- *
- * @param f File path
- * @param permission Access permission for the newly created file
- * @param overwrite Remove the existing file and recreate it if true,
- * otherwise throw an error if the file exists
- * @param bufferSize Buffer size, ADL backend does not honour
- * @param replication Replication count, ADL backend does not honour
- * @param blockSize Block size, ADL backend does not honour
- * @param progress Progress indicator
- * @return FSDataOutputStream OutputStream on which application can push
- * stream of bytes
- * @throws IOException On a system error, internal server error or user error
- */
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission,
- final boolean overwrite, final int bufferSize, final short replication,
- final long blockSize, final Progressable progress) throws IOException {
- statistics.incrementWriteOps(1);
-
- return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
- new PermissionParam(applyUMask(permission)),
- new OverwriteParam(overwrite), new BufferSizeParam(bufferSize),
- new ReplicationParam(replication), new BlockSizeParam(blockSize),
- new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
- };
- }
-
- @Override
- public FSDataOutputStream createNonRecursive(final Path f,
- final FsPermission permission, final EnumSet<CreateFlag> flag,
- final int bufferSize, final short replication, final long blockSize,
- final Progressable progress) throws IOException {
- statistics.incrementWriteOps(1);
-
- String leaseId = java.util.UUID.randomUUID().toString();
- return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
- new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag),
- new CreateParentParam(false), new BufferSizeParam(bufferSize),
- new ReplicationParam(replication), new LeaseParam(leaseId),
- new BlockSizeParam(blockSize),
- new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
- };
- }
-
- /**
- * Redefined here, since it is private in the parent class, to pass it
- * through to the create API implementation.
- *
- * @param permission Requested permission, may be null
- * @return FsPermission with the configured umask applied
- */
- private FsPermission applyUMask(FsPermission permission) {
- FsPermission fsPermission = permission;
- if (fsPermission == null) {
- fsPermission = FsPermission.getDefault();
- }
- return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
- }
-
- /**
- * The open call semantics are handled differently for ADL: instead of a
- * network stream, an overridden FsInputStream is returned to the user.
- *
- * 1. No dedicated connection to the server.
- * 2. Process-level concurrent read-ahead buffering is done; this makes
- * data available to the caller quickly.
- * 3. The number of bytes to read ahead is configurable.
- *
- * The advantages of process-level concurrent read-ahead buffering are:
- * 1. The ADL backend server does not allow idle connections for longer
- * durations; in slow-reader scenarios, connection timeouts/resets were
- * observed, causing occasional job failures.
- * 2. It gives a performance boost to slow-reader jobs by avoiding
- * network latency.
- * 3. Compressed formats such as ORC, and large data files, gain the most
- * from this implementation.
- *
- * Read ahead feature is configurable.
- *
- * @param f File path
- * @param buffersize Buffer size
- * @return FSDataInputStream InputStream on which application can read
- * stream of bytes
- * @throws IOException On a system error, internal server error or user error
- */
- @Override
- public FSDataInputStream open(final Path f, final int buffersize)
- throws IOException {
- statistics.incrementReadOps(1);
-
- final HttpOpParam.Op op = GetOpParam.Op.OPEN;
- // use a runner so the open can recover from an invalid token
- FsPathConnectionRunner runner = null;
-
- if (featureConcurrentReadWithReadAhead) {
- URL url = this.toUrl(op, f, new BufferSizeParam(buffersize),
- new ReadADLNoRedirectParam(true),
- new ADLVersionInfo(VersionInfo.getVersion()));
-
- BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f,
- maxBufferSize, maxConcurrentConnection);
-
- FSDataInputStream fin = new FSDataInputStream(bb);
- return fin;
- } else {
- if (featureRedirectOff) {
- runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
- new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
- new ADLVersionInfo(VersionInfo.getVersion()));
- } else {
- runner = new FsPathConnectionRunner(op, f,
- new BufferSizeParam(buffersize));
- }
-
- return new FSDataInputStream(
- new OffsetUrlInputStream(new UnresolvedUrlOpener(runner),
- new OffsetUrlOpener(null)));
- }
- }
-
- /**
- * @param f File/Folder path
- * @return FileStatus array
- * @throws IOException For system error
- */
- @Override
- public FileStatus[] listStatus(final Path f) throws IOException {
- FileStatus[] fileStatuses = super.listStatus(f);
- for (int i = 0; i < fileStatuses.length; i++) {
- if (overrideOwner) {
- fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(),
- fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(),
- fileStatuses[i].getBlockSize(),
- fileStatuses[i].getModificationTime(),
- fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(),
- userName, "hdfs", fileStatuses[i].getPath());
- }
- }
- return fileStatuses;
- }
-
- @Override
- public BlockLocation[] getFileBlockLocations(final FileStatus status,
- final long offset, final long length) throws IOException {
- if (status == null) {
- return null;
- }
-
- if (featureGetBlockLocationLocallyBundled) {
- if ((offset < 0) || (length < 0)) {
- throw new IllegalArgumentException("Invalid start or len parameter");
- }
-
- if (status.getLen() < offset) {
- return new BlockLocation[0];
- }
-
- final String[] name = {"localhost"};
- final String[] host = {"localhost"};
- long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE; // Block size must be
- // non zero
- int numberOfLocations =
- (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
- BlockLocation[] locations = new BlockLocation[numberOfLocations];
- for (int i = 0; i < locations.length; i++) {
- long currentOffset = offset + (i * blockSize);
- long currentLength = Math
- .min(blockSize, offset + length - currentOffset);
- locations[i] = new BlockLocation(name, host, currentOffset,
- currentLength);
- }
-
- return locations;
- } else {
- return getFileBlockLocations(status.getPath(), offset, length);
- }
- }
-
- @Override
- public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
- final long length) throws IOException {
- statistics.incrementReadOps(1);
-
- if (featureGetBlockLocationLocallyBundled) {
- FileStatus fileStatus = getFileStatus(p);
- return getFileBlockLocations(fileStatus, offset, length);
- } else {
- return super.getFileBlockLocations(p, offset, length);
- }
- }
-
- enum StreamState {
- Initial,
- DataCachedInLocalBuffer,
- StreamEnd
- }
-
- class BatchAppendOutputStream extends OutputStream {
- private Path fsPath;
- private Param<?, ?>[] parameters;
- private byte[] data = null;
- private int offset = 0;
- private long length = 0;
- private boolean eof = false;
- private boolean hadError = false;
- private byte[] dataBuffers = null;
- private int bufSize = 0;
- private boolean streamClosed = false;
-
- public BatchAppendOutputStream(Path path, int bufferSize,
- Param<?, ?>... param) throws IOException {
- if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) {
- bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
- } else {
- bufSize = bufferSize;
- }
-
- this.fsPath = path;
- this.parameters = param;
- this.data = getBuffer();
- FSDataOutputStream createStream = null;
- try {
- if (featureRedirectOff) {
- CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
- true);
- Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
- new Param<?, ?>[param.length + 2] :
- new Param<?, ?>[param.length + 1];
- System.arraycopy(param, 0, tmpParam, 0, param.length);
- tmpParam[param.length] = skipRedirect;
- if (featureFlushWhenEOF) {
- tmpParam[param.length + 1] = new ADLFlush(false);
- }
- createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
- fsPath, 1, tmpParam).run();
- } else {
- createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
- fsPath, 1, param).run();
- }
- } finally {
- if (createStream != null) {
- createStream.close();
- }
- }
- }
-
- @Override
- public final synchronized void write(int b) throws IOException {
- if (streamClosed) {
- throw new IOException(fsPath + " stream object is closed.");
- }
-
- if (offset == (data.length)) {
- flush();
- }
-
- data[offset] = (byte) b;
- offset++;
-
- // Statistics will get incremented again as part of the batch updates,
- // decrement here to avoid double value
- if (statistics != null) {
- statistics.incrementBytesWritten(-1);
- }
- }
-
- @Override
- public final synchronized void write(byte[] buf, int off, int len)
- throws IOException {
- if (streamClosed) {
- throw new IOException(fsPath + " stream object is closed.");
- }
-
- int bytesToWrite = len;
- int localOff = off;
- int localLen = len;
- if (localLen >= data.length) {
- // Flush data that is already in our internal buffer
- flush();
-
- // Keep committing data until we have less than our internal buffers
- // length left
- do {
- try {
- commit(buf, localOff, data.length, eof);
- } catch (IOException e) {
- hadError = true;
- throw e;
- }
- localOff += data.length;
- localLen -= data.length;
- } while (localLen >= data.length);
- }
-
- // At this point, we have less than data.length left to copy from users
- // buffer
- if (offset + localLen >= data.length) {
- // Users buffer has enough data left to fill our internal buffer
- int bytesToCopy = data.length - offset;
- System.arraycopy(buf, localOff, data, offset, bytesToCopy);
- offset += bytesToCopy;
-
- // Flush our internal buffer
- flush();
- localOff += bytesToCopy;
- localLen -= bytesToCopy;
- }
-
- if (localLen > 0) {
- // Simply copy the remainder from the users buffer into our internal
- // buffer
- System.arraycopy(buf, localOff, data, offset, localLen);
- offset += localLen;
- }
-
- // Statistics will get incremented again as part of the batch updates,
- // decrement here to avoid double value
- if (statistics != null) {
- statistics.incrementBytesWritten(-bytesToWrite);
- }
- }
-
- @Override
- public final synchronized void flush() throws IOException {
- if (streamClosed) {
- throw new IOException(fsPath + " stream object is closed.");
- }
-
- if (offset > 0) {
- try {
- commit(data, 0, offset, eof);
- } catch (IOException e) {
- hadError = true;
- throw e;
- }
- }
-
- offset = 0;
- }
-
- @Override
- public final synchronized void close() throws IOException {
- // Stream was closed earlier; return quietly.
- if(streamClosed) {
- return;
- }
-
- if (featureRedirectOff) {
- eof = true;
- }
-
- boolean flushedSomething = false;
- if (hadError) {
- // No point proceeding further, since an error has occurred and the
- // stream would need to be uploaded again.
- streamClosed = true;
- return;
- } else {
- flushedSomething = offset > 0;
- try {
- flush();
- } finally {
- streamClosed = true;
- }
- }
-
- if (featureRedirectOff) {
- // If we didn't flush anything from our internal buffer, we have to
- // call the service again
- // with an empty payload and flush=true in the url
- if (!flushedSomething) {
- try {
- commit(null, 0, ADLConfKeys.KB, true);
- } finally {
- streamClosed = true;
- }
- }
- }
- }
-
- private void commit(byte[] buffer, int off, int len, boolean endOfFile)
- throws IOException {
- OutputStream out = null;
- try {
- if (featureRedirectOff) {
- AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
- true);
- Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
- new Param<?, ?>[parameters.length + 3] :
- new Param<?, ?>[parameters.length + 1];
- System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
- tmpParam[parameters.length] = skipRedirect;
- if (featureFlushWhenEOF) {
- tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
- tmpParam[parameters.length + 2] = new OffsetParam(length);
- }
-
- out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
- len, tmpParam).run();
- } else {
- out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
- len, parameters).run();
- }
-
- if (buffer != null) {
- out.write(buffer, off, len);
- length += len;
- }
- } finally {
- if (out != null) {
- out.close();
- }
- }
- }
-
- private byte[] getBuffer() {
- // Allocate a fresh buffer of the configured size
- dataBuffers = new byte[bufSize];
- return dataBuffers;
- }
- }
-
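- // A minimal usage sketch for the buffered write path above (path and
- // payload are illustrative): writes accumulate in the internal buffer,
- // flush() commits the partial buffer, and close() issues the final
- // flush=true call when redirects are disabled.
- //
- //   FileSystem fs = FileSystem.get(conf);
- //   try (FSDataOutputStream out = fs.create(new Path("/tmp/sample.bin"))) {
- //     out.write(payload);
- //     out.flush();
- //   }
- //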
- /**
- * Reads data from the backend in chunks rather than over a single
- * persistent connection, to avoid a slow reader causing a socket
- * timeout.
- */
- protected class BatchByteArrayInputStream extends FSInputStream {
-
- private static final int SIZE4MB = 4 * 1024 * 1024;
- private final URL runner;
- private byte[] data = null;
- private long validDataHoldingSize = 0;
- private int bufferOffset = 0;
- private long currentFileOffset = 0;
- private long nextFileOffset = 0;
- private long fileSize = 0;
- private StreamState state = StreamState.Initial;
- private int maxBufferSize;
- private int maxConcurrentConnection;
- private Path fsPath;
- private boolean streamIsClosed;
- private Future[] subtasks = null;
-
- BatchByteArrayInputStream(URL url, Path p, int bufferSize,
- int concurrentConnection) throws IOException {
- this.runner = url;
- fsPath = p;
- FileStatus fStatus = getFileStatus(fsPath);
- if (!fStatus.isFile()) {
- throw new IOException("Cannot open the directory " + p + " for " +
- "reading");
- }
- fileSize = fStatus.getLen();
- this.maxBufferSize = bufferSize;
- this.maxConcurrentConnection = concurrentConnection;
- this.streamIsClosed = false;
- }
-
- @Override
- public synchronized final int read(long position, byte[] buffer, int offset,
- int length) throws IOException {
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
- long oldPos = this.getPos();
-
- int nread1;
- try {
- this.seek(position);
- nread1 = this.read(buffer, offset, length);
- } finally {
- this.seek(oldPos);
- }
-
- return nread1;
- }
-
- @Override
- public synchronized final int read() throws IOException {
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
- int status = doBufferAvailabilityCheck();
- if (status == -1) {
- return status;
- }
- int ch = data[bufferOffset++] & (0xff);
- if (statistics != null) {
- statistics.incrementBytesRead(1);
- }
- return ch;
- }
-
- @Override
- public synchronized final void readFully(long position, byte[] buffer,
- int offset, int length) throws IOException {
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
-
- super.readFully(position, buffer, offset, length);
- // The positioned reads performed by super.readFully() already update
- // the read statistics via read(byte[], int, int), so incrementing
- // again here would double count.
- }
-
- @Override
- public synchronized final int read(byte[] b, int off, int len)
- throws IOException {
- if (b == null) {
- throw new IllegalArgumentException();
- } else if (off < 0 || len < 0 || len > b.length - off) {
- throw new IndexOutOfBoundsException();
- } else if (len == 0) {
- return 0;
- }
-
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
- int status = doBufferAvailabilityCheck();
- if (status == -1) {
- return status;
- }
-
- int byteRead = 0;
- // The request can be served from the local buffer only while
- // bufferOffset + len stays within the valid buffered data.
- if (bufferOffset + len <= validDataHoldingSize) {
- System.arraycopy(data, bufferOffset, b, off, len);
- bufferOffset += len;
- byteRead = len;
- } else {
- byteRead = super.read(b, off, len);
- }
-
- if (statistics != null) {
- statistics.incrementBytesRead(byteRead);
- }
-
- return byteRead;
- }
-
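- // Ensures at least one byte is buffered locally, refilling from the
- // backend (or the shared BufferManager cache) as needed; returns -1
- // at end of stream and 0 otherwise.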
- private int doBufferAvailabilityCheck() throws IOException {
- if (state == StreamState.Initial) {
- validDataHoldingSize = fill(nextFileOffset);
- }
-
- long dataReloadSize = 0;
- switch ((int) validDataHoldingSize) {
- case -1:
- state = StreamState.StreamEnd;
- return -1;
- case 0:
- dataReloadSize = fill(nextFileOffset);
- if (dataReloadSize <= 0) {
- state = StreamState.StreamEnd;
- return (int) dataReloadSize;
- } else {
- validDataHoldingSize = dataReloadSize;
- }
- break;
- default:
- break;
- }
-
- if (bufferOffset >= validDataHoldingSize) {
- dataReloadSize = fill(nextFileOffset);
- }
-
- if (bufferOffset >= ((dataReloadSize == 0) ?
- validDataHoldingSize :
- dataReloadSize)) {
- state = StreamState.StreamEnd;
- return -1;
- }
-
- validDataHoldingSize = ((dataReloadSize == 0) ?
- validDataHoldingSize :
- dataReloadSize);
- state = StreamState.DataCachedInLocalBuffer;
- return 0;
- }
-
- private long fill(final long off) throws IOException {
- if (state == StreamState.StreamEnd) {
- return -1;
- }
-
- if (fileSize <= off) {
- state = StreamState.StreamEnd;
- return -1;
- }
- int len = maxBufferSize;
- long fileOffset = 0;
- boolean isEntireFileCached = true;
- if (fileSize <= maxBufferSize) {
- len = (int) fileSize;
- currentFileOffset = 0;
- nextFileOffset = 0;
- } else {
- if (len > (fileSize - off)) {
- len = (int) (fileSize - off);
- }
-
- synchronized (BufferManager.getLock()) {
- if (BufferManager.getInstance()
- .hasValidDataForOffset(fsPath.toString(), off)) {
- len = (int) (
- BufferManager.getInstance().getBufferOffset() + BufferManager
- .getInstance().getBufferSize() - (int) off);
- }
- }
-
- if (len <= 0) {
- len = maxBufferSize;
- }
- fileOffset = off;
- isEntireFileCached = false;
- }
-
- data = null; // release the previous buffer before requesting a new one
- BufferManager bm = BufferManager.getInstance();
- data = bm.getEmpty(len);
- boolean fetchDataOverNetwork = false;
- synchronized (BufferManager.getLock()) {
- if (bm.hasData(fsPath.toString(), fileOffset, len)) {
- try {
- bm.get(data, fileOffset);
- validDataHoldingSize = data.length;
- currentFileOffset = fileOffset;
- } catch (ArrayIndexOutOfBoundsException e) {
- fetchDataOverNetwork = true;
- }
- } else {
- fetchDataOverNetwork = true;
- }
- }
-
- if (fetchDataOverNetwork) {
- int splitSize = getSplitSize(len);
- try {
- validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
- splitSize);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted filling buffer", e);
- }
-
- synchronized (BufferManager.getLock()) {
- bm.add(data, fsPath.toString(), fileOffset);
- }
- currentFileOffset = nextFileOffset;
- }
-
- nextFileOffset += validDataHoldingSize;
- state = StreamState.DataCachedInLocalBuffer;
- bufferOffset = isEntireFileCached ? (int) off : 0;
- return validDataHoldingSize;
- }
-
- int getSplitSize(int size) {
- if (size <= SIZE4MB) {
- return 1;
- }
-
- // Reads larger than the maximum buffer size are not practical; cap it.
- if (size > maxBufferSize) {
- size = maxBufferSize;
- }
-
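- // Example: a 10 MB read with 4 MB chunks and 8 allowed connections
- // yields min(max(10 MB / 4 MB, 1), 8) = min(2, 8) = 2 splits; the
- // integer division floors the ratio before Math.round sees it.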
- int equalBufferSplit = Math.max(Math.round(size / SIZE4MB), 1);
- int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
- return splitSize;
- }
-
- @Override
- public synchronized final void seek(long pos) throws IOException {
- if (pos < 0) {
- throw new IOException("Bad offset, cannot seek to " + pos);
- }
-
- BufferManager bm = BufferManager.getInstance();
- synchronized (BufferManager.getLock()) {
- if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
- state = StreamState.DataCachedInLocalBuffer;
- } else {
- state = StreamState.Initial;
- }
- }
-
- long availableBytes = (currentFileOffset + validDataHoldingSize);
-
- // Check whether this position falls within the buffered data
- if (pos < currentFileOffset || availableBytes <= 0) {
- validDataHoldingSize = 0;
- currentFileOffset = pos;
- nextFileOffset = pos;
- bufferOffset = 0;
- return;
- }
-
- if (pos < availableBytes && pos >= currentFileOffset) {
- state = StreamState.DataCachedInLocalBuffer;
- bufferOffset = (int) (pos - currentFileOffset);
- } else {
- validDataHoldingSize = 0;
- currentFileOffset = pos;
- nextFileOffset = pos;
- bufferOffset = 0;
- }
- }
-
- @Override
- public synchronized final long getPos() throws IOException {
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
- return bufferOffset + currentFileOffset;
- }
-
- @Override
- public synchronized final int available() throws IOException {
- if (streamIsClosed) {
- throw new IOException("Stream already closed");
- }
- return Integer.MAX_VALUE;
- }
-
- @Override
- public final boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
-
- @SuppressWarnings("unchecked")
- private int fillDataConcurrently(byte[] byteArray, int length,
- long globalOffset, int splitSize)
- throws IOException, InterruptedException {
- ExecutorService executor = Executors.newFixedThreadPool(splitSize);
- subtasks = new Future[splitSize];
- for (int i = 0; i < splitSize; i++) {
- int offset = i * (length / splitSize);
- int splitLength = (splitSize == (i + 1)) ?
- (length / splitSize) + (length % splitSize) :
- (length / splitSize);
- subtasks[i] = executor.submit(
- new BackgroundReadThread(byteArray, offset, splitLength,
- globalOffset + offset));
- }
-
- executor.shutdown();
- // Wait (up to the configured timeout) for all subtasks to finish.
- executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
- TimeUnit.SECONDS);
-
- int totalBytePainted = 0;
- for (int i = 0; i < splitSize; ++i) {
- try {
- totalBytePainted += (Integer) subtasks[i].get();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for a read task", e);
- } catch (ExecutionException e) {
- // The task itself failed; do not re-interrupt the current thread.
- throw new IOException(e.getCause());
- }
- }
-
- if (totalBytePainted != length) {
- throw new IOException("Expected " + length + " bytes, Got " +
- totalBytePainted + " bytes");
- }
-
- return totalBytePainted;
- }
-
- @Override
- public synchronized final void close() throws IOException {
- synchronized (BufferManager.getLock()) {
- BufferManager.getInstance().clear();
- }
- // TODO: the underlying stream and connection are not closed here;
- // that cleanup still needs to be done.
- // The flag marks the stream closed; it cannot be used afterwards.
- streamIsClosed = true;
- }
-
- /**
- * Reads the given length of data from the ADL backend starting at the
- * specified global file offset, copying it into the buffer at the given
- * buffer offset.
- *
- * @param buffer       Destination buffer for the data read.
- * @param offset       Offset within the buffer at which to store the data.
- * @param length       Number of bytes to read from the ADL backend.
- * @param globalOffset File offset at which to start reading.
- * @return Number of bytes read from the ADL backend.
- * @throws IOException For intermittent server issues or internal failures.
- */
- private int fillUpData(byte[] buffer, int offset, int length,
- long globalOffset) throws IOException {
- int totalBytesRead = 0;
- final URL offsetUrl = new URL(
- runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
- + new LengthParam(String.valueOf(length)));
- HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
- true).run();
- InputStream in = conn.getInputStream();
- try {
- int bytesRead = 0;
- while ((bytesRead = in.read(buffer, offset + totalBytesRead,
- length - totalBytesRead)) > 0) {
- totalBytesRead += bytesRead;
- }
-
- // InputStream must be fully consumed to enable http keep-alive
- if (bytesRead == 0) {
- // Probe for the EOF marker; no byte should remain to be read.
- if (in.read() != -1) {
- throw new SocketException(
- "Server returned more than requested data.");
- }
- }
- } finally {
- in.close();
- conn.disconnect();
- }
-
- return totalBytesRead;
- }
-
- private class BackgroundReadThread implements Callable<Integer> {
-
- private final byte[] data;
- private int offset;
- private int length;
- private long globalOffset;
-
- BackgroundReadThread(byte[] buffer, int off, int size, long position) {
- this.data = buffer;
- this.offset = off;
- this.length = size;
- this.globalOffset = position;
- }
-
- public Integer call() throws IOException {
- return fillUpData(data, offset, length, globalOffset);
- }
- }
- }
-}
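
The BatchByteArrayInputStream deleted above fills its buffer by fanning one
logical read out over several parallel HTTP range requests. A condensed,
self-contained sketch of that technique follows; ParallelRangeRead and
readRange() are hypothetical stand-ins for the class and for the WebHDFS
range read performed by fillUpData():

  import java.util.ArrayList;
  import java.util.List;
  import java.util.concurrent.Callable;
  import java.util.concurrent.ExecutionException;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  import java.util.concurrent.Future;

  public class ParallelRangeRead {

    // Fills buf from [fileOffset, fileOffset + buf.length) using `splits`
    // roughly equal chunks; the last chunk absorbs the remainder.
    static int fill(final byte[] buf, final long fileOffset, int splits)
        throws InterruptedException, ExecutionException {
      ExecutorService pool = Executors.newFixedThreadPool(splits);
      try {
        List<Future<Integer>> parts = new ArrayList<Future<Integer>>();
        final int chunk = buf.length / splits;
        for (int i = 0; i < splits; i++) {
          final int off = i * chunk;
          final int len = (i == splits - 1) ? buf.length - off : chunk;
          parts.add(pool.submit(new Callable<Integer>() {
            public Integer call() throws Exception {
              return readRange(buf, off, len, fileOffset + off);
            }
          }));
        }
        int total = 0;
        for (Future<Integer> part : parts) {
          total += part.get(); // surfaces any per-chunk failure
        }
        return total;
      } finally {
        pool.shutdown();
      }
    }

    // Hypothetical stand-in for the HTTP range read in fillUpData().
    static int readRange(byte[] buf, int off, int len, long position) {
      java.util.Arrays.fill(buf, off, off + len, (byte) 0);
      return len;
    }
  }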
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
deleted file mode 100644
index d7dce25..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web.oauth2;
-
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * Exposes AccessTokenProvider publicly so it can be extended in the
- * com.microsoft.azure.datalake package. The extension caches the token
- * for the lifetime of the process to improve performance.
- */
-@Private
-@Unstable
-public abstract class PrivateCachedRefreshTokenBasedAccessTokenProvider
- extends AccessTokenProvider {
-
- // visibility workaround
-
-}
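
The empty shim above widens access so the provider hierarchy can be extended
from the com.microsoft.azure.datalake package. A generic sketch of that
visibility-workaround pattern, with all names hypothetical:

  // Framework-internal type: package-private, so code outside this
  // package cannot extend it directly.
  abstract class InternalTokenProvider {
    abstract String getToken();
  }

  // A public subclass in the same package widens visibility so that
  // classes in other packages can extend the hierarchy.
  public abstract class PublicTokenProvider extends InternalTokenProvider {
    // intentionally empty
  }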
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
deleted file mode 100644
index 7a9dffa..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/**
- * An extension of {@link
- * org.apache.hadoop.hdfs.web.oauth2} for OAuth2 token management support.
- */
-package org.apache.hadoop.hdfs.web.oauth2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
deleted file mode 100644
index 1cc8273..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-/**
- * An extension of {@link org.apache.hadoop.hdfs.web} for
- * reading and writing files on the Azure Data Lake file system. This
- * implementation is derived from the WebHDFS specification.
- */
-package org.apache.hadoop.hdfs.web;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
deleted file mode 100644
index b76aaaa..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web.resources;
-
-/**
- * Query parameter to notify the backend server that all the data has
- * been pushed over the stream.
- *
- * Used by the Create and Append operations.
- */
-public class ADLFlush extends BooleanParam {
- /**
- * Parameter name.
- */
- public static final String NAME = "flush";
-
- private static final Domain DOMAIN = new Domain(NAME);
-
- /**
- * Constructor.
- *
- * @param value the parameter value.
- */
- public ADLFlush(final Boolean value) {
- super(DOMAIN, value);
- }
-
- @Override
- public final String getName() {
- return NAME;
- }
-}
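
Because the WebHDFS Param types render as name=value when concatenated into a
URL (as the OffsetParam and LengthParam usage in fillUpData above shows), the
flush marker ends up on the wire as a plain query flag. A small illustration,
with a hypothetical base URL:

  String base = "https://example.azuredatalakestore.net/webhdfs/v1/f?op=APPEND";
  String url = base + "&" + new ADLFlush(true);  // ...&flush=true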
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19259422/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
deleted file mode 100644
index 6b3708f..0000000
--- a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.hadoop.hdfs.web.resources;
-
-import java.net.HttpURLConnection;
-
-/**
- * Extends the WebHDFS GetOpParam to avoid the redirect operation for
- * Azure Data Lake storage.
- */
-public class ADLGetOpParam extends HttpOpParam<ADLGetOpParam.Op> {
- private static final Domain<Op> DOMAIN = new Domain<Op>(NAME, Op.class);
-
- /**
- * Constructor.
- *
- * @param str a string representation of the parameter value.
- */
- public ADLGetOpParam(final String str) {
- super(DOMAIN, DOMAIN.parse(str));
- }
-
- @Override
- public final String getName() {
- return NAME;
- }
-
- /**
- * Get operations.
- */
- public enum Op implements HttpOpParam.Op {
- OPEN(false, HttpURLConnection.HTTP_OK);
-
- private final boolean redirect;
- private final int expectedHttpResponseCode;
- private final boolean requireAuth;
-
- Op(final boolean doRedirect, final int expectHttpResponseCode) {
- this(doRedirect, expectHttpResponseCode, false);
- }
-
- Op(final boolean doRedirect, final int expectHttpResponseCode,
- final boolean doRequireAuth) {
- this.redirect = doRedirect;
- this.expectedHttpResponseCode = expectHttpResponseCode;
- this.requireAuth = doRequireAuth;
- }
-
- @Override
- public HttpOpParam.Type getType() {
- return HttpOpParam.Type.GET;
- }
-
- @Override
- public boolean getRequireAuth() {
- return requireAuth;
- }
-
- @Override
- public boolean getDoOutput() {
- return false;
- }
-
- @Override
- public boolean getRedirect() {
- return redirect;
- }
-
- @Override
- public int getExpectedHttpResponseCode() {
- return expectedHttpResponseCode;
- }
-
- @Override
- public String toQueryString() {
- return NAME + "=" + this;
- }
- }
-}
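
A quick illustration of the query rendering defined by toQueryString() above;
since Op.OPEN is constructed with redirect=false, a client sending this
parameter reads from the store directly instead of following a WebHDFS
redirect:

  String query = ADLGetOpParam.Op.OPEN.toQueryString();  // "op=OPEN"
  boolean follow = ADLGetOpParam.Op.OPEN.getRedirect();  // false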