Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/06/10 03:36:00 UTC
[49/50] [abbrv] hadoop git commit: HADOOP-12666. Support Microsoft Azure Data Lake - as a file system in Hadoop. Contributed by Vishwajeet Dusane.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9581fb71
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9581fb71
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9581fb71
Branch: refs/heads/HDFS-7240
Commit: 9581fb715cbc8a6ad28566e83c6d0242a7306688
Parents: e383b73
Author: Chris Nauroth <cn...@apache.org>
Authored: Thu Jun 9 14:33:31 2016 -0700
Committer: Chris Nauroth <cn...@apache.org>
Committed: Thu Jun 9 14:33:31 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/core-default.xml | 60 +
.../conf/TestCommonConfigurationFields.java | 6 +
hadoop-project/src/site/site.xml | 2 +
.../dev-support/findbugs-exclude.xml | 24 +
hadoop-tools/hadoop-azure-datalake/pom.xml | 180 +++
.../main/java/org/apache/hadoop/fs/adl/Adl.java | 52 +
.../org/apache/hadoop/fs/adl/AdlFileSystem.java | 41 +
...hedRefreshTokenBasedAccessTokenProvider.java | 135 +++
.../hadoop/fs/adl/oauth2/package-info.java | 23 +
.../org/apache/hadoop/fs/adl/package-info.java | 23 +
.../org/apache/hadoop/hdfs/web/ADLConfKeys.java | 61 +
.../apache/hadoop/hdfs/web/BufferManager.java | 180 +++
.../web/PrivateAzureDataLakeFileSystem.java | 1108 ++++++++++++++++++
...hedRefreshTokenBasedAccessTokenProvider.java | 37 +
.../hadoop/hdfs/web/oauth2/package-info.java | 24 +
.../apache/hadoop/hdfs/web/package-info.java | 25 +
.../hadoop/hdfs/web/resources/ADLFlush.java | 49 +
.../hdfs/web/resources/ADLGetOpParam.java | 96 ++
.../hdfs/web/resources/ADLPostOpParam.java | 97 ++
.../hdfs/web/resources/ADLPutOpParam.java | 94 ++
.../hdfs/web/resources/ADLVersionInfo.java | 51 +
.../web/resources/AppendADLNoRedirectParam.java | 45 +
.../web/resources/CreateADLNoRedirectParam.java | 44 +
.../hadoop/hdfs/web/resources/LeaseParam.java | 53 +
.../web/resources/ReadADLNoRedirectParam.java | 44 +
.../hadoop/hdfs/web/resources/package-info.java | 27 +
.../src/site/markdown/index.md | 219 ++++
...hedRefreshTokenBasedAccessTokenProvider.java | 147 +++
hadoop-tools/hadoop-tools-dist/pom.xml | 6 +
hadoop-tools/pom.xml | 1 +
30 files changed, 2954 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 39b7132..f1d77dd 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2213,4 +2213,64 @@
needs to be specified in net.topology.script.file.name.
</description>
</property>
+
+
+ <!-- Azure Data Lake File System Configurations -->
+
+ <property>
+ <name>adl.feature.override.readahead</name>
+ <value>true</value>
+ <description>
+ Enables read-ahead in the ADL client to improve read throughput.
+ This works in conjunction with the value set in
+ adl.feature.override.readahead.max.buffersize.
+ When set to false, the read-ahead feature is turned off.
+ Default: true if not configured.
+ </description>
+ </property>
+
+ <property>
+ <name>adl.feature.override.readahead.max.buffersize</name>
+ <value>8388608</value>
+ <description>
+ Defines the maximum buffer size, allocated per process, used to
+ cache read-ahead data. Applicable only when
+ adl.feature.override.readahead is set to true.
+ Default: 8388608 bytes (8 MB) if not configured.
+ </description>
+ </property>
+
+ <property>
+ <name>adl.feature.override.readahead.max.concurrent.connection</name>
+ <value>2</value>
+ <description>
+ Defines the maximum number of concurrent connections that can be
+ established for read-ahead. If the data size is less than 4 MB,
+ only 1 read connection is used. If the data size is at least 4 MB
+ but less than 8 MB, 2 read connections are used. For data larger
+ than 8 MB, the value set under this property takes effect.
+ Applicable only when adl.feature.override.readahead is set to true
+ and the buffer size is greater than 8 MB.
+ It is recommended to adjust this property when
+ adl.feature.override.readahead.max.buffersize is less than 8 MB to
+ gain performance. The application also has to consider the
+ throttling limit for the account before configuring a large buffer
+ size.
+ </description>
+ </property>
+
+ <property>
+ <name>fs.adl.impl</name>
+ <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
+ </property>
+
+ <property>
+ <name>fs.AbstractFileSystem.adl.impl</name>
+ <value>org.apache.hadoop.fs.adl.Adl</value>
+ </property>
+
</configuration>
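
The ADL properties above can also be set programmatically. A minimal sketch, assuming a client wants to tune read-ahead before opening any adl:// path; the values shown simply restate the shipped defaults:

import org.apache.hadoop.conf.Configuration;

public class AdlReadAheadConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Enable read-ahead and cap the per-process read-ahead cache at 8 MB,
    // matching the defaults in core-default.xml above.
    conf.setBoolean("adl.feature.override.readahead", true);
    conf.setInt("adl.feature.override.readahead.max.buffersize", 8 * 1024 * 1024);
    conf.setInt("adl.feature.override.readahead.max.concurrent.connection", 2);
    System.out.println(
        conf.getInt("adl.feature.override.readahead.max.buffersize", -1));
  }
}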
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 90f7514..020474f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -102,6 +102,12 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
xmlPrefixToSkipCompare.add("s3.");
xmlPrefixToSkipCompare.add("s3native.");
+ // ADL properties are in a different subtree
+ // - org.apache.hadoop.hdfs.web.ADLConfKeys
+ xmlPrefixToSkipCompare.add("adl.");
+ xmlPropsToSkipCompare.add("fs.adl.impl");
+ xmlPropsToSkipCompare.add("fs.AbstractFileSystem.adl.impl");
+
// Deprecated properties. These should eventually be removed from the
// class.
configurationPropsToSkipCompare
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-project/src/site/site.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/src/site/site.xml b/hadoop-project/src/site/site.xml
index f9f4726..a89a220 100644
--- a/hadoop-project/src/site/site.xml
+++ b/hadoop-project/src/site/site.xml
@@ -146,6 +146,8 @@
<menu name="Hadoop Compatible File Systems" inherit="top">
<item name="Amazon S3" href="hadoop-aws/tools/hadoop-aws/index.html"/>
<item name="Azure Blob Storage" href="hadoop-azure/index.html"/>
+ <item name="Azure Data Lake Storage"
+ href="hadoop-azure-datalake/index.html"/>
<item name="OpenStack Swift" href="hadoop-openstack/index.html"/>
</menu>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000..4fd36ef
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/dev-support/findbugs-exclude.xml
@@ -0,0 +1,24 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<FindBugsFilter>
+ <!-- Buffer object is accessed within trusted code and intentionally assigned instead of array copy -->
+ <Match>
+ <Class name="org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem$BatchAppendOutputStream$CommitTask"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ <Priority value="2"/>
+ </Match>
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml
new file mode 100644
index 0000000..a4b1fe1
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-project</artifactId>
+ <version>3.0.0-alpha1-SNAPSHOT</version>
+ <relativePath>../../hadoop-project</relativePath>
+ </parent>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-azure-datalake</artifactId>
+ <name>Apache Hadoop Azure Data Lake support</name>
+ <description>
+ This module contains code to support integration with Azure Data Lake.
+ </description>
+ <packaging>jar</packaging>
+ <properties>
+ <okHttpVersion>2.4.0</okHttpVersion>
+ <minimalJsonVersion>0.9.1</minimalJsonVersion>
+ <file.encoding>UTF-8</file.encoding>
+ <downloadSources>true</downloadSources>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <findbugsXmlOutput>true</findbugsXmlOutput>
+ <xmlOutput>true</xmlOutput>
+ <excludeFilterFile>
+ ${basedir}/dev-support/findbugs-exclude.xml
+ </excludeFilterFile>
+ <effort>Max</effort>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-project-info-reports-plugin</artifactId>
+
+ <configuration>
+ <dependencyDetailsEnabled>false</dependencyDetailsEnabled>
+ <dependencyLocationsEnabled>false
+ </dependencyLocationsEnabled>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>deplist</id>
+ <phase>compile</phase>
+ <goals>
+ <goal>list</goal>
+ </goals>
+ <configuration>
+ <!-- build a shellprofile -->
+ <outputFile>${project.basedir}/target/hadoop-tools-deps/${project.artifactId}.tools-optional.txt</outputFile>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+
+
+ <!--
+ The following is to suppress a m2e warning in eclipse
+ (m2e doesn't know how to handle maven-enforcer:enforce, so we have to tell m2e to ignore it)
+ see: http://stackoverflow.com/questions/13040788/how-to-elimate-the-maven-enforcer-plugin-goal-enforce-is-ignored-by-m2e-wa
+ -->
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.m2e</groupId>
+ <artifactId>lifecycle-mapping</artifactId>
+ <version>1.0.0</version>
+ <configuration>
+ <lifecycleMappingMetadata>
+ <pluginExecutions>
+ <pluginExecution>
+ <pluginExecutionFilter>
+ <groupId>org.apache.maven.plugins
+ </groupId>
+ <artifactId>maven-enforcer-plugin
+ </artifactId>
+ <versionRange>[1.0.0,)</versionRange>
+ <goals>
+ <goal>enforce</goal>
+ </goals>
+ </pluginExecutionFilter>
+ <action>
+ <ignore/>
+ </action>
+ </pluginExecution>
+ </pluginExecutions>
+ </lifecycleMappingMetadata>
+ </configuration>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <artifactId>servlet-api</artifactId>
+ <groupId>javax.servlet</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.eclipsesource.minimal-json</groupId>
+ <artifactId>minimal-json</artifactId>
+ <version>0.9.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>mockwebserver</artifactId>
+ <version>2.4.0</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
new file mode 100644
index 0000000..4642d6b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/Adl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegateToFileSystem;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Expose adl:// scheme to access ADL file system.
+ */
+public class Adl extends DelegateToFileSystem {
+
+ Adl(URI theUri, Configuration conf) throws IOException, URISyntaxException {
+ super(theUri, createDataLakeFileSystem(conf), conf, AdlFileSystem.SCHEME,
+ false);
+ }
+
+ private static AdlFileSystem createDataLakeFileSystem(Configuration conf) {
+ AdlFileSystem fs = new AdlFileSystem();
+ fs.setConf(conf);
+ return fs;
+ }
+
+ /**
+ * @return Default port on which the ADL file system communicates
+ */
+ @Override
+ public final int getUriDefaultPort() {
+ return AdlFileSystem.DEFAULT_PORT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
new file mode 100644
index 0000000..11e1e0b
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/AdlFileSystem.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl;
+
+import org.apache.hadoop.hdfs.web.PrivateAzureDataLakeFileSystem;
+
+/**
+ * Expose adl:// scheme to access ADL file system.
+ */
+public class AdlFileSystem extends PrivateAzureDataLakeFileSystem {
+
+ public static final String SCHEME = "adl";
+ public static final int DEFAULT_PORT = 443;
+
+ @Override
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ @Override
+ public int getDefaultPort() {
+ return DEFAULT_PORT;
+ }
+}
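
With fs.adl.impl and fs.AbstractFileSystem.adl.impl registered in core-default.xml above, an adl:// URI resolves to AdlFileSystem through the standard FileSystem factory. A minimal sketch, assuming OAuth2 credentials are already configured; the account host below is hypothetical:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class AdlSchemeLookup {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // FileSystem.get consults fs.adl.impl for the "adl" scheme and returns
    // an AdlFileSystem instance for this URI.
    FileSystem fs = FileSystem.get(
        URI.create("adl://example.azuredatalakestore.net"), conf);
    System.out.println(fs.getScheme() + " " + fs.getUri());
    fs.close();
  }
}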
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000..b7f3b00
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/CachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl.oauth2;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.LinkedHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.web.oauth2.AccessTokenProvider;
+import org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider;
+import org.apache.hadoop.hdfs.web.oauth2.PrivateCachedRefreshTokenBasedAccessTokenProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_CLIENT_ID_KEY;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.OAUTH_REFRESH_URL_KEY;
+import static org.apache.hadoop.hdfs.web.oauth2.ConfRefreshTokenBasedAccessTokenProvider.OAUTH_REFRESH_TOKEN_KEY;
+
+/**
+ * Share refresh tokens across all ADLS instances with a common client ID. The
+ * {@link AccessTokenProvider} can be shared across multiple instances,
+ * amortizing the cost of refreshing tokens.
+ */
+public class CachedRefreshTokenBasedAccessTokenProvider
+ extends PrivateCachedRefreshTokenBasedAccessTokenProvider {
+
+ public static final String FORCE_REFRESH = "adl.force.token.refresh";
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CachedRefreshTokenBasedAccessTokenProvider.class);
+
+ /** Limit size of provider cache. */
+ static final int MAX_PROVIDERS = 10;
+ @SuppressWarnings("serial")
+ private static final Map<String, AccessTokenProvider> CACHE =
+ new LinkedHashMap<String, AccessTokenProvider>() {
+ @Override
+ public boolean removeEldestEntry(
+ Map.Entry<String, AccessTokenProvider> e) {
+ return size() > MAX_PROVIDERS;
+ }
+ };
+
+ private AccessTokenProvider instance = null;
+
+ /**
+ * Create handle for cached instance.
+ */
+ public CachedRefreshTokenBasedAccessTokenProvider() {
+ }
+
+ /**
+ * Gets the access token from internally cached
+ * ConfRefreshTokenBasedAccessTokenProvider instance.
+ *
+ * @return Valid OAuth2 access token for the user.
+ * @throws IOException on a system error, internal server error or user error
+ */
+ @Override
+ public synchronized String getAccessToken() throws IOException {
+ return instance.getAccessToken();
+ }
+
+ /**
+ * @return A cached Configuration consistent with the parameters of this
+ * instance.
+ */
+ @Override
+ public synchronized Configuration getConf() {
+ return instance.getConf();
+ }
+
+ /**
+ * Configure cached instance. Note that the Configuration instance returned
+ * from subsequent calls to {@link #getConf() getConf} may be from a
+ * previous, cached entry.
+ * @param conf Configuration instance
+ */
+ @Override
+ public synchronized void setConf(Configuration conf) {
+ String id = conf.get(OAUTH_CLIENT_ID_KEY);
+ if (null == id) {
+ throw new IllegalArgumentException("Missing client ID");
+ }
+ synchronized (CACHE) {
+ instance = CACHE.get(id);
+ if (null == instance
+ || conf.getBoolean(FORCE_REFRESH, false)
+ || replace(instance, conf)) {
+ instance = newInstance();
+ // clone configuration
+ instance.setConf(new Configuration(conf));
+ CACHE.put(id, instance);
+ LOG.debug("Created new client {}", id);
+ }
+ }
+ }
+
+ AccessTokenProvider newInstance() {
+ return new ConfRefreshTokenBasedAccessTokenProvider();
+ }
+
+ private static boolean replace(AccessTokenProvider cached, Configuration c2) {
+ // ConfRefreshTokenBasedAccessTokenProvider::setConf asserts !null
+ final Configuration c1 = cached.getConf();
+ for (String key : new String[] {
+ OAUTH_REFRESH_TOKEN_KEY, OAUTH_REFRESH_URL_KEY }) {
+ if (!c1.get(key).equals(c2.get(key))) {
+ // replace cached instance for this clientID
+ return true;
+ }
+ }
+ return false;
+ }
+
+}
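
The CACHE field above bounds the number of cached providers by overriding java.util.LinkedHashMap#removeEldestEntry. A standalone sketch of that eviction idiom, with illustrative String values standing in for AccessTokenProvider:

import java.util.LinkedHashMap;
import java.util.Map;

public class BoundedProviderCache {
  static final int MAX_PROVIDERS = 10;

  public static void main(String[] args) {
    // Insertion-ordered map that drops its oldest entry once the bound is
    // exceeded, the same idiom used by CACHE above.
    Map<String, String> cache = new LinkedHashMap<String, String>() {
      @Override
      protected boolean removeEldestEntry(Map.Entry<String, String> e) {
        return size() > MAX_PROVIDERS;
      }
    };
    for (int i = 0; i < 15; i++) {
      cache.put("client-" + i, "provider-" + i);
    }
    // Only the 10 most recently inserted client IDs remain.
    System.out.println(cache.keySet());
  }
}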
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
new file mode 100644
index 0000000..b444984
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/oauth2/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Public interfaces exposing OAuth2 authentication related features.
+ */
+package org.apache.hadoop.fs.adl.oauth2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
new file mode 100644
index 0000000..98e6a77
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/fs/adl/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Supporting classes for the Azure Data Lake file system implementation.
+ */
+package org.apache.hadoop.fs.adl;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
new file mode 100644
index 0000000..a7f932f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/ADLConfKeys.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Constants.
+ */
+public final class ADLConfKeys {
+ public static final String
+ ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN =
+ "adl.feature.override.readahead.max.concurrent.connection";
+ public static final int
+ ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT = 2;
+ public static final String ADL_WEBSDK_VERSION_KEY = "ADLFeatureSet";
+ static final String ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER =
+ "adl.debug.override.localuserasfileowner";
+ static final boolean ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT = false;
+ static final String ADL_FEATURE_REDIRECT_OFF =
+ "adl.feature.override.redirection.off";
+ static final boolean ADL_FEATURE_REDIRECT_OFF_DEFAULT = true;
+ static final String ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED =
+ "adl.feature.override.getblocklocation.locally.bundled";
+ static final boolean ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT
+ = true;
+ static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD =
+ "adl.feature.override.readahead";
+ static final boolean ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT =
+ true;
+ static final String ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE =
+ "adl.feature.override.readahead.max.buffersize";
+
+ static final int KB = 1024;
+ static final int MB = KB * KB;
+ static final int DEFAULT_BLOCK_SIZE = 4 * MB;
+ static final int DEFAULT_EXTENT_SIZE = 256 * MB;
+ static final int DEFAULT_TIMEOUT_IN_SECONDS = 120;
+ static final int
+ ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT =
+ 8 * MB;
+
+ private ADLConfKeys() {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
new file mode 100644
index 0000000..350c6e7
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/BufferManager.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+/**
+ * Responsible for holding buffered data within the process. Holds one and
+ * only one buffer block in memory. A buffer block is identified by the file
+ * it belongs to and the offset from which it was fetched. If the same
+ * buffer block is reused across webhdfs instances, a trip to the backend is
+ * avoided. The buffer block matters because ADL fetches a large amount of
+ * data from the backend per request (4 MB by default, configurable through
+ * core-site.xml).
+ * For compressed formats such as ORC and Avro, the buffer block has been
+ * observed not to avoid some backend calls across webhdfs instances.
+ */
+final class BufferManager {
+ private static final BufferManager BUFFER_MANAGER_INSTANCE = new
+ BufferManager();
+ private static Object lock = new Object();
+ private Buffer buffer = null;
+ private String fileName;
+
+ /**
+ * Constructor.
+ */
+ private BufferManager() {
+ }
+
+ public static Object getLock() {
+ return lock;
+ }
+
+ public static BufferManager getInstance() {
+ return BUFFER_MANAGER_INSTANCE;
+ }
+
+ /**
+ * Check whether the current buffer block belongs to the given stream.
+ *
+ * @param path ADL stream path
+ * @param offset Stream offset the caller is interested in
+ * @return True if the buffer block is available, otherwise false
+ */
+ boolean hasValidDataForOffset(String path, long offset) {
+ if (this.fileName == null) {
+ return false;
+ }
+
+ if (!this.fileName.equals(path)) {
+ return false;
+ }
+
+ if (buffer == null) {
+ return false;
+ }
+
+ if ((offset < buffer.offset) || (offset >= (buffer.offset
+ + buffer.data.length))) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Clean buffer block.
+ */
+ void clear() {
+ buffer = null;
+ }
+
+ /**
+ * Check whether the current buffer block belongs to the given stream.
+ * Partially available data is not supported for now; the data must be
+ * available exactly within the range of the offset and size passed as
+ * parameters.
+ *
+ * @param path Stream path
+ * @param offset Offset of the stream
+ * @param size Size of the data, from the given offset, that the caller
+ * is interested in
+ * @return True if the data is available from the given offset and of the
+ * size the caller is interested in.
+ */
+ boolean hasData(String path, long offset, int size) {
+
+ if (!hasValidDataForOffset(path, offset)) {
+ return false;
+ }
+
+ if ((size + offset) > (buffer.data.length + buffer.offset)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Return the buffer block from the requested offset. It is the caller's
+ * responsibility to check that the buffer block is of interest and that
+ * the offset is valid.
+ *
+ * @param data Byte array to be filled from the buffer block
+ * @param offset Offset from which the data is to be fetched.
+ */
+ void get(byte[] data, long offset) {
+ System.arraycopy(buffer.data, (int) (offset - buffer.offset), data, 0,
+ data.length);
+ }
+
+ /**
+ * Create new empty buffer block of the given size.
+ *
+ * @param len Size of the buffer block.
+ * @return Empty byte array.
+ */
+ byte[] getEmpty(int len) {
+ return new byte[len];
+ }
+
+ /**
+ * Allows the caller to install a new buffer block, pulled from the
+ * backend, for the stream.
+ *
+ * @param data Buffer
+ * @param path Stream path to which the buffer belongs
+ * @param offset Stream offset at which the buffer starts
+ */
+ void add(byte[] data, String path, long offset) {
+ if (data == null) {
+ return;
+ }
+
+ buffer = new Buffer();
+ buffer.data = data;
+ buffer.offset = offset;
+ this.fileName = path;
+ }
+
+ /**
+ * @return Size of the buffer.
+ */
+ int getBufferSize() {
+ return buffer.data.length;
+ }
+
+ /**
+ * @return Stream offset at which the buffer starts
+ */
+ long getBufferOffset() {
+ return buffer.offset;
+ }
+
+ /**
+ * Buffer container.
+ */
+ static class Buffer {
+ private byte[] data;
+ private long offset;
+ }
+}
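
To make the single-block contract above concrete, here is a hedged usage sketch; it must live in the org.apache.hadoop.hdfs.web package since BufferManager is package-private, and the stream path, offsets, and backend fill step are illustrative only:

package org.apache.hadoop.hdfs.web;

class BufferManagerUsageSketch {
  static void readThroughCache() {
    BufferManager bm = BufferManager.getInstance();
    // Callers serialize access through the shared lock, as the stream
    // implementations in this patch do.
    synchronized (BufferManager.getLock()) {
      byte[] out = new byte[1024];
      if (bm.hasData("/logs/a.txt", 4096, out.length)) {
        // Cache hit: the single cached block fully covers the request.
        bm.get(out, 4096);
      } else {
        byte[] block = bm.getEmpty(4 * 1024 * 1024);
        // ... fill `block` from the backend starting at offset 4096 ...
        bm.add(block, "/logs/a.txt", 4096);
        bm.get(out, 4096);
      }
    }
  }
}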
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
new file mode 100644
index 0000000..89011d2
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/PrivateAzureDataLakeFileSystem.java
@@ -0,0 +1,1108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.resources.ADLFlush;
+import org.apache.hadoop.hdfs.web.resources.ADLGetOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPostOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLPutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ADLVersionInfo;
+import org.apache.hadoop.hdfs.web.resources.AppendADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.BlockSizeParam;
+import org.apache.hadoop.hdfs.web.resources.BufferSizeParam;
+import org.apache.hadoop.hdfs.web.resources.CreateADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.CreateFlagParam;
+import org.apache.hadoop.hdfs.web.resources.CreateParentParam;
+import org.apache.hadoop.hdfs.web.resources.GetOpParam;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
+import org.apache.hadoop.hdfs.web.resources.LeaseParam;
+import org.apache.hadoop.hdfs.web.resources.LengthParam;
+import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
+import org.apache.hadoop.hdfs.web.resources.Param;
+import org.apache.hadoop.hdfs.web.resources.PermissionParam;
+import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.hdfs.web.resources.ReadADLNoRedirectParam;
+import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.SocketException;
+import java.net.URI;
+import java.net.URL;
+import java.util.EnumSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Extends the {@link SWebHdfsFileSystem} API. This class contains Azure
+ * Data Lake specific stability, reliability and performance improvements.
+ * <p>
+ * The motivation behind PrivateAzureDataLakeFileSystem is to encapsulate
+ * the dependency on the org.apache.hadoop.hdfs.web package for configuring
+ * query parameters, configuration of HTTP requests sent to the backend,
+ * etc. This class should be refactored and moved under the
+ * org.apache.hadoop.fs.adl package once the required dependent changes are
+ * made into ASF code.
+ */
+public class PrivateAzureDataLakeFileSystem extends SWebHdfsFileSystem {
+
+ public static final String SCHEME = "adl";
+
+ // Feature configuration
+ private boolean featureGetBlockLocationLocallyBundled = true;
+ private boolean featureConcurrentReadWithReadAhead = true;
+ private boolean featureRedirectOff = true;
+ private boolean featureFlushWhenEOF = true;
+ private boolean overrideOwner = false;
+ private int maxConcurrentConnection;
+ private int maxBufferSize;
+ private String userName;
+
+ /**
+ * Constructor.
+ */
+ public PrivateAzureDataLakeFileSystem() {
+ try {
+ userName = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch (IOException e) {
+ userName = "hadoop";
+ }
+ }
+
+ @Override
+ public synchronized void initialize(URI uri, Configuration conf)
+ throws IOException {
+ super.initialize(uri, conf);
+ overrideOwner = getConf()
+ .getBoolean(ADLConfKeys.ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
+ ADLConfKeys.ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
+
+ featureRedirectOff = getConf()
+ .getBoolean(ADLConfKeys.ADL_FEATURE_REDIRECT_OFF,
+ ADLConfKeys.ADL_FEATURE_REDIRECT_OFF_DEFAULT);
+
+ featureGetBlockLocationLocallyBundled = getConf()
+ .getBoolean(ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED,
+ ADLConfKeys.ADL_FEATURE_GET_BLOCK_LOCATION_LOCALLY_BUNDLED_DEFAULT);
+
+ featureConcurrentReadWithReadAhead = getConf().
+ getBoolean(ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD,
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_DEFAULT);
+
+ maxBufferSize = getConf().getInt(
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE,
+ ADLConfKeys
+ .ADL_FEATURE_CONCURRENT_READ_WITH_READ_AHEAD_BUFFER_SIZE_DEFAULT);
+
+ maxConcurrentConnection = getConf().getInt(
+ ADLConfKeys.ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN,
+ ADLConfKeys
+ .ADL_FEATURE_CONCURRENT_READ_AHEAD_MAX_CONCURRENT_CONN_DEFAULT);
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureGetBlockLocationLocallyBundled() {
+ return featureGetBlockLocationLocallyBundled;
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureConcurrentReadWithReadAhead() {
+ return featureConcurrentReadWithReadAhead;
+ }
+
+ @VisibleForTesting
+ protected boolean isFeatureRedirectOff() {
+ return featureRedirectOff;
+ }
+
+ @VisibleForTesting
+ protected boolean isOverrideOwnerFeatureOn() {
+ return overrideOwner;
+ }
+
+ @VisibleForTesting
+ protected int getMaxBufferSize() {
+ return maxBufferSize;
+ }
+
+ @VisibleForTesting
+ protected int getMaxConcurrentConnection() {
+ return maxConcurrentConnection;
+ }
+
+ @Override
+ public String getScheme() {
+ return SCHEME;
+ }
+
+ /**
+ * Constructing the home directory locally is fine as long as the
+ * relationship between the Hadoop local user name and the ADL user name
+ * is not fully established yet.
+ *
+ * @return Hadoop local user home directory.
+ */
+ @Override
+ public final Path getHomeDirectory() {
+ try {
+ return makeQualified(new Path(
+ "/user/" + UserGroupInformation.getCurrentUser().getShortUserName()));
+ } catch (IOException e) {
+ }
+
+ return new Path("/user/" + userName);
+ }
+
+ /**
+ * Azure Data Lake does not support user configuration of data
+ * replication, hence the system is not left to query Azure Data Lake
+ * for it.
+ *
+ * Stub implementation.
+ *
+ * @param p Not honoured
+ * @param replication Not honoured
+ * @return Hard-coded true, since the ADL file system does not support
+ * replication configuration
+ * @throws IOException Never thrown in this case; declared only to align
+ * with the parent API definition.
+ */
+ @Override
+ public final boolean setReplication(final Path p, final short replication)
+ throws IOException {
+ return true;
+ }
+
+ /**
+ * @param f File/Folder path
+ * @return FileStatus instance containing metadata information of f
+ * @throws IOException For any system error
+ */
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ FileStatus status = super.getFileStatus(f);
+
+ if (overrideOwner) {
+ FileStatus proxiedStatus = new FileStatus(status.getLen(),
+ status.isDirectory(), status.getReplication(), status.getBlockSize(),
+ status.getModificationTime(), status.getAccessTime(),
+ status.getPermission(), userName, "hdfs", status.getPath());
+ return proxiedStatus;
+ } else {
+ return status;
+ }
+ }
+
+ /**
+ * The create call semantics are handled differently for ADL. Create
+ * semantics are translated to Create/Append semantics:
+ * 1. No dedicated connection to the server.
+ * 2. Buffering is done locally. Once the buffer is full, or flush is
+ * invoked by the caller, all the pending data is pushed to ADL as an
+ * APPEND operation.
+ * 3. On close, an additional call is sent to the server to close the
+ * stream and release its lock.
+ *
+ * Create/Append semantics are necessary because:
+ * 1. The ADL backend server does not allow idle connections for longer
+ * durations. In slow-writer scenarios, connection timeouts and
+ * connection resets have been observed, causing occasional job failures.
+ * 2. Slow-writer jobs get a performance boost by avoiding network latency.
+ * 3. ADL performs equally well with append calls in multiples of 4 MB
+ * chunks.
+ *
+ * @param f File path
+ * @param permission Access permission for the newly created file
+ * @param overwrite Remove existing file and recreate new one if true
+ * otherwise throw error if file exist
+ * @param bufferSize Buffer size, ADL backend does not honour
+ * @param replication Replication count, ADL backend does not honour
+ * @param blockSize Block size, ADL backend does not honour
+ * @param progress Progress indicator
+ * @return FSDataOutputStream OutputStream on which application can push
+ * stream of bytes
+ * @throws IOException on a system error, internal server error or user error
+ */
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission,
+ final boolean overwrite, final int bufferSize, final short replication,
+ final long blockSize, final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+ new PermissionParam(applyUMask(permission)),
+ new OverwriteParam(overwrite), new BufferSizeParam(bufferSize),
+ new ReplicationParam(replication), new BlockSizeParam(blockSize),
+ new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
+ };
+ }
+
+ @Override
+ public FSDataOutputStream createNonRecursive(final Path f,
+ final FsPermission permission, final EnumSet<CreateFlag> flag,
+ final int bufferSize, final short replication, final long blockSize,
+ final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ String leaseId = java.util.UUID.randomUUID().toString();
+ return new FSDataOutputStream(new BatchAppendOutputStream(f, bufferSize,
+ new PermissionParam(applyUMask(permission)), new CreateFlagParam(flag),
+ new CreateParentParam(false), new BufferSizeParam(bufferSize),
+ new ReplicationParam(replication), new LeaseParam(leaseId),
+ new BlockSizeParam(blockSize),
+ new ADLVersionInfo(VersionInfo.getVersion())), statistics) {
+ };
+ }
+
+ /**
+ * Redefined here, since it is private in the parent class, to pass
+ * through to the create API implementation.
+ *
+ * @param permission Requested permission, possibly null
+ * @return FsPermission with the configured umask applied
+ */
+ private FsPermission applyUMask(FsPermission permission) {
+ FsPermission fsPermission = permission;
+ if (fsPermission == null) {
+ fsPermission = FsPermission.getDefault();
+ }
+ return fsPermission.applyUMask(FsPermission.getUMask(getConf()));
+ }
+
+ /**
+ * The open call semantics are handled differently for ADL. Instead of a
+ * network stream, an overridden FSInputStream is returned to the user:
+ *
+ * 1. No dedicated connection to the server.
+ * 2. Process-level concurrent read-ahead buffering is done, which makes
+ * data available to the caller quickly.
+ * 3. The number of bytes to read ahead is configurable.
+ *
+ * Advantages of process-level concurrent read-ahead buffering:
+ * 1. The ADL backend server does not allow idle connections for longer
+ * durations. In slow-reader scenarios, connection timeouts and
+ * connection resets have been observed, causing occasional job failures.
+ * 2. Slow-reader jobs get a performance boost by avoiding network latency.
+ * 3. Compressed formats such as ORC, and large data files, gain the most
+ * from this implementation.
+ *
+ * The read-ahead feature is configurable.
+ *
+ * @param f File path
+ * @param buffersize Buffer size
+ * @return FSDataInputStream InputStream on which application can read
+ * stream of bytes
+ * @throws IOException on a system error, internal server error or user error
+ */
+ @Override
+ public FSDataInputStream open(final Path f, final int buffersize)
+ throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.OPEN;
+ // use a runner so the open can recover from an invalid token
+ FsPathConnectionRunner runner = null;
+
+ if (featureConcurrentReadWithReadAhead) {
+ URL url = this.toUrl(op, f, new BufferSizeParam(buffersize),
+ new ReadADLNoRedirectParam(true),
+ new ADLVersionInfo(VersionInfo.getVersion()));
+
+ BatchByteArrayInputStream bb = new BatchByteArrayInputStream(url, f,
+ maxBufferSize, maxConcurrentConnection);
+
+ FSDataInputStream fin = new FSDataInputStream(bb);
+ return fin;
+ } else {
+ if (featureRedirectOff) {
+ runner = new FsPathConnectionRunner(ADLGetOpParam.Op.OPEN, f,
+ new BufferSizeParam(buffersize), new ReadADLNoRedirectParam(true),
+ new ADLVersionInfo(VersionInfo.getVersion()));
+ } else {
+ runner = new FsPathConnectionRunner(op, f,
+ new BufferSizeParam(buffersize));
+ }
+
+ return new FSDataInputStream(
+ new OffsetUrlInputStream(new UnresolvedUrlOpener(runner),
+ new OffsetUrlOpener(null)));
+ }
+ }
+
+ /**
+ * @param f File/Folder path
+ * @return FileStatus array list
+ * @throws IOException For system error
+ */
+ @Override
+ public FileStatus[] listStatus(final Path f) throws IOException {
+ FileStatus[] fileStatuses = super.listStatus(f);
+ for (int i = 0; i < fileStatuses.length; i++) {
+ if (overrideOwner) {
+ fileStatuses[i] = new FileStatus(fileStatuses[i].getLen(),
+ fileStatuses[i].isDirectory(), fileStatuses[i].getReplication(),
+ fileStatuses[i].getBlockSize(),
+ fileStatuses[i].getModificationTime(),
+ fileStatuses[i].getAccessTime(), fileStatuses[i].getPermission(),
+ userName, "hdfs", fileStatuses[i].getPath());
+ }
+ }
+ return fileStatuses;
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final FileStatus status,
+ final long offset, final long length) throws IOException {
+ if (status == null) {
+ return null;
+ }
+
+ if (featureGetBlockLocationLocallyBundled) {
+ if ((offset < 0) || (length < 0)) {
+ throw new IllegalArgumentException("Invalid start or len parameter");
+ }
+
+ if (status.getLen() < offset) {
+ return new BlockLocation[0];
+ }
+
+ final String[] name = {"localhost"};
+ final String[] host = {"localhost"};
+ // Block size must be non-zero.
+ long blockSize = ADLConfKeys.DEFAULT_EXTENT_SIZE;
+ int numberOfLocations =
+ (int) (length / blockSize) + ((length % blockSize == 0) ? 0 : 1);
+ BlockLocation[] locations = new BlockLocation[numberOfLocations];
+ for (int i = 0; i < locations.length; i++) {
+ long currentOffset = offset + (i * blockSize);
+ long currentLength = Math
+ .min(blockSize, offset + length - currentOffset);
+ locations[i] = new BlockLocation(name, host, currentOffset,
+ currentLength);
+ }
+
+ return locations;
+ } else {
+ return getFileBlockLocations(status.getPath(), offset, length);
+ }
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
+ final long length) throws IOException {
+ statistics.incrementReadOps(1);
+
+ if (featureGetBlockLocationLocallyBundled) {
+ FileStatus fileStatus = getFileStatus(p);
+ return getFileBlockLocations(fileStatus, offset, length);
+ } else {
+ return super.getFileBlockLocations(p, offset, length);
+ }
+ }
+
+ enum StreamState {
+ Initial,
+ DataCachedInLocalBuffer,
+ StreamEnd
+ }
+
+ class BatchAppendOutputStream extends OutputStream {
+ private Path fsPath;
+ private Param<?, ?>[] parameters;
+ private byte[] data = null;
+ private int offset = 0;
+ private long length = 0;
+ private boolean eof = false;
+ private boolean hadError = false;
+ private byte[] dataBuffers = null;
+ private int bufSize = 0;
+ private boolean streamClosed = false;
+
+ public BatchAppendOutputStream(Path path, int bufferSize,
+ Param<?, ?>... param) throws IOException {
+ if (bufferSize < (ADLConfKeys.DEFAULT_BLOCK_SIZE)) {
+ bufSize = ADLConfKeys.DEFAULT_BLOCK_SIZE;
+ } else {
+ bufSize = bufferSize;
+ }
+
+ this.fsPath = path;
+ this.parameters = param;
+ this.data = getBuffer();
+ FSDataOutputStream createStream = null;
+ try {
+ if (featureRedirectOff) {
+ CreateADLNoRedirectParam skipRedirect = new CreateADLNoRedirectParam(
+ true);
+ Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+ new Param<?, ?>[param.length + 2] :
+ new Param<?, ?>[param.length + 1];
+ System.arraycopy(param, 0, tmpParam, 0, param.length);
+ tmpParam[param.length] = skipRedirect;
+ if (featureFlushWhenEOF) {
+ tmpParam[param.length + 1] = new ADLFlush(false);
+ }
+ createStream = new FsPathOutputStreamRunner(ADLPutOpParam.Op.CREATE,
+ fsPath, 1, tmpParam).run();
+ } else {
+ createStream = new FsPathOutputStreamRunner(PutOpParam.Op.CREATE,
+ fsPath, 1, param).run();
+ }
+ } finally {
+ if (createStream != null) {
+ createStream.close();
+ }
+ }
+ }
+
+ @Override
+ public final synchronized void write(int b) throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ if (offset == (data.length)) {
+ flush();
+ }
+
+ data[offset] = (byte) b;
+ offset++;
+
+ // Statistics will get incremented again as part of the batch updates;
+ // decrement here to avoid double counting.
+ if (statistics != null) {
+ statistics.incrementBytesWritten(-1);
+ }
+ }
+
+ @Override
+ public final synchronized void write(byte[] buf, int off, int len)
+ throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ int bytesToWrite = len;
+ int localOff = off;
+ int localLen = len;
+ if (localLen >= data.length) {
+ // Flush data that is already in our internal buffer
+ flush();
+
+ // Keep committing data until we have less than our internal buffers
+ // length left
+ do {
+ try {
+ commit(buf, localOff, data.length, eof);
+ } catch (IOException e) {
+ hadError = true;
+ throw e;
+ }
+ localOff += data.length;
+ localLen -= data.length;
+ } while (localLen >= data.length);
+ }
+
+ // At this point, we have less than data.length left to copy from the
+ // user's buffer
+ if (offset + localLen >= data.length) {
+ // User's buffer has enough data left to fill our internal buffer
+ int bytesToCopy = data.length - offset;
+ System.arraycopy(buf, localOff, data, offset, bytesToCopy);
+ offset += bytesToCopy;
+
+ // Flush our internal buffer
+ flush();
+ localOff += bytesToCopy;
+ localLen -= bytesToCopy;
+ }
+
+ if (localLen > 0) {
+ // Simply copy the remainder from the user's buffer into our internal
+ // buffer
+ System.arraycopy(buf, localOff, data, offset, localLen);
+ offset += localLen;
+ }
+
+ // Statistics will get incremented again as part of the batch updates;
+ // decrement here to avoid double counting.
+ if (statistics != null) {
+ statistics.incrementBytesWritten(-bytesToWrite);
+ }
+ }
+
+ @Override
+ public final synchronized void flush() throws IOException {
+ if (streamClosed) {
+ throw new IOException(fsPath + " stream object is closed.");
+ }
+
+ if (offset > 0) {
+ try {
+ commit(data, 0, offset, eof);
+ } catch (IOException e) {
+ hadError = true;
+ throw e;
+ }
+ }
+
+ offset = 0;
+ }
+
+ @Override
+ public final synchronized void close() throws IOException {
+ // Stream was closed earlier; return quietly.
+ if (streamClosed) {
+ return;
+ }
+
+ if (featureRedirectOff) {
+ eof = true;
+ }
+
+ boolean flushedSomething = false;
+ if (hadError) {
+ // No point proceeding further since an error has occurred and the
+ // stream would need to be uploaded again.
+ streamClosed = true;
+ return;
+ } else {
+ flushedSomething = offset > 0;
+ try {
+ flush();
+ } finally {
+ streamClosed = true;
+ }
+ }
+
+ if (featureRedirectOff) {
+ // If we didn't flush anything from our internal buffer, we have to
+ // call the service again
+ // with an empty payload and flush=true in the url
+ if (!flushedSomething) {
+ try {
+ commit(null, 0, ADLConfKeys.KB, true);
+ } finally {
+ streamClosed = true;
+ }
+ }
+ }
+ }
+
+ private void commit(byte[] buffer, int off, int len, boolean endOfFile)
+ throws IOException {
+ OutputStream out = null;
+ try {
+ if (featureRedirectOff) {
+ AppendADLNoRedirectParam skipRedirect = new AppendADLNoRedirectParam(
+ true);
+ Param<?, ?>[] tmpParam = featureFlushWhenEOF ?
+ new Param<?, ?>[parameters.length + 3] :
+ new Param<?, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = skipRedirect;
+ if (featureFlushWhenEOF) {
+ tmpParam[parameters.length + 1] = new ADLFlush(endOfFile);
+ tmpParam[parameters.length + 2] = new OffsetParam(length);
+ }
+
+ out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+ len, tmpParam).run();
+ } else {
+ out = new FsPathOutputStreamRunner(ADLPostOpParam.Op.APPEND, fsPath,
+ len, parameters).run();
+ }
+
+ if (buffer != null) {
+ out.write(buffer, off, len);
+ length += len;
+ }
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ }
+
+ private byte[] getBuffer() {
+ // Despite the name dataBuffers, there is no double buffering here:
+ // each call simply allocates a fresh buffer of the configured size.
+ dataBuffers = new byte[bufSize];
+ return dataBuffers;
+ }
+ }
+
+ /**
+ * Reads data from the backend in chunks rather than over a single
+ * persistent connection, so that a slow reader cannot cause a socket
+ * timeout.
+ */
+ protected class BatchByteArrayInputStream extends FSInputStream {
+
+ private static final int SIZE4MB = 4 * 1024 * 1024;
+ private final URL runner;
+ private byte[] data = null;
+ private long validDataHoldingSize = 0;
+ private int bufferOffset = 0;
+ private long currentFileOffset = 0;
+ private long nextFileOffset = 0;
+ private long fileSize = 0;
+ private StreamState state = StreamState.Initial;
+ private int maxBufferSize;
+ private int maxConcurrentConnection;
+ private Path fsPath;
+ private boolean streamIsClosed;
+ private Future[] subtasks = null;
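+
+ // State machine, as read from the code below: streams start in
+ // Initial; fill() moves them to DataCachedInLocalBuffer once bytes
+ // are buffered locally; StreamEnd is reached when fill() runs past
+ // the end of the file or the buffer cannot be refilled.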
+
+ BatchByteArrayInputStream(URL url, Path p, int bufferSize,
+ int concurrentConnection) throws IOException {
+ this.runner = url;
+ fsPath = p;
+ FileStatus fStatus = getFileStatus(fsPath);
+ if (!fStatus.isFile()) {
+ throw new IOException("Cannot open the directory " + p + " for " +
+ "reading");
+ }
+ fileSize = fStatus.getLen();
+ this.maxBufferSize = bufferSize;
+ this.maxConcurrentConnection = concurrentConnection;
+ this.streamIsClosed = false;
+ }
+
+ @Override
+ public synchronized final int read(long position, byte[] buffer, int offset,
+ int length) throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ long oldPos = this.getPos();
+
+ int nread1;
+ try {
+ this.seek(position);
+ nread1 = this.read(buffer, offset, length);
+ } finally {
+ this.seek(oldPos);
+ }
+
+ return nread1;
+ }
+
+ @Override
+ public synchronized final int read() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ int status = doBufferAvailabilityCheck();
+ if (status == -1) {
+ return status;
+ }
+ int ch = data[bufferOffset++] & (0xff);
+ if (statistics != null) {
+ statistics.incrementBytesRead(1);
+ }
+ return ch;
+ }
+
+ @Override
+ public synchronized final void readFully(long position, byte[] buffer,
+ int offset, int length) throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+
+ super.readFully(position, buffer, offset, length);
+ if (statistics != null) {
+ statistics.incrementBytesRead(length);
+ }
+ }
+
+ @Override
+ public synchronized final int read(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new IllegalArgumentException();
+ } else if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ } else if (len == 0) {
+ return 0;
+ }
+
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ int status = doBufferAvailabilityCheck();
+ if (status == -1) {
+ return status;
+ }
+
+ int byteRead = 0;
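+ // Serve from the local buffer when it holds the entire requested
+ // range; the two expressions below subtract off from both sides of
+ // bufferOffset + len <= validDataHoldingSize, so the comparison is
+ // equivalent. Otherwise fall back to super.read(), which refills
+ // one byte at a time through read().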
+ long availableBytes = validDataHoldingSize - off;
+ long requestedBytes = bufferOffset + len - off;
+ if (requestedBytes <= availableBytes) {
+ System.arraycopy(data, bufferOffset, b, off, len);
+ bufferOffset += len;
+ byteRead = len;
+ } else {
+ byteRead = super.read(b, off, len);
+ }
+
+ if (statistics != null) {
+ statistics.incrementBytesRead(byteRead);
+ }
+
+ return byteRead;
+ }
+
+ private int doBufferAvailabilityCheck() throws IOException {
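+ // Returns 0 when at least one byte is available in the local
+ // buffer, or -1 (end of stream) when no further data can be
+ // fetched; refills from the service when the buffer is drained.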
+ if (state == StreamState.Initial) {
+ validDataHoldingSize = fill(nextFileOffset);
+ }
+
+ long dataReloadSize = 0;
+ switch ((int) validDataHoldingSize) {
+ case -1:
+ state = StreamState.StreamEnd;
+ return -1;
+ case 0:
+ dataReloadSize = fill(nextFileOffset);
+ if (dataReloadSize <= 0) {
+ state = StreamState.StreamEnd;
+ return (int) dataReloadSize;
+ } else {
+ validDataHoldingSize = dataReloadSize;
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (bufferOffset >= validDataHoldingSize) {
+ dataReloadSize = fill(nextFileOffset);
+ }
+
+ if (bufferOffset >= ((dataReloadSize == 0) ?
+ validDataHoldingSize :
+ dataReloadSize)) {
+ state = StreamState.StreamEnd;
+ return -1;
+ }
+
+ validDataHoldingSize = ((dataReloadSize == 0) ?
+ validDataHoldingSize :
+ dataReloadSize);
+ state = StreamState.DataCachedInLocalBuffer;
+ return 0;
+ }
+
+ private long fill(final long off) throws IOException {
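+ // Loads the region of the file starting at off into the local
+ // buffer: small files are cached whole, otherwise the shared
+ // BufferManager cache is consulted first and the remainder is
+ // fetched over HTTP with concurrent range reads. Returns the
+ // number of valid bytes buffered, or -1 at end of file.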
+ if (state == StreamState.StreamEnd) {
+ return -1;
+ }
+
+ if (fileSize <= off) {
+ state = StreamState.StreamEnd;
+ return -1;
+ }
+ int len = maxBufferSize;
+ long fileOffset = 0;
+ boolean isEntireFileCached = true;
+ if ((fileSize <= maxBufferSize)) {
+ len = (int) fileSize;
+ currentFileOffset = 0;
+ nextFileOffset = 0;
+ } else {
+ if (len > (fileSize - off)) {
+ len = (int) (fileSize - off);
+ }
+
+ synchronized (BufferManager.getLock()) {
+ if (BufferManager.getInstance()
+ .hasValidDataForOffset(fsPath.toString(), off)) {
+ len = (int) (
+ BufferManager.getInstance().getBufferOffset() + BufferManager
+ .getInstance().getBufferSize() - (int) off);
+ }
+ }
+
+ if (len <= 0) {
+ len = maxBufferSize;
+ }
+ fileOffset = off;
+ isEntireFileCached = false;
+ }
+
+ data = null;
+ BufferManager bm = BufferManager.getInstance();
+ data = bm.getEmpty(len);
+ boolean fetchDataOverNetwork = false;
+ synchronized (BufferManager.getLock()) {
+ if (bm.hasData(fsPath.toString(), fileOffset, len)) {
+ try {
+ bm.get(data, fileOffset);
+ validDataHoldingSize = data.length;
+ currentFileOffset = fileOffset;
+ } catch (ArrayIndexOutOfBoundsException e) {
+ fetchDataOverNetwork = true;
+ }
+ } else {
+ fetchDataOverNetwork = true;
+ }
+ }
+
+ if (fetchDataOverNetwork) {
+ int splitSize = getSplitSize(len);
+ try {
+ validDataHoldingSize = fillDataConcurrently(data, len, fileOffset,
+ splitSize);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted filling buffer", e);
+ }
+
+ synchronized (BufferManager.getLock()) {
+ bm.add(data, fsPath.toString(), fileOffset);
+ }
+ currentFileOffset = nextFileOffset;
+ }
+
+ nextFileOffset += validDataHoldingSize;
+ state = StreamState.DataCachedInLocalBuffer;
+ bufferOffset = isEntireFileCached ? (int) off : 0;
+ return validDataHoldingSize;
+ }
+
+ int getSplitSize(int size) {
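+ // Integer division truncates before Math.round is applied, so a
+ // 7 MB request maps to 1 split and a 9 MB request to 2; the result
+ // is further capped by maxConcurrentConnection.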
+ if (size <= SIZE4MB) {
+ return 1;
+ }
+
+ // A request larger than the maximum buffer size is not practical;
+ // cap it.
+ if (size > maxBufferSize) {
+ size = maxBufferSize;
+ }
+
+ int equalBufferSplit = Math.max(Math.round(size / SIZE4MB), 1);
+ int splitSize = Math.min(equalBufferSplit, maxConcurrentConnection);
+ return splitSize;
+ }
+
+ @Override
+ public synchronized final void seek(long pos) throws IOException {
+ if (pos < 0) {
+ throw new IOException("Bad offset, cannot seek to " + pos);
+ }
+
+ BufferManager bm = BufferManager.getInstance();
+ synchronized (BufferManager.getLock()) {
+ if (bm.hasValidDataForOffset(fsPath.toString(), pos)) {
+ state = StreamState.DataCachedInLocalBuffer;
+ } else if (pos >= 0) {
+ state = StreamState.Initial;
+ }
+ }
+
+ long availableBytes = (currentFileOffset + validDataHoldingSize);
+
+ // Check if this position falls under buffered data
+ if (pos < currentFileOffset || availableBytes <= 0) {
+ validDataHoldingSize = 0;
+ currentFileOffset = pos;
+ nextFileOffset = pos;
+ bufferOffset = 0;
+ return;
+ }
+
+ if (pos < availableBytes && pos >= currentFileOffset) {
+ state = StreamState.DataCachedInLocalBuffer;
+ bufferOffset = (int) (pos - currentFileOffset);
+ } else {
+ validDataHoldingSize = 0;
+ currentFileOffset = pos;
+ nextFileOffset = pos;
+ bufferOffset = 0;
+ }
+ }
+
+ @Override
+ public synchronized final long getPos() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ return bufferOffset + currentFileOffset;
+ }
+
+ @Override
+ public synchronized final int available() throws IOException {
+ if (streamIsClosed) {
+ throw new IOException("Stream already closed");
+ }
+ return Integer.MAX_VALUE;
+ }
+
+ @Override
+ public final boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private int fillDataConcurrently(byte[] byteArray, int length,
+ long globalOffset, int splitSize)
+ throws IOException, InterruptedException {
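+ // Fan the read out across splitSize tasks of length / splitSize
+ // bytes each; the final task also picks up the length % splitSize
+ // remainder so that every byte of the range is covered.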
+ ExecutorService executor = Executors.newFixedThreadPool(splitSize);
+ subtasks = new Future[splitSize];
+ for (int i = 0; i < splitSize; i++) {
+ int offset = i * (length / splitSize);
+ int splitLength = (splitSize == (i + 1)) ?
+ (length / splitSize) + (length % splitSize) :
+ (length / splitSize);
+ subtasks[i] = executor.submit(
+ new BackgroundReadThread(byteArray, offset, splitLength,
+ globalOffset + offset));
+ }
+
+ executor.shutdown();
+ // wait until all tasks are finished
+ executor.awaitTermination(ADLConfKeys.DEFAULT_TIMEOUT_IN_SECONDS,
+ TimeUnit.SECONDS);
+
+ int totalBytePainted = 0;
+ for (int i = 0; i < splitSize; ++i) {
+ try {
+ totalBytePainted += (Integer) subtasks[i].get();
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag; the cause of an
+ // InterruptedException is typically null, so wrap the
+ // exception itself.
+ Thread.currentThread().interrupt();
+ throw new IOException("Interrupted waiting for read tasks", e);
+ } catch (ExecutionException e) {
+ // A failed task is not an interrupt; surface the underlying
+ // cause.
+ throw new IOException(e.getCause());
+ }
+ }
+
+ if (totalBytePainted != length) {
+ throw new IOException("Expected " + length + " bytes, Got " +
+ totalBytePainted + " bytes");
+ }
+
+ return totalBytePainted;
+ }
+
+ @Override
+ public synchronized final void close() throws IOException {
+ synchronized (BufferManager.getLock()) {
+ BufferManager.getInstance().clear();
+ }
+ // TODO: clean this up; the underlying stream and connection are
+ // not closed here.
+ // Mark the stream closed; it cannot be used again afterwards.
+ streamIsClosed = true;
+ }
+
+ /**
+ * Reads data from the ADL backend starting at the specified global
+ * offset for the given length. The data read is copied into the
+ * buffer array starting at the specified offset.
+ *
+ * @param buffer Destination buffer for data read from the ADL
+ * backend.
+ * @param offset Offset within the buffer at which to store the
+ * read data.
+ * @param length Number of bytes to read from the ADL backend.
+ * @param globalOffset File offset at which to start reading.
+ * @return Number of bytes read from the ADL backend
+ * @throws IOException For any intermittent server issues or internal
+ * failures.
+ */
+ private int fillUpData(byte[] buffer, int offset, int length,
+ long globalOffset) throws IOException {
+ int totalBytesRead = 0;
+ final URL offsetUrl = new URL(
+ runner + "&" + new OffsetParam(String.valueOf(globalOffset)) + "&"
+ + new LengthParam(String.valueOf(length)));
+ HttpURLConnection conn = new URLRunner(GetOpParam.Op.OPEN, offsetUrl,
+ true).run();
+ InputStream in = conn.getInputStream();
+ try {
+ int bytesRead = 0;
+ while ((bytesRead = in.read(buffer, (int) offset + totalBytesRead,
+ (int) (length - totalBytesRead))) > 0) {
+ totalBytesRead += bytesRead;
+ }
+
+ // InputStream must be fully consumed to enable http keep-alive
+ if (bytesRead == 0) {
+ // Verify EOF by reading one more byte; anything other than -1
+ // means the server sent more than was requested.
+ if (in.read() != -1) {
+ throw new SocketException(
+ "Server returned more than requested data.");
+ }
+ }
+ } finally {
+ in.close();
+ conn.disconnect();
+ }
+
+ return totalBytesRead;
+ }
+
+ private class BackgroundReadThread implements Callable<Integer> {
+
+ private final byte[] data;
+ private int offset;
+ private int length;
+ private long globalOffset;
+
+ BackgroundReadThread(byte[] buffer, int off, int size, long position) {
+ this.data = buffer;
+ this.offset = off;
+ this.length = size;
+ this.globalOffset = position;
+ }
+
+ public Integer call() throws IOException {
+ return fillUpData(data, offset, length, globalOffset);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
new file mode 100644
index 0000000..d7dce25
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/PrivateCachedRefreshTokenBasedAccessTokenProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.oauth2;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Exposes AccessTokenProvider publicly so it can be extended from the
+ * com.microsoft.azure.datalake package. The extended version caches the
+ * token for the lifetime of the process to improve performance.
+ */
+@Private
+@Unstable
+public abstract class PrivateCachedRefreshTokenBasedAccessTokenProvider
+ extends AccessTokenProvider {
+
+ // visibility workaround
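+ // (per the class javadoc: this empty subclass exists only so that
+ // code in the com.microsoft.azure.datalake packages can extend the
+ // token provider without access to this package's internals)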
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
new file mode 100644
index 0000000..7a9dffa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/oauth2/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * OAuth2 token management support for the Azure Data Lake file
+ * system client, provided within {@link org.apache.hadoop.hdfs.web.oauth2}.
+ */
+package org.apache.hadoop.hdfs.web.oauth2;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
new file mode 100644
index 0000000..1cc8273
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * An implementation within {@link org.apache.hadoop.hdfs.web} for
+ * reading and writing files on the Azure Data Lake file system. This
+ * implementation is derived from the WebHDFS specification.
+ */
+package org.apache.hadoop.hdfs.web;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
new file mode 100644
index 0000000..b76aaaa
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLFlush.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+/**
+ * Query parameter to notify the backend server that all the data has
+ * been pushed over the stream.
+ *
+ * Used by the Create and Append operations.
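+ *
+ * <p>Hypothetical usage sketch, assuming the standard webhdfs
+ * {@code Param#toString()} contract of rendering name=value:
+ * {@code new ADLFlush(true)} would appear in the request query
+ * string as {@code flush=true}.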
+ */
+public class ADLFlush extends BooleanParam {
+ /**
+ * Parameter name.
+ */
+ public static final String NAME = "flush";
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ *
+ * @param value the parameter value.
+ */
+ public ADLFlush(final Boolean value) {
+ super(DOMAIN, value);
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9581fb71/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
new file mode 100644
index 0000000..6b3708f
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/main/java/org/apache/hadoop/hdfs/web/resources/ADLGetOpParam.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdfs.web.resources;
+
+import java.net.HttpURLConnection;
+
+/**
+ * Extends the WebHDFS GetOpParam to avoid the redirect operation for
+ * Azure Data Lake Storage.
+ */
+public class ADLGetOpParam extends HttpOpParam<ADLGetOpParam.Op> {
+ private static final Domain<Op> DOMAIN = new Domain<Op>(NAME, Op.class);
+
+ /**
+ * Constructor.
+ *
+ * @param str a string representation of the parameter value.
+ */
+ public ADLGetOpParam(final String str) {
+ super(DOMAIN, DOMAIN.parse(str));
+ }
+
+ @Override
+ public final String getName() {
+ return NAME;
+ }
+
+ /**
+ * Get operations.
+ */
+ public enum Op implements HttpOpParam.Op {
+ OPEN(false, HttpURLConnection.HTTP_OK);
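+
+ // redirect=false: data is served directly from the ADL endpoint
+ // rather than through the usual WebHDFS two-step redirect (see
+ // the class javadoc above).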
+
+ private final boolean redirect;
+ private final int expectedHttpResponseCode;
+ private final boolean requireAuth;
+
+ Op(final boolean doRedirect, final int expectHttpResponseCode) {
+ this(doRedirect, expectHttpResponseCode, false);
+ }
+
+ Op(final boolean doRedirect, final int expectHttpResponseCode,
+ final boolean doRequireAuth) {
+ this.redirect = doRedirect;
+ this.expectedHttpResponseCode = expectHttpResponseCode;
+ this.requireAuth = doRequireAuth;
+ }
+
+ @Override
+ public HttpOpParam.Type getType() {
+ return HttpOpParam.Type.GET;
+ }
+
+ @Override
+ public boolean getRequireAuth() {
+ return requireAuth;
+ }
+
+ @Override
+ public boolean getDoOutput() {
+ return false;
+ }
+
+ @Override
+ public boolean getRedirect() {
+ return redirect;
+ }
+
+ @Override
+ public int getExpectedHttpResponseCode() {
+ return expectedHttpResponseCode;
+ }
+
+ @Override
+ public String toQueryString() {
+ return NAME + "=" + this;
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org