You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:16:11 UTC

[flink] 03/06: [FLINK-10366] [s3] Adjust Hadoop-based s3 connector to use common denominator module

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac97d46ea4586e41ee8606b5c6b6ab6640f6a05a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 13 11:14:31 2018 +0200

    [FLINK-10366] [s3] Adjust Hadoop-based s3 connector to use common denominator module
---
 flink-filesystems/flink-s3-fs-hadoop/pom.xml       |  253 +-
 .../flink/fs/s3hadoop/S3FileSystemFactory.java     |    7 +-
 .../java/org/apache/hadoop/conf/Configuration.java | 3000 --------------------
 .../org/apache/hadoop/util/NativeCodeLoader.java   |   94 -
 .../src/main/resources/META-INF/NOTICE             |   98 -
 .../src/main/resources/core-default-shaded.xml     | 2312 ---------------
 .../fs/s3hadoop/HadoopS3FileSystemITCase.java      |   62 +-
 .../src/test/resources/core-site.xml               | 2312 ---------------
 8 files changed, 41 insertions(+), 8097 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
index fb6bda1..270e78d 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml
+++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml
@@ -32,16 +32,9 @@ under the License.
 
 	<packaging>jar</packaging>
 
-	<properties>
-		<!-- Do not change this without updating the copied Configuration class! -->
-		<s3hadoop.hadoop.version>2.8.1</s3hadoop.hadoop.version>
-		<s3hadoop.aws.version>1.11.95</s3hadoop.aws.version>
-	</properties>
-
 	<dependencies>
 
-		<!-- Flink core -->
-
+		<!-- Flink's file system abstraction (compiled against, not bundled) -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -49,151 +42,32 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-		<!-- File system builds on the Hadoop file system support -->
-
+		<!-- S3 base -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
+			<artifactId>flink-s3-fs-base</artifactId>
 			<version>${project.version}</version>
-
-			<!-- this exclusion is only needed to run tests in the IDE, pre shading,
-				because the optional Hadoop dependency is also pulled in for tests -->
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-shaded-hadoop2</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<!-- Hadoop's S3a file system -->
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-client</artifactId>
-			<version>${s3hadoop.hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-mapreduce-client-core</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-yarn-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-mapreduce-client-app</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>javax.servlet.jsp</groupId>
-					<artifactId>jsp-api</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.directory.server</groupId>
-					<artifactId>apacheds-kerberos-codec</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.curator</groupId>
-					<artifactId>curator-client</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.curator</groupId>
-					<artifactId>curator-framework</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.curator</groupId>
-					<artifactId>curator-recipes</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>org.apache.zookeeper</groupId>
-					<artifactId>zookeeper</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-aws</artifactId>
-			<version>${s3hadoop.hadoop.version}</version>
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.avro</groupId>
-					<artifactId>avro</artifactId>
-				</exclusion>
-				<!-- The aws-java-sdk-core requires jackson 2.6, but
-					hadoop pulls in 2.3 -->
-				<exclusion>
-					<groupId>com.fasterxml.jackson.core</groupId>
-					<artifactId>jackson-annotations</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.fasterxml.jackson.core</groupId>
-					<artifactId>jackson-core</artifactId>
-				</exclusion>
-				<exclusion>
-					<groupId>com.fasterxml.jackson.core</groupId>
-					<artifactId>jackson-databind</artifactId>
-				</exclusion>
-			</exclusions>
-		</dependency>
-
-		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>aws-java-sdk-core</artifactId>
-		</dependency>
-
-		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>aws-java-sdk-kms</artifactId>
 		</dependency>
 
+		<!-- for the behavior test suite -->
 		<dependency>
-			<groupId>com.amazonaws</groupId>
-			<artifactId>aws-java-sdk-s3</artifactId>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
 		</dependency>
 
-		<!-- for the behavior test suite -->
+		<!-- for the settings that make unshaded tests run -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
+			<artifactId>flink-fs-hadoop-shaded</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 			<type>test-jar</type>
 		</dependency>
-	</dependencies>
 
-	<!-- We need to bump the AWS dependencies compared to the ones referenced
-    by Hadoop, because the older versions have bugs affecting this project -->
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>com.amazonaws</groupId>
-				<artifactId>aws-java-sdk-core</artifactId>
-				<version>${s3hadoop.aws.version}</version>
-			</dependency>
-	
-			<dependency>
-				<groupId>com.amazonaws</groupId>
-				<artifactId>aws-java-sdk-kms</artifactId>
-				<version>${s3hadoop.aws.version}</version>
-			</dependency>
-	
-			<dependency>
-				<groupId>com.amazonaws</groupId>
-				<artifactId>aws-java-sdk-s3</artifactId>
-				<version>${s3hadoop.aws.version}</version>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
+	</dependencies>
 
 	<build>
 		<plugins>
@@ -215,113 +89,22 @@ under the License.
 								</includes>
 							</artifactSet>
 							<relocations>
+								<!-- relocate the references to Hadoop to match the shaded Hadoop config -->
 								<relocation>
-									<pattern>com.amazonaws</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.com.amazonaws</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.fasterxml</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.com.fasterxml</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.com.google</shadedPattern>
-									<excludes>
-										<!-- provided -->
-										<exclude>com.google.code.findbugs.**</exclude>
-									</excludes>
-								</relocation>
-								<relocation>
-									<pattern>com.nimbusds</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.com.nimbusds</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>com.squareup</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.com.squareup</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>net.jcip</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.net.jcip</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>net.minidev</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.net.minidev</shadedPattern>
-								</relocation>
-
-								<!-- relocate everything from the flink-hadoop-fs project -->
-								<relocation>
-									<pattern>org.apache.flink.runtime.fs.hdfs</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.fs.hdfs</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.apache.flink.runtime.util</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.apache.flink.runtime.util</shadedPattern>
-									<includes>
-										<include>org.apache.flink.runtime.util.**Hadoop*</include>
-									</includes>
-								</relocation>
-
-								<relocation>
-									<pattern>org.apache</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.apache</shadedPattern>
-									<excludes>
-										<!-- keep all other classes of flink as they are (exceptions above) -->
-										<exclude>org.apache.flink.**</exclude>
-										<exclude>org.apache.log4j.**</exclude> <!-- provided -->
-									</excludes>
-								</relocation>
-								<relocation>
-									<pattern>org.codehaus</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.codehaus</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.joda</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.joda</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.mortbay</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.mortbay</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.tukaani</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.tukaani</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.znerd</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.org.znerd</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>okio</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.okio</shadedPattern>
+									<pattern>org.apache.hadoop</pattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
 								</relocation>
+								<!-- relocate the AWS dependencies -->
 								<relocation>
-									<pattern>software.amazon</pattern>
-									<shadedPattern>org.apache.flink.fs.s3hadoop.shaded.software.amazon</shadedPattern>
+									<pattern>com.amazon</pattern>
+									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
 								<filter>
 									<artifact>*</artifact>
 									<excludes>
-										<exclude>log4j.properties</exclude>
-										<exclude>mime.types</exclude>
-										<exclude>properties.dtd</exclude>
-										<exclude>PropertyList-1.0.dtd</exclude>
-										<exclude>models/**</exclude>
-										<exclude>mozilla/**</exclude>
-										<exclude>META-INF/maven/com*/**</exclude>
-										<exclude>META-INF/maven/net*/**</exclude>
-										<exclude>META-INF/maven/software*/**</exclude>
-										<exclude>META-INF/maven/joda*/**</exclude>
-										<exclude>META-INF/maven/org.mortbay.jetty/**</exclude>
-										<exclude>META-INF/maven/org.apache.h*/**</exclude>
-										<exclude>META-INF/maven/org.apache.commons/**</exclude>
-										<exclude>META-INF/maven/org.apache.flink/flink-hadoop-fs/**</exclude>
 										<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
-										<!-- we use our own "shaded" core-default.xml: core-default-shaded.xml -->
-										<exclude>core-default.xml</exclude>
-										<!-- we only add a core-site.xml with unshaded classnames for the unit tests -->
-										<exclude>core-site.xml</exclude>
 									</excludes>
 								</filter>
 							</filters>
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
index 4ba49bb..314d34c 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java
@@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.URI;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -37,11 +36,9 @@ import java.util.Set;
 public class S3FileSystemFactory extends AbstractFileSystemFactory {
 	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystemFactory.class);
 
-	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
-		new HashSet<>(Collections.singletonList("com.amazonaws."));
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
 
-	private static final Set<String> CONFIG_KEYS_TO_SHADE =
-		Collections.unmodifiableSet(new HashSet<>(Collections.singleton("fs.s3a.aws.credentials.provider")));
+	private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.singleton("fs.s3a.aws.credentials.provider");
 
 	private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";
 
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/conf/Configuration.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/conf/Configuration.java
deleted file mode 100644
index 86822de..0000000
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ /dev/null
@@ -1,3000 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.conf;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import org.apache.commons.collections.map.UnmodifiableMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProvider.CredentialEntry;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringInterner;
-import org.apache.hadoop.util.StringUtils;
-import org.codehaus.jackson.JsonFactory;
-import org.codehaus.jackson.JsonGenerator;
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
-import org.xml.sax.SAXException;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import java.io.BufferedInputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.JarURLConnection;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.WeakHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-
-/** 
- * Provides access to configuration parameters.
- *
- * <h4 id="Resources">Resources</h4>
- *
- * <p>Configurations are specified by resources. A resource contains a set of
- * name/value pairs as XML data. Each resource is named by either a 
- * <code>String</code> or by a {@link Path}. If named by a <code>String</code>, 
- * then the classpath is examined for a file with that name.  If named by a 
- * <code>Path</code>, then the local filesystem is examined directly, without 
- * referring to the classpath.
- *
- * <p>Unless explicitly turned off, Hadoop by default specifies two 
- * resources, loaded in-order from the classpath: <ol>
- * <li><tt>
- * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default-shaded.xml">
- * core-default-shaded.xml</a></tt>: Read-only defaults for hadoop.</li>
- * <li><tt>core-site.xml</tt>: Site-specific configuration for a given hadoop
- * installation.</li>
- * </ol>
- * Applications may add additional resources, which are loaded
- * subsequent to these resources in the order they are added.
- * 
- * <h4 id="FinalParams">Final Parameters</h4>
- *
- * <p>Configuration parameters may be declared <i>final</i>. 
- * Once a resource declares a value final, no subsequently-loaded 
- * resource can alter that value.  
- * For example, one might define a final parameter with:
- * <tt><pre>
- *  &lt;property&gt;
- *    &lt;name&gt;dfs.hosts.include&lt;/name&gt;
- *    &lt;value&gt;/etc/hadoop/conf/hosts.include&lt;/value&gt;
- *    <b>&lt;final&gt;true&lt;/final&gt;</b>
- *  &lt;/property&gt;</pre></tt>
- *
- * Administrators typically define parameters as final in 
- * <tt>core-site.xml</tt> for values that user applications may not alter.
- *
- * <h4 id="VariableExpansion">Variable Expansion</h4>
- *
- * <p>Value strings are first processed for <i>variable expansion</i>. The
- * available properties are:<ol>
- * <li>Other properties defined in this Configuration; and, if a name is
- * undefined here,</li>
- * <li>Properties in {@link System#getProperties()}.</li>
- * </ol>
- *
- * <p>For example, if a configuration resource contains the following property
- * definitions: 
- * <tt><pre>
- *  &lt;property&gt;
- *    &lt;name&gt;basedir&lt;/name&gt;
- *    &lt;value&gt;/user/${<i>user.name</i>}&lt;/value&gt;
- *  &lt;/property&gt;
- *  
- *  &lt;property&gt;
- *    &lt;name&gt;tempdir&lt;/name&gt;
- *    &lt;value&gt;${<i>basedir</i>}/tmp&lt;/value&gt;
- *  &lt;/property&gt;</pre></tt>
- *
- * When <tt>conf.get("tempdir")</tt> is called, then <tt>${<i>basedir</i>}</tt>
- * will be resolved to another property in this Configuration, while
- * <tt>${<i>user.name</i>}</tt> would then ordinarily be resolved to the value
- * of the System property with that name.
- * <p>When <tt>conf.get("otherdir")</tt> is called, then <tt>${<i>env.BASE_DIR</i>}</tt>
- * will be resolved to the value of the <tt>${<i>BASE_DIR</i>}</tt> environment variable.
- * It supports <tt>${<i>env.NAME:-default</i>}</tt> and <tt>${<i>env.NAME-default</i>}</tt> notations.
- * The former is resolved to "default" if <tt>${<i>NAME</i>}</tt> environment variable is undefined
- * or its value is empty.
- * The latter behaves the same way only if <tt>${<i>NAME</i>}</tt> is undefined.
- * <p>By default, warnings will be given to any deprecated configuration 
- * parameters and these are suppressible by configuring
- * <tt>log4j.logger.org.apache.hadoop.conf.Configuration.deprecation</tt> in
- * log4j.properties file.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class Configuration implements Iterable<Map.Entry<String,String>>,
-                                      Writable {
-  private static final Log LOG =
-    LogFactory.getLog(Configuration.class);
-
-  private static final Log LOG_DEPRECATION =
-    LogFactory.getLog("org.apache.hadoop.conf.Configuration.deprecation");
-
-  private boolean quietmode = true;
-
-  private static final String DEFAULT_STRING_CHECK =
-    "testingforemptydefaultvalue";
-
-  private boolean allowNullValueProperties = false;
-  
-  private static class Resource {
-    private final Object resource;
-    private final String name;
-    
-    public Resource(Object resource) {
-      this(resource, resource.toString());
-    }
-    
-    public Resource(Object resource, String name) {
-      this.resource = resource;
-      this.name = name;
-    }
-    
-    public String getName(){
-      return name;
-    }
-    
-    public Object getResource() {
-      return resource;
-    }
-    
-    @Override
-    public String toString() {
-      return name;
-    }
-  }
-  
-  /**
-   * List of configuration resources.
-   */
-  private ArrayList<Resource> resources = new ArrayList<Resource>();
-  
-  /**
-   * The value reported as the setting resource when a key is set
-   * by code rather than a file resource by dumpConfiguration.
-   */
-  static final String UNKNOWN_RESOURCE = "Unknown";
-
-
-  /**
-   * List of configuration parameters marked <b>final</b>. 
-   */
-  private Set<String> finalParameters = Collections.newSetFromMap(
-      new ConcurrentHashMap<String, Boolean>());
-  
-  private boolean loadDefaults = true;
-  
-  /**
-   * Configuration objects
-   */
-  private static final WeakHashMap<Configuration,Object> REGISTRY = 
-    new WeakHashMap<Configuration,Object>();
-  
-  /**
-   * List of default Resources. Resources are loaded in the order of the list 
-   * entries
-   */
-  private static final CopyOnWriteArrayList<String> defaultResources =
-    new CopyOnWriteArrayList<String>();
-
-  private static final Map<ClassLoader, Map<String, WeakReference<Class<?>>>>
-    CACHE_CLASSES = new WeakHashMap<ClassLoader, Map<String, WeakReference<Class<?>>>>();
-
-  /**
-   * Sentinel value to store negative cache results in {@link #CACHE_CLASSES}.
-   */
-  private static final Class<?> NEGATIVE_CACHE_SENTINEL =
-    NegativeCacheSentinel.class;
-
-  /**
-   * Stores the mapping of key to the resource which modifies or loads 
-   * the key most recently
-   */
-  private Map<String, String[]> updatingResource;
- 
-  /**
-   * Class to keep the information about the keys which replace the deprecated
-   * ones.
-   * 
-   * This class stores the new keys which replace the deprecated keys and also
-   * gives a provision to have a custom message for each of the deprecated key
-   * that is being replaced. It also provides method to get the appropriate
-   * warning message which can be logged whenever the deprecated key is used.
-   */
-  private static class DeprecatedKeyInfo {
-    private final String[] newKeys;
-    private final String customMessage;
-    private final AtomicBoolean accessed = new AtomicBoolean(false);
-
-    DeprecatedKeyInfo(String[] newKeys, String customMessage) {
-      this.newKeys = newKeys;
-      this.customMessage = customMessage;
-    }
-
-    /**
-     * Method to provide the warning message. It gives the custom message if
-     * non-null, and default message otherwise.
-     * @param key the associated deprecated key.
-     * @return message that is to be logged when a deprecated key is used.
-     */
-    private final String getWarningMessage(String key) {
-      String warningMessage;
-      if(customMessage == null) {
-        StringBuilder message = new StringBuilder(key);
-        String deprecatedKeySuffix = " is deprecated. Instead, use ";
-        message.append(deprecatedKeySuffix);
-        for (int i = 0; i < newKeys.length; i++) {
-          message.append(newKeys[i]);
-          if(i != newKeys.length-1) {
-            message.append(", ");
-          }
-        }
-        warningMessage = message.toString();
-      }
-      else {
-        warningMessage = customMessage;
-      }
-      return warningMessage;
-    }
-
-    boolean getAndSetAccessed() {
-      return accessed.getAndSet(true);
-    }
-
-    public void clearAccessed() {
-      accessed.set(false);
-    }
-  }
-  
-  /**
-   * A pending addition to the global set of deprecated keys.
-   */
-  public static class DeprecationDelta {
-    private final String key;
-    private final String[] newKeys;
-    private final String customMessage;
-
-    DeprecationDelta(String key, String[] newKeys, String customMessage) {
-      Preconditions.checkNotNull(key);
-      Preconditions.checkNotNull(newKeys);
-      Preconditions.checkArgument(newKeys.length > 0);
-      this.key = key;
-      this.newKeys = newKeys;
-      this.customMessage = customMessage;
-    }
-
-    public DeprecationDelta(String key, String newKey, String customMessage) {
-      this(key, new String[] { newKey }, customMessage);
-    }
-
-    public DeprecationDelta(String key, String newKey) {
-      this(key, new String[] { newKey }, null);
-    }
-
-    public String getKey() {
-      return key;
-    }
-
-    public String[] getNewKeys() {
-      return newKeys;
-    }
-
-    public String getCustomMessage() {
-      return customMessage;
-    }
-  }
-
-  /**
-   * The set of all keys which are deprecated.
-   *
-   * DeprecationContext objects are immutable.
-   */
-  private static class DeprecationContext {
-    /**
-     * Stores the deprecated keys, the new keys which replace the deprecated keys
-     * and custom message(if any provided).
-     */
-    private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;
-
-    /**
-     * Stores a mapping from superseding keys to the keys which they deprecate.
-     */
-    private final Map<String, String> reverseDeprecatedKeyMap;
-
-    /**
-     * Create a new DeprecationContext by copying a previous DeprecationContext
-     * and adding some deltas.
-     *
-     * @param other   The previous deprecation context to copy, or null to start
-     *                from nothing.
-     * @param deltas  The deltas to apply.
-     */
-    @SuppressWarnings("unchecked")
-    DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {
-      HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap = 
-        new HashMap<String, DeprecatedKeyInfo>();
-      HashMap<String, String> newReverseDeprecatedKeyMap =
-        new HashMap<String, String>();
-      if (other != null) {
-        for (Entry<String, DeprecatedKeyInfo> entry :
-            other.deprecatedKeyMap.entrySet()) {
-          newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
-        }
-        for (Entry<String, String> entry :
-            other.reverseDeprecatedKeyMap.entrySet()) {
-          newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
-        }
-      }
-      for (DeprecationDelta delta : deltas) {
-        if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {
-          DeprecatedKeyInfo newKeyInfo =
-            new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());
-          newDeprecatedKeyMap.put(delta.key, newKeyInfo);
-          for (String newKey : delta.getNewKeys()) {
-            newReverseDeprecatedKeyMap.put(newKey, delta.key);
-          }
-        }
-      }
-      this.deprecatedKeyMap =
-        UnmodifiableMap.decorate(newDeprecatedKeyMap);
-      this.reverseDeprecatedKeyMap =
-        UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);
-    }
-
-    Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {
-      return deprecatedKeyMap;
-    }
-
-    Map<String, String> getReverseDeprecatedKeyMap() {
-      return reverseDeprecatedKeyMap;
-    }
-  }
-  
-  private static DeprecationDelta[] defaultDeprecations = 
-    new DeprecationDelta[] {
-      new DeprecationDelta("topology.script.file.name", 
-        CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY),
-      new DeprecationDelta("topology.script.number.args", 
-        CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY),
-      new DeprecationDelta("hadoop.configured.node.mapping", 
-        CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY),
-      new DeprecationDelta("topology.node.switch.mapping.impl", 
-        CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY),
-      new DeprecationDelta("dfs.df.interval", 
-        CommonConfigurationKeys.FS_DF_INTERVAL_KEY),
-      new DeprecationDelta("hadoop.native.lib", 
-        CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY),
-      new DeprecationDelta("fs.default.name", 
-        CommonConfigurationKeys.FS_DEFAULT_NAME_KEY),
-      new DeprecationDelta("dfs.umaskmode",
-        CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY),
-      new DeprecationDelta("dfs.nfs.exports.allowed.hosts",
-          CommonConfigurationKeys.NFS_EXPORTS_ALLOWED_HOSTS_KEY)
-    };
-
-  /**
-   * The global DeprecationContext.
-   */
-  private static AtomicReference<DeprecationContext> deprecationContext =
-      new AtomicReference<DeprecationContext>(
-          new DeprecationContext(null, defaultDeprecations));
-
-  /**
-   * Adds a set of deprecated keys to the global deprecations.
-   *
-   * This method is lockless.  It works by means of creating a new
-   * DeprecationContext based on the old one, and then atomically swapping in
-   * the new context.  If someone else updated the context in between us reading
-   * the old context and swapping in the new one, we try again until we win the
-   * race.
-   *
-   * @param deltas   The deprecations to add.
-   */
-  public static void addDeprecations(DeprecationDelta[] deltas) {
-    DeprecationContext prev, next;
-    do {
-      prev = deprecationContext.get();
-      next = new DeprecationContext(prev, deltas);
-    } while (!deprecationContext.compareAndSet(prev, next));
-  }
-
-  /**
-   * Adds the deprecated key to the global deprecation map.
-   * It does not override any existing entries in the deprecation map.
-   * This is to be used only by the developers in order to add deprecation of
-   * keys, and attempts to call this method after loading resources once,
-   * would lead to <tt>UnsupportedOperationException</tt>
-   * 
-   * If a key is deprecated in favor of multiple keys, they are all treated as 
-   * aliases of each other, and setting any one of them resets all the others 
-   * to the new value.
-   *
-   * If you have multiple deprecation entries to add, it is more efficient to
-   * use #addDeprecations(DeprecationDelta[] deltas) instead.
-   * 
-   * @param key
-   * @param newKeys
-   * @param customMessage
-   * @deprecated use {@link #addDeprecation(String key, String newKey,
-      String customMessage)} instead
-   */
-  @Deprecated
-  public static void addDeprecation(String key, String[] newKeys,
-      String customMessage) {
-    addDeprecations(new DeprecationDelta[] {
-      new DeprecationDelta(key, newKeys, customMessage)
-    });
-  }
-
-  /**
-   * Adds the deprecated key to the global deprecation map.
-   * It does not override any existing entries in the deprecation map.
-   * This is to be used only by the developers in order to add deprecation of
-   * keys, and attempts to call this method after loading resources once,
-   * would lead to <tt>UnsupportedOperationException</tt>
-   * 
-   * If you have multiple deprecation entries to add, it is more efficient to
-   * use #addDeprecations(DeprecationDelta[] deltas) instead.
-   *
-   * @param key
-   * @param newKey
-   * @param customMessage
-   */
-  public static void addDeprecation(String key, String newKey,
-	      String customMessage) {
-	  addDeprecation(key, new String[] {newKey}, customMessage);
-  }
-
-  /**
-   * Adds the deprecated key to the global deprecation map when no custom
-   * message is provided.
-   * It does not override any existing entries in the deprecation map.
-   * This is to be used only by the developers in order to add deprecation of
-   * keys, and attempts to call this method after loading resources once,
-   * would lead to <tt>UnsupportedOperationException</tt>
-   * 
-   * If a key is deprecated in favor of multiple keys, they are all treated as 
-   * aliases of each other, and setting any one of them resets all the others 
-   * to the new value.
-   * 
-   * If you have multiple deprecation entries to add, it is more efficient to
-   * use #addDeprecations(DeprecationDelta[] deltas) instead.
-   *
-   * @param key Key that is to be deprecated
-   * @param newKeys list of keys that take up the values of deprecated key
-   * @deprecated use {@link #addDeprecation(String key, String newKey)} instead
-   */
-  @Deprecated
-  public static void addDeprecation(String key, String[] newKeys) {
-    addDeprecation(key, newKeys, null);
-  }
-  
-  /**
-   * Adds the deprecated key to the global deprecation map when no custom
-   * message is provided.
-   * It does not override any existing entries in the deprecation map.
-   * This is to be used only by the developers in order to add deprecation of
-   * keys, and attempts to call this method after loading resources once,
-   * would lead to <tt>UnsupportedOperationException</tt>
-   * 
-   * If you have multiple deprecation entries to add, it is more efficient to
-   * use #addDeprecations(DeprecationDelta[] deltas) instead.
-   *
-   * @param key Key that is to be deprecated
-   * @param newKey key that takes up the value of deprecated key
-   */
-  public static void addDeprecation(String key, String newKey) {
-    addDeprecation(key, new String[] {newKey}, null);
-  }
-  
-  /**
-   * checks whether the given <code>key</code> is deprecated.
-   * 
-   * @param key the parameter which is to be checked for deprecation
-   * @return <code>true</code> if the key is deprecated and 
-   *         <code>false</code> otherwise.
-   */
-  public static boolean isDeprecated(String key) {
-    return deprecationContext.get().getDeprecatedKeyMap().containsKey(key);
-  }
-
-  /**
-   * Sets all deprecated properties that are not currently set but have a
-   * corresponding new property that is set. Useful for iterating the
-   * properties when all deprecated properties for currently set properties
-   * need to be present.
-   */
-  public void setDeprecatedProperties() {
-    DeprecationContext deprecations = deprecationContext.get();
-    Properties props = getProps();
-    Properties overlay = getOverlay();
-    for (Map.Entry<String, DeprecatedKeyInfo> entry :
-        deprecations.getDeprecatedKeyMap().entrySet()) {
-      String depKey = entry.getKey();
-      if (!overlay.contains(depKey)) {
-        for (String newKey : entry.getValue().newKeys) {
-          String val = overlay.getProperty(newKey);
-          if (val != null) {
-            props.setProperty(depKey, val);
-            overlay.setProperty(depKey, val);
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Checks for the presence of the property <code>name</code> in the
-   * deprecation map. Returns the first of the list of new keys if present
-   * in the deprecation map or the <code>name</code> itself. If the property
-   * is not presently set but the property map contains an entry for the
-   * deprecated key, the value of the deprecated key is set as the value for
-   * the provided property name.
-   *
-   * @param name the property name
-   * @return the first property in the list of properties mapping
-   *         the <code>name</code> or the <code>name</code> itself.
-   */
-  private String[] handleDeprecation(DeprecationContext deprecations,
-      String name) {
-    if (null != name) {
-      name = name.trim();
-    }
-    ArrayList<String > names = new ArrayList<String>();
-	if (isDeprecated(name)) {
-      DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
-      warnOnceIfDeprecated(deprecations, name);
-      for (String newKey : keyInfo.newKeys) {
-        if(newKey != null) {
-          names.add(newKey);
-        }
-      }
-    }
-    if(names.size() == 0) {
-    	names.add(name);
-    }
-    for(String n : names) {
-	  String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);
-	  if (deprecatedKey != null && !getOverlay().containsKey(n) &&
-	      getOverlay().containsKey(deprecatedKey)) {
-	    getProps().setProperty(n, getOverlay().getProperty(deprecatedKey));
-	    getOverlay().setProperty(n, getOverlay().getProperty(deprecatedKey));
-	  }
-    }
-    return names.toArray(new String[names.size()]);
-  }
- 
-  private void handleDeprecation() {
-    LOG.debug("Handling deprecation for all properties in config...");
-    DeprecationContext deprecations = deprecationContext.get();
-    Set<Object> keys = new HashSet<Object>();
-    keys.addAll(getProps().keySet());
-    for (Object item: keys) {
-      LOG.debug("Handling deprecation for " + (String)item);
-      handleDeprecation(deprecations, (String)item);
-    }
-  }
- 
-  static{
-    //print deprecation warning if hadoop-site.xml is found in classpath
-    ClassLoader cL = Thread.currentThread().getContextClassLoader();
-    if (cL == null) {
-      cL = Configuration.class.getClassLoader();
-    }
-    if(cL.getResource("hadoop-site.xml")!=null) {
-      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
-          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
-          + "mapred-site.xml and hdfs-site.xml to override properties of " +
-          "core-default-shaded.xml, mapred-default.xml and hdfs-default.xml " +
-          "respectively");
-    }
-    addDefaultResource("core-default-shaded.xml");
-    addDefaultResource("core-site.xml");
-  }
-  
-  private Properties properties;
-  private Properties overlay;
-  private ClassLoader classLoader;
-  {
-    classLoader = Thread.currentThread().getContextClassLoader();
-    if (classLoader == null) {
-      classLoader = Configuration.class.getClassLoader();
-    }
-  }
-  
-  /** A new configuration. */
-  public Configuration() {
-    this(true);
-  }
-
-  /** A new configuration where the behavior of reading from the default 
-   * resources can be turned off.
-   * 
-   * If the parameter {@code loadDefaults} is false, the new instance
-   * will not load resources from the default files. 
-   * @param loadDefaults specifies whether to load from the default files
-   */
-  public Configuration(boolean loadDefaults) {
-    this.loadDefaults = loadDefaults;
-    updatingResource = new ConcurrentHashMap<String, String[]>();
-    synchronized(Configuration.class) {
-      REGISTRY.put(this, null);
-    }
-  }
-  
-  /** 
-   * A new configuration with the same settings cloned from another.
-   * 
-   * @param other the configuration from which to clone settings.
-   */
-  @SuppressWarnings("unchecked")
-  public Configuration(Configuration other) {
-   this.resources = (ArrayList<Resource>) other.resources.clone();
-   synchronized(other) {
-     if (other.properties != null) {
-       this.properties = (Properties)other.properties.clone();
-     }
-
-     if (other.overlay!=null) {
-       this.overlay = (Properties)other.overlay.clone();
-     }
-
-     this.updatingResource = new ConcurrentHashMap<String, String[]>(
-         other.updatingResource);
-     this.finalParameters = Collections.newSetFromMap(
-         new ConcurrentHashMap<String, Boolean>());
-     this.finalParameters.addAll(other.finalParameters);
-   }
-   
-    synchronized(Configuration.class) {
-      REGISTRY.put(this, null);
-    }
-    this.classLoader = other.classLoader;
-    this.loadDefaults = other.loadDefaults;
-    setQuietMode(other.getQuietMode());
-  }
-  
-  /**
-   * Add a default resource. Resources are loaded in the order of the resources 
-   * added.
-   * @param name file name. File should be present in the classpath.
-   */
-  public static synchronized void addDefaultResource(String name) {
-    if(!defaultResources.contains(name)) {
-      defaultResources.add(name);
-      for(Configuration conf : REGISTRY.keySet()) {
-        if(conf.loadDefaults) {
-          conf.reloadConfiguration();
-        }
-      }
-    }
-  }
-
-  /**
-   * Add a configuration resource. 
-   * 
-   * The properties of this resource will override properties of previously 
-   * added resources, unless they were marked <a href="#Final">final</a>. 
-   * 
-   * @param name resource to be added, the classpath is examined for a file 
-   *             with that name.
-   */
-  public void addResource(String name) {
-    addResourceObject(new Resource(name));
-  }
-
-  /**
-   * Add a configuration resource. 
-   * 
-   * The properties of this resource will override properties of previously 
-   * added resources, unless they were marked <a href="#Final">final</a>. 
-   * 
-   * @param url url of the resource to be added, the local filesystem is 
-   *            examined directly to find the resource, without referring to 
-   *            the classpath.
-   */
-  public void addResource(URL url) {
-    addResourceObject(new Resource(url));
-  }
-
-  /**
-   * Add a configuration resource. 
-   * 
-   * The properties of this resource will override properties of previously 
-   * added resources, unless they were marked <a href="#Final">final</a>. 
-   * 
-   * @param file file-path of resource to be added, the local filesystem is
-   *             examined directly to find the resource, without referring to 
-   *             the classpath.
-   */
-  public void addResource(Path file) {
-    addResourceObject(new Resource(file));
-  }
-
-  /**
-   * Add a configuration resource. 
-   * 
-   * The properties of this resource will override properties of previously 
-   * added resources, unless they were marked <a href="#Final">final</a>. 
-   * 
-   * WARNING: The contents of the InputStream will be cached, by this method. 
-   * So use this sparingly because it does increase the memory consumption.
-   * 
-   * @param in InputStream to deserialize the object from. In will be read from
-   * when a get or set is called next.  After it is read the stream will be
-   * closed. 
-   */
-  public void addResource(InputStream in) {
-    addResourceObject(new Resource(in));
-  }
-
-  /**
-   * Add a configuration resource. 
-   * 
-   * The properties of this resource will override properties of previously 
-   * added resources, unless they were marked <a href="#Final">final</a>. 
-   * 
-   * @param in InputStream to deserialize the object from.
-   * @param name the name of the resource because InputStream.toString is not
-   * very descriptive some times.  
-   */
-  public void addResource(InputStream in, String name) {
-    addResourceObject(new Resource(in, name));
-  }
-  
-  /**
-   * Add a configuration resource.
-   *
-   * The properties of this resource will override properties of previously
-   * added resources, unless they were marked <a href="#Final">final</a>.
-   *
-   * @param conf Configuration object from which to load properties
-   */
-  public void addResource(Configuration conf) {
-    addResourceObject(new Resource(conf.getProps()));
-  }
-
-  
-  
-  /**
-   * Reload configuration from previously added resources.
-   *
-   * This method will clear all the configuration read from the added 
-   * resources, and final parameters. This will make the resources to 
-   * be read again before accessing the values. Values that are added
-   * via set methods will overlay values read from the resources.
-   */
-  public synchronized void reloadConfiguration() {
-    properties = null;                            // trigger reload
-    finalParameters.clear();                      // clear site-limits
-  }
-  
-  private synchronized void addResourceObject(Resource resource) {
-    resources.add(resource);                      // add to resources
-    reloadConfiguration();
-  }
-
-  private static final int MAX_SUBST = 20;
-
-  private static final int SUB_START_IDX = 0;
-  private static final int SUB_END_IDX = SUB_START_IDX + 1;
-
-  /**
-   * This is a manual implementation of the following regex
-   * "\\$\\{[^\\}\\$\u0020]+\\}". It can be 15x more efficient than
-   * a regex matcher as demonstrated by HADOOP-11506. This is noticeable with
-   * Hadoop apps building on the assumption Configuration#get is an O(1)
-   * hash table lookup, especially when the eval is a long string.
-   *
-   * @param eval a string that may contain variables requiring expansion.
-   * @return a 2-element int array res such that
-   * eval.substring(res[0], res[1]) is "var" for the left-most occurrence of
-   * ${var} in eval. If no variable is found -1, -1 is returned.
-   */
-  private static int[] findSubVariable(String eval) {
-    int[] result = {-1, -1};
-
-    int matchStart;
-    int leftBrace;
-
-    // scanning for a brace first because it's less frequent than $
-    // that can occur in nested class names
-    //
-    match_loop:
-    for (matchStart = 1, leftBrace = eval.indexOf('{', matchStart);
-         // minimum left brace position (follows '$')
-         leftBrace > 0
-         // right brace of a smallest valid expression "${c}"
-         && leftBrace + "{c".length() < eval.length();
-         leftBrace = eval.indexOf('{', matchStart)) {
-      int matchedLen = 0;
-      if (eval.charAt(leftBrace - 1) == '$') {
-        int subStart = leftBrace + 1; // after '{'
-        for (int i = subStart; i < eval.length(); i++) {
-          switch (eval.charAt(i)) {
-            case '}':
-              if (matchedLen > 0) { // match
-                result[SUB_START_IDX] = subStart;
-                result[SUB_END_IDX] = subStart + matchedLen;
-                break match_loop;
-              }
-              // fall through to skip 1 char
-            case ' ':
-            case '$':
-              matchStart = i + 1;
-              continue match_loop;
-            default:
-              matchedLen++;
-          }
-        }
-        // scanned from "${"  to the end of eval, and no reset via ' ', '$':
-        //    no match!
-        break match_loop;
-      } else {
-        // not a start of a variable
-        //
-        matchStart = leftBrace + 1;
-      }
-    }
-    return result;
-  }
-
-  /**
-   * Attempts to repeatedly expand the value {@code expr} by replacing the
-   * left-most substring of the form "${var}" in the following precedence order
-   * <ol>
-   *   <li>by the value of the Java system property "var" if defined</li>
-   *   <li>by the value of the configuration key "var" if defined</li>
-   * </ol>
-   *
-   * If var is unbounded the current state of expansion "prefix${var}suffix" is
-   * returned.
-   *
-   * @param expr the literal value of a config key
-   * @return null if expr is null, otherwise the value resulting from expanding
-   * expr using the algorithm above.
-   * @throws IllegalArgumentException when more than
-   * {@link Configuration#MAX_SUBST} replacements are required
-   */
-  private String substituteVars(String expr) {
-    if (expr == null) {
-      return null;
-    }
-    String eval = expr;
-    for (int s = 0; s < MAX_SUBST; s++) {
-      final int[] varBounds = findSubVariable(eval);
-      if (varBounds[SUB_START_IDX] == -1) {
-        return eval;
-      }
-      final String var = eval.substring(varBounds[SUB_START_IDX],
-          varBounds[SUB_END_IDX]);
-      String val = null;
-      try {
-        val = System.getProperty(var);
-      } catch(SecurityException se) {
-        LOG.warn("Unexpected SecurityException in Configuration", se);
-      }
-      if (val == null) {
-        val = getRaw(var);
-      }
-      if (val == null) {
-        return eval; // return literal ${var}: var is unbound
-      }
-      final int dollar = varBounds[SUB_START_IDX] - "${".length();
-      final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
-      // substitute
-      eval = eval.substring(0, dollar)
-             + val
-             + eval.substring(afterRightBrace);
-    }
-    throw new IllegalStateException("Variable substitution depth too large: " 
-                                    + MAX_SUBST + " " + expr);
-  }
-  
-  /**
-   * Get the value of the <code>name</code> property, <code>null</code> if
-   * no such property exists. If the key is deprecated, it returns the value of
-   * the first key which replaces the deprecated key and is not null.
-   * 
-   * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
-   * before being returned. 
-   * 
-   * @param name the property name, will be trimmed before get value.
-   * @return the value of the <code>name</code> or its replacing property, 
-   *         or null if no such property exists.
-   */
-  public String get(String name) {
-    String[] names = handleDeprecation(deprecationContext.get(), name);
-    String result = null;
-    for(String n : names) {
-      result = substituteVars(getProps().getProperty(n));
-    }
-    return result;
-  }
-
-  /**
-   * Set Configuration to allow keys without values during setup.  Intended
-   * for use during testing.
-   *
-   * @param val If true, will allow Configuration to store keys without values
-   */
-  @VisibleForTesting
-  public void setAllowNullValueProperties( boolean val ) {
-    this.allowNullValueProperties = val;
-  }
-
-  /**
-   * Return existence of the <code>name</code> property, but only for
-   * names which have no valid value, usually non-existent or commented
-   * out in XML.
-   *
-   * @param name the property name
-   * @return true if the property <code>name</code> exists without value
-   */
-  @VisibleForTesting
-  public boolean onlyKeyExists(String name) {
-    String[] names = handleDeprecation(deprecationContext.get(), name);
-    for(String n : names) {
-      if ( getProps().getProperty(n,DEFAULT_STRING_CHECK)
-               .equals(DEFAULT_STRING_CHECK) ) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Get the value of the <code>name</code> property as a trimmed <code>String</code>, 
-   * <code>null</code> if no such property exists. 
-   * If the key is deprecated, it returns the value of
-   * the first key which replaces the deprecated key and is not null
-   * 
-   * Values are processed for <a href="#VariableExpansion">variable expansion</a> 
-   * before being returned. 
-   * 
-   * @param name the property name.
-   * @return the value of the <code>name</code> or its replacing property, 
-   *         or null if no such property exists.
-   */
-  public String getTrimmed(String name) {
-    String value = get(name);
-    
-    if (null == value) {
-      return null;
-    } else {
-      return value.trim();
-    }
-  }
-  
-  /**
-   * Get the value of the <code>name</code> property as a trimmed <code>String</code>, 
-   * <code>defaultValue</code> if no such property exists. 
-   * See @{Configuration#getTrimmed} for more details.
-   * 
-   * @param name          the property name.
-   * @param defaultValue  the property default value.
-   * @return              the value of the <code>name</code> or defaultValue
-   *                      if it is not set.
-   */
-  public String getTrimmed(String name, String defaultValue) {
-    String ret = getTrimmed(name);
-    return ret == null ? defaultValue : ret;
-  }
-
-  /**
-   * Get the value of the <code>name</code> property, without doing
-   * <a href="#VariableExpansion">variable expansion</a>.If the key is 
-   * deprecated, it returns the value of the first key which replaces 
-   * the deprecated key and is not null.
-   * 
-   * @param name the property name.
-   * @return the value of the <code>name</code> property or 
-   *         its replacing property and null if no such property exists.
-   */
-  public String getRaw(String name) {
-    String[] names = handleDeprecation(deprecationContext.get(), name);
-    String result = null;
-    for(String n : names) {
-      result = getProps().getProperty(n);
-    }
-    return result;
-  }
-
-  /**
-   * Returns alternative names (non-deprecated keys or previously-set deprecated keys)
-   * for a given non-deprecated key.
-   * If the given key is deprecated, return null.
-   *
-   * @param name property name.
-   * @return alternative names.
-   */
-  private String[] getAlternativeNames(String name) {
-    String altNames[] = null;
-    DeprecatedKeyInfo keyInfo = null;
-    DeprecationContext cur = deprecationContext.get();
-    String depKey = cur.getReverseDeprecatedKeyMap().get(name);
-    if(depKey != null) {
-      keyInfo = cur.getDeprecatedKeyMap().get(depKey);
-      if(keyInfo.newKeys.length > 0) {
-        if(getProps().containsKey(depKey)) {
-          //if deprecated key is previously set explicitly
-          List<String> list = new ArrayList<String>();
-          list.addAll(Arrays.asList(keyInfo.newKeys));
-          list.add(depKey);
-          altNames = list.toArray(new String[list.size()]);
-        }
-        else {
-          altNames = keyInfo.newKeys;
-        }
-      }
-    }
-    return altNames;
-  }
-
-  /** 
-   * Set the <code>value</code> of the <code>name</code> property. If 
-   * <code>name</code> is deprecated or there is a deprecated name associated to it,
-   * it sets the value to both names. Name will be trimmed before put into
-   * configuration.
-   * 
-   * @param name property name.
-   * @param value property value.
-   */
-  public void set(String name, String value) {
-    set(name, value, null);
-  }
-  
-  /** 
-   * Set the <code>value</code> of the <code>name</code> property. If 
-   * <code>name</code> is deprecated, it also sets the <code>value</code> to
-   * the keys that replace the deprecated key. Name will be trimmed before put
-   * into configuration.
-   *
-   * @param name property name.
-   * @param value property value.
-   * @param source the place that this configuration value came from 
-   * (For debugging).
-   * @throws IllegalArgumentException when the value or name is null.
-   */
-  public void set(String name, String value, String source) {
-    Preconditions.checkArgument(
-        name != null,
-        "Property name must not be null");
-    Preconditions.checkArgument(
-        value != null,
-        "The value of property " + name + " must not be null");
-    name = name.trim();
-    DeprecationContext deprecations = deprecationContext.get();
-    if (deprecations.getDeprecatedKeyMap().isEmpty()) {
-      getProps();
-    }
-    getOverlay().setProperty(name, value);
-    getProps().setProperty(name, value);
-    String newSource = (source == null ? "programatically" : source);
-
-    if (!isDeprecated(name)) {
-      updatingResource.put(name, new String[] {newSource});
-      String[] altNames = getAlternativeNames(name);
-      if(altNames != null) {
-        for(String n: altNames) {
-          if(!n.equals(name)) {
-            getOverlay().setProperty(n, value);
-            getProps().setProperty(n, value);
-            updatingResource.put(n, new String[] {newSource});
-          }
-        }
-      }
-    }
-    else {
-      String[] names = handleDeprecation(deprecationContext.get(), name);
-      String altSource = "because " + name + " is deprecated";
-      for(String n : names) {
-        getOverlay().setProperty(n, value);
-        getProps().setProperty(n, value);
-        updatingResource.put(n, new String[] {altSource});
-      }
-    }
-  }
-
-  private void warnOnceIfDeprecated(DeprecationContext deprecations, String name) {
-    DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
-    if (keyInfo != null && !keyInfo.getAndSetAccessed()) {
-      LOG_DEPRECATION.info(keyInfo.getWarningMessage(name));
-    }
-  }
-
-  /**
-   * Unset a previously set property.
-   */
-  public synchronized void unset(String name) {
-    String[] names = null;
-    if (!isDeprecated(name)) {
-      names = getAlternativeNames(name);
-      if(names == null) {
-    	  names = new String[]{name};
-      }
-    }
-    else {
-      names = handleDeprecation(deprecationContext.get(), name);
-    }
-
-    for(String n: names) {
-      getOverlay().remove(n);
-      getProps().remove(n);
-    }
-  }
-
-  /**
-   * Sets a property if it is currently unset.
-   * @param name the property name
-   * @param value the new value
-   */
-  public synchronized void setIfUnset(String name, String value) {
-    if (get(name) == null) {
-      set(name, value);
-    }
-  }
-  
-  private synchronized Properties getOverlay() {
-    if (overlay==null){
-      overlay=new Properties();
-    }
-    return overlay;
-  }
-
-  /** 
-   * Get the value of the <code>name</code>. If the key is deprecated,
-   * it returns the value of the first key which replaces the deprecated key
-   * and is not null.
-   * If no such property exists,
-   * then <code>defaultValue</code> is returned.
-   * 
-   * @param name property name, will be trimmed before get value.
-   * @param defaultValue default value.
-   * @return property value, or <code>defaultValue</code> if the property 
-   *         doesn't exist.                    
-   */
-  public String get(String name, String defaultValue) {
-    String[] names = handleDeprecation(deprecationContext.get(), name);
-    String result = null;
-    for(String n : names) {
-      result = substituteVars(getProps().getProperty(n, defaultValue));
-    }
-    return result;
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property as an <code>int</code>.
-   *   
-   * If no such property exists, the provided default value is returned,
-   * or if the specified value is not a valid <code>int</code>,
-   * then an error is thrown.
-   * 
-   * @param name property name.
-   * @param defaultValue default value.
-   * @throws NumberFormatException when the value is invalid
-   * @return property value as an <code>int</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public int getInt(String name, int defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    String hexString = getHexDigits(valueString);
-    if (hexString != null) {
-      return Integer.parseInt(hexString, 16);
-    }
-    return Integer.parseInt(valueString);
-  }
-  
-  /**
-   * Get the value of the <code>name</code> property as a set of comma-delimited
-   * <code>int</code> values.
-   * 
-   * If no such property exists, an empty array is returned.
-   * 
-   * @param name property name
-   * @return property value interpreted as an array of comma-delimited
-   *         <code>int</code> values
-   */
-  public int[] getInts(String name) {
-    String[] strings = getTrimmedStrings(name);
-    int[] ints = new int[strings.length];
-    for (int i = 0; i < strings.length; i++) {
-      ints[i] = Integer.parseInt(strings[i]);
-    }
-    return ints;
-  }
-
-  /** 
-   * Set the value of the <code>name</code> property to an <code>int</code>.
-   * 
-   * @param name property name.
-   * @param value <code>int</code> value of the property.
-   */
-  public void setInt(String name, int value) {
-    set(name, Integer.toString(value));
-  }
-
-
-  /** 
-   * Get the value of the <code>name</code> property as a <code>long</code>.  
-   * If no such property exists, the provided default value is returned,
-   * or if the specified value is not a valid <code>long</code>,
-   * then an error is thrown.
-   * 
-   * @param name property name.
-   * @param defaultValue default value.
-   * @throws NumberFormatException when the value is invalid
-   * @return property value as a <code>long</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public long getLong(String name, long defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    String hexString = getHexDigits(valueString);
-    if (hexString != null) {
-      return Long.parseLong(hexString, 16);
-    }
-    return Long.parseLong(valueString);
-  }
-
-  /**
-   * Get the value of the <code>name</code> property as a <code>long</code> or
-   * human readable format. If no such property exists, the provided default
-   * value is returned, or if the specified value is not a valid
-   * <code>long</code> or human readable format, then an error is thrown. You
-   * can use the following suffix (case insensitive): k(kilo), m(mega), g(giga),
-   * t(tera), p(peta), e(exa)
-   *
-   * @param name property name.
-   * @param defaultValue default value.
-   * @throws NumberFormatException when the value is invalid
-   * @return property value as a <code>long</code>,
-   *         or <code>defaultValue</code>.
-   */
-  public long getLongBytes(String name, long defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    return StringUtils.TraditionalBinaryPrefix.string2long(valueString);
-  }
-
-  private String getHexDigits(String value) {
-    boolean negative = false;
-    String str = value;
-    String hexString = null;
-    if (value.startsWith("-")) {
-      negative = true;
-      str = value.substring(1);
-    }
-    if (str.startsWith("0x") || str.startsWith("0X")) {
-      hexString = str.substring(2);
-      if (negative) {
-        hexString = "-" + hexString;
-      }
-      return hexString;
-    }
-    return null;
-  }
-  
-  /** 
-   * Set the value of the <code>name</code> property to a <code>long</code>.
-   * 
-   * @param name property name.
-   * @param value <code>long</code> value of the property.
-   */
-  public void setLong(String name, long value) {
-    set(name, Long.toString(value));
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property as a <code>float</code>.  
-   * If no such property exists, the provided default value is returned,
-   * or if the specified value is not a valid <code>float</code>,
-   * then an error is thrown.
-   *
-   * @param name property name.
-   * @param defaultValue default value.
-   * @throws NumberFormatException when the value is invalid
-   * @return property value as a <code>float</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public float getFloat(String name, float defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    return Float.parseFloat(valueString);
-  }
-
-  /**
-   * Set the value of the <code>name</code> property to a <code>float</code>.
-   * 
-   * @param name property name.
-   * @param value property value.
-   */
-  public void setFloat(String name, float value) {
-    set(name,Float.toString(value));
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property as a <code>double</code>.  
-   * If no such property exists, the provided default value is returned,
-   * or if the specified value is not a valid <code>double</code>,
-   * then an error is thrown.
-   *
-   * @param name property name.
-   * @param defaultValue default value.
-   * @throws NumberFormatException when the value is invalid
-   * @return property value as a <code>double</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public double getDouble(String name, double defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    return Double.parseDouble(valueString);
-  }
-
-  /**
-   * Set the value of the <code>name</code> property to a <code>double</code>.
-   * 
-   * @param name property name.
-   * @param value property value.
-   */
-  public void setDouble(String name, double value) {
-    set(name,Double.toString(value));
-  }
- 
-  /** 
-   * Get the value of the <code>name</code> property as a <code>boolean</code>.  
-   * If no such property is specified, or if the specified value is not a valid
-   * <code>boolean</code>, then <code>defaultValue</code> is returned.
-   * 
-   * @param name property name.
-   * @param defaultValue default value.
-   * @return property value as a <code>boolean</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public boolean getBoolean(String name, boolean defaultValue) {
-    String valueString = getTrimmed(name);
-    if (null == valueString || valueString.isEmpty()) {
-      return defaultValue;
-    }
-
-    if (StringUtils.equalsIgnoreCase("true", valueString))
-      return true;
-    else if (StringUtils.equalsIgnoreCase("false", valueString))
-      return false;
-    else return defaultValue;
-  }
-
-  /** 
-   * Set the value of the <code>name</code> property to a <code>boolean</code>.
-   * 
-   * @param name property name.
-   * @param value <code>boolean</code> value of the property.
-   */
-  public void setBoolean(String name, boolean value) {
-    set(name, Boolean.toString(value));
-  }
-
-  /**
-   * Set the given property, if it is currently unset.
-   * @param name property name
-   * @param value new value
-   */
-  public void setBooleanIfUnset(String name, boolean value) {
-    setIfUnset(name, Boolean.toString(value));
-  }
-
-  /**
-   * Set the value of the <code>name</code> property to the given type. This
-   * is equivalent to <code>set(&lt;name&gt;, value.toString())</code>.
-   * @param name property name
-   * @param value new value
-   */
-  public <T extends Enum<T>> void setEnum(String name, T value) {
-    set(name, value.toString());
-  }
-
-  /**
-   * Return value matching this enumerated type.
-   * Note that the returned value is trimmed by this method.
-   * @param name Property name
-   * @param defaultValue Value returned if no mapping exists
-   * @throws IllegalArgumentException If mapping is illegal for the type
-   * provided
-   */
-  public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
-    final String val = getTrimmed(name);
-    return null == val
-      ? defaultValue
-      : Enum.valueOf(defaultValue.getDeclaringClass(), val);
-  }
-
-  enum ParsedTimeDuration {
-    NS {
-      TimeUnit unit() { return TimeUnit.NANOSECONDS; }
-      String suffix() { return "ns"; }
-    },
-    US {
-      TimeUnit unit() { return TimeUnit.MICROSECONDS; }
-      String suffix() { return "us"; }
-    },
-    MS {
-      TimeUnit unit() { return TimeUnit.MILLISECONDS; }
-      String suffix() { return "ms"; }
-    },
-    S {
-      TimeUnit unit() { return TimeUnit.SECONDS; }
-      String suffix() { return "s"; }
-    },
-    M {
-      TimeUnit unit() { return TimeUnit.MINUTES; }
-      String suffix() { return "m"; }
-    },
-    H {
-      TimeUnit unit() { return TimeUnit.HOURS; }
-      String suffix() { return "h"; }
-    },
-    D {
-      TimeUnit unit() { return TimeUnit.DAYS; }
-      String suffix() { return "d"; }
-    };
-    abstract TimeUnit unit();
-    abstract String suffix();
-    static ParsedTimeDuration unitFor(String s) {
-      for (ParsedTimeDuration ptd : values()) {
-        // iteration order is in decl order, so SECONDS matched last
-        if (s.endsWith(ptd.suffix())) {
-          return ptd;
-        }
-      }
-      return null;
-    }
-    static ParsedTimeDuration unitFor(TimeUnit unit) {
-      for (ParsedTimeDuration ptd : values()) {
-        if (ptd.unit() == unit) {
-          return ptd;
-        }
-      }
-      return null;
-    }
-  }
-
-  /**
-   * Set the value of <code>name</code> to the given time duration. This
-   * is equivalent to <code>set(&lt;name&gt;, value + &lt;time suffix&gt;)</code>.
-   * @param name Property name
-   * @param value Time duration
-   * @param unit Unit of time
-   */
-  public void setTimeDuration(String name, long value, TimeUnit unit) {
-    set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
-  }
-
-  /**
-   * Return time duration in the given time unit. Valid units are encoded in
-   * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
-   * (ms), seconds (s), minutes (m), hours (h), and days (d).
-   * @param name Property name
-   * @param defaultValue Value returned if no mapping exists.
-   * @param unit Unit to convert the stored property, if it exists.
-   * @throws NumberFormatException If the property stripped of its unit is not
-   *         a number
-   */
-  public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
-    String vStr = get(name);
-    if (null == vStr) {
-      return defaultValue;
-    }
-    vStr = vStr.trim();
-    return getTimeDurationHelper(name, vStr, unit);
-  }
-
-  private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
-    ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
-    if (null == vUnit) {
-      LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
-      vUnit = ParsedTimeDuration.unitFor(unit);
-    } else {
-      vStr = vStr.substring(0, vStr.lastIndexOf(vUnit.suffix()));
-    }
-    return unit.convert(Long.parseLong(vStr), vUnit.unit());
-  }
-
-  public long[] getTimeDurations(String name, TimeUnit unit) {
-    String[] strings = getTrimmedStrings(name);
-    long[] durations = new long[strings.length];
-    for (int i = 0; i < strings.length; i++) {
-      durations[i] = getTimeDurationHelper(name, strings[i], unit);
-    }
-    return durations;
-  }
-
-  /**
-   * Get the value of the <code>name</code> property as a <code>Pattern</code>.
-   * If no such property is specified, or if the specified value is not a valid
-   * <code>Pattern</code>, then <code>DefaultValue</code> is returned.
-   * Note that the returned value is NOT trimmed by this method.
-   *
-   * @param name property name
-   * @param defaultValue default value
-   * @return property value as a compiled Pattern, or defaultValue
-   */
-  public Pattern getPattern(String name, Pattern defaultValue) {
-    String valString = get(name);
-    if (null == valString || valString.isEmpty()) {
-      return defaultValue;
-    }
-    try {
-      return Pattern.compile(valString);
-    } catch (PatternSyntaxException pse) {
-      LOG.warn("Regular expression '" + valString + "' for property '" +
-               name + "' not valid. Using default", pse);
-      return defaultValue;
-    }
-  }
-
-  /**
-   * Set the given property to <code>Pattern</code>.
-   * If the pattern is passed as null, sets the empty pattern which results in
-   * further calls to getPattern(...) returning the default value.
-   *
-   * @param name property name
-   * @param pattern new value
-   */
-  public void setPattern(String name, Pattern pattern) {
-    assert pattern != null : "Pattern cannot be null";
-    set(name, pattern.pattern());
-  }
-
-  /**
-   * Gets information about why a property was set.  Typically this is the 
-   * path to the resource objects (file, URL, etc.) the property came from, but
-   * it can also indicate that it was set programatically, or because of the
-   * command line.
-   *
-   * @param name - The property name to get the source of.
-   * @return null - If the property or its source wasn't found. Otherwise, 
-   * returns a list of the sources of the resource.  The older sources are
-   * the first ones in the list.  So for example if a configuration is set from
-   * the command line, and then written out to a file that is read back in the
-   * first entry would indicate that it was set from the command line, while
-   * the second one would indicate the file that the new configuration was read
-   * in from.
-   */
-  @InterfaceStability.Unstable
-  public synchronized String[] getPropertySources(String name) {
-    if (properties == null) {
-      // If properties is null, it means a resource was newly added
-      // but the props were cleared so as to load it upon future
-      // requests. So lets force a load by asking a properties list.
-      getProps();
-    }
-    // Return a null right away if our properties still
-    // haven't loaded or the resource mapping isn't defined
-    if (properties == null || updatingResource == null) {
-      return null;
-    } else {
-      String[] source = updatingResource.get(name);
-      if(source == null) {
-        return null;
-      } else {
-        return Arrays.copyOf(source, source.length);
-      }
-    }
-  }
-
-  /**
-   * A class that represents a set of positive integer ranges. It parses
-   * strings of the form: "2-3,5,7-" where ranges are separated by comma and
-   * the lower/upper bounds are separated by dash. Either the lower or upper
-   * bound may be omitted meaning all values up to or over. So the string
-   * above means 2, 3, 5, and 7, 8, 9, ...
-   */
-  public static class IntegerRanges implements Iterable<Integer>{
-    private static class Range {
-      int start;
-      int end;
-    }
-
-    private static class RangeNumberIterator implements Iterator<Integer> {
-      Iterator<Range> internal;
-      int at;
-      int end;
-
-      public RangeNumberIterator(List<Range> ranges) {
-        if (ranges != null) {
-          internal = ranges.iterator();
-        }
-        at = -1;
-        end = -2;
-      }
-
-      @Override
-      public boolean hasNext() {
-        if (at <= end) {
-          return true;
-        } else if (internal != null){
-          return internal.hasNext();
-        }
-        return false;
-      }
-
-      @Override
-      public Integer next() {
-        if (at <= end) {
-          at++;
-          return at - 1;
-        } else if (internal != null){
-          Range found = internal.next();
-          if (found != null) {
-            at = found.start;
-            end = found.end;
-            at++;
-            return at - 1;
-          }
-        }
-        return null;
-      }
-
-      @Override
-      public void remove() {
-        throw new UnsupportedOperationException();
-      }
-    };
-
-    List<Range> ranges = new ArrayList<Range>();
-
-    public IntegerRanges() {
-    }
-
-    public IntegerRanges(String newValue) {
-      StringTokenizer itr = new StringTokenizer(newValue, ",");
-      while (itr.hasMoreTokens()) {
-        String rng = itr.nextToken().trim();
-        String[] parts = rng.split("-", 3);
-        if (parts.length < 1 || parts.length > 2) {
-          throw new IllegalArgumentException("integer range badly formed: " +
-                                             rng);
-        }
-        Range r = new Range();
-        r.start = convertToInt(parts[0], 0);
-        if (parts.length == 2) {
-          r.end = convertToInt(parts[1], Integer.MAX_VALUE);
-        } else {
-          r.end = r.start;
-        }
-        if (r.start > r.end) {
-          throw new IllegalArgumentException("IntegerRange from " + r.start +
-                                             " to " + r.end + " is invalid");
-        }
-        ranges.add(r);
-      }
-    }
-
-    /**
-     * Convert a string to an int treating empty strings as the default value.
-     * @param value the string value
-     * @param defaultValue the value for if the string is empty
-     * @return the desired integer
-     */
-    private static int convertToInt(String value, int defaultValue) {
-      String trim = value.trim();
-      if (trim.length() == 0) {
-        return defaultValue;
-      }
-      return Integer.parseInt(trim);
-    }
-
-    /**
-     * Is the given value in the set of ranges
-     * @param value the value to check
-     * @return is the value in the ranges?
-     */
-    public boolean isIncluded(int value) {
-      for(Range r: ranges) {
-        if (r.start <= value && value <= r.end) {
-          return true;
-        }
-      }
-      return false;
-    }
-
-    /**
-     * @return true if there are no values in this range, else false.
-     */
-    public boolean isEmpty() {
-      return ranges == null || ranges.isEmpty();
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder result = new StringBuilder();
-      boolean first = true;
-      for(Range r: ranges) {
-        if (first) {
-          first = false;
-        } else {
-          result.append(',');
-        }
-        result.append(r.start);
-        result.append('-');
-        result.append(r.end);
-      }
-      return result.toString();
-    }
-
-    @Override
-    public Iterator<Integer> iterator() {
-      return new RangeNumberIterator(ranges);
-    }
-
-  }
-
-  /**
-   * Parse the given attribute as a set of integer ranges
-   * @param name the attribute name
-   * @param defaultValue the default value if it is not set
-   * @return a new set of ranges from the configured value
-   */
-  public IntegerRanges getRange(String name, String defaultValue) {
-    return new IntegerRanges(get(name, defaultValue));
-  }
-
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * a collection of <code>String</code>s.  
-   * If no such property is specified then empty collection is returned.
-   * <p>
-   * This is an optimized version of {@link #getStrings(String)}
-   * 
-   * @param name property name.
-   * @return property value as a collection of <code>String</code>s. 
-   */
-  public Collection<String> getStringCollection(String name) {
-    String valueString = get(name);
-    return StringUtils.getStringCollection(valueString);
-  }
-
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * an array of <code>String</code>s.  
-   * If no such property is specified then <code>null</code> is returned.
-   * 
-   * @param name property name.
-   * @return property value as an array of <code>String</code>s, 
-   *         or <code>null</code>. 
-   */
-  public String[] getStrings(String name) {
-    String valueString = get(name);
-    return StringUtils.getStrings(valueString);
-  }
-
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * an array of <code>String</code>s.  
-   * If no such property is specified then default value is returned.
-   * 
-   * @param name property name.
-   * @param defaultValue The default value
-   * @return property value as an array of <code>String</code>s, 
-   *         or default value. 
-   */
-  public String[] getStrings(String name, String... defaultValue) {
-    String valueString = get(name);
-    if (valueString == null) {
-      return defaultValue;
-    } else {
-      return StringUtils.getStrings(valueString);
-    }
-  }
-  
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * a collection of <code>String</code>s, trimmed of the leading and trailing whitespace.  
-   * If no such property is specified then empty <code>Collection</code> is returned.
-   *
-   * @param name property name.
-   * @return property value as a collection of <code>String</code>s, or empty <code>Collection</code> 
-   */
-  public Collection<String> getTrimmedStringCollection(String name) {
-    String valueString = get(name);
-    if (null == valueString) {
-      Collection<String> empty = new ArrayList<String>();
-      return empty;
-    }
-    return StringUtils.getTrimmedStringCollection(valueString);
-  }
-  
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
-   * If no such property is specified then an empty array is returned.
-   * 
-   * @param name property name.
-   * @return property value as an array of trimmed <code>String</code>s, 
-   *         or empty array. 
-   */
-  public String[] getTrimmedStrings(String name) {
-    String valueString = get(name);
-    return StringUtils.getTrimmedStrings(valueString);
-  }
-
-  /** 
-   * Get the comma delimited values of the <code>name</code> property as 
-   * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
-   * If no such property is specified then default value is returned.
-   * 
-   * @param name property name.
-   * @param defaultValue The default value
-   * @return property value as an array of trimmed <code>String</code>s, 
-   *         or default value. 
-   */
-  public String[] getTrimmedStrings(String name, String... defaultValue) {
-    String valueString = get(name);
-    if (null == valueString) {
-      return defaultValue;
-    } else {
-      return StringUtils.getTrimmedStrings(valueString);
-    }
-  }
-
-  /** 
-   * Set the array of string values for the <code>name</code> property as 
-   * as comma delimited values.  
-   * 
-   * @param name property name.
-   * @param values The values
-   */
-  public void setStrings(String name, String... values) {
-    set(name, StringUtils.arrayToString(values));
-  }
-
-  /**
-   * Get the value for a known password configuration element.
-   * In order to enable the elimination of clear text passwords in config,
-   * this method attempts to resolve the property name as an alias through
-   * the CredentialProvider API and conditionally falls back to config.
-   * @param name property name
-   * @return password
-   */
-  public char[] getPassword(String name) throws IOException {
-    char[] pass = null;
-
-    pass = getPasswordFromCredentialProviders(name);
-
-    if (pass == null) {
-      pass = getPasswordFromConfig(name);
-    }
-
-    return pass;
-  }
-
-  /**
-   * Try and resolve the provided element name as a credential provider
-   * alias.
-   * @param name alias of the provisioned credential
-   * @return password or null if not found
-   * @throws IOException
-   */
-  protected char[] getPasswordFromCredentialProviders(String name)
-      throws IOException {
-    char[] pass = null;
-    try {
-      List<CredentialProvider> providers =
-          CredentialProviderFactory.getProviders(this);
-
-      if (providers != null) {
-        for (CredentialProvider provider : providers) {
-          try {
-            CredentialEntry entry = provider.getCredentialEntry(name);
-            if (entry != null) {
-              pass = entry.getCredential();
-              break;
-            }
-          }
-          catch (IOException ioe) {
-            throw new IOException("Can't get key " + name + " from key provider" +
-            		"of type: " + provider.getClass().getName() + ".", ioe);
-          }
-        }
-      }
-    }
-    catch (IOException ioe) {
-      throw new IOException("Configuration problem with provider path.", ioe);
-    }
-
-    return pass;
-  }
-
-  /**
-   * Fallback to clear text passwords in configuration.
-   * @param name
-   * @return clear text password or null
-   */
-  protected char[] getPasswordFromConfig(String name) {
-    char[] pass = null;
-    if (getBoolean(CredentialProvider.CLEAR_TEXT_FALLBACK, true)) {
-      String passStr = get(name);
-      if (passStr != null) {
-        pass = passStr.toCharArray();
-      }
-    }
-    return pass;
-  }
-
-  /**
-   * Get the socket address for <code>hostProperty</code> as a
-   * <code>InetSocketAddress</code>. If <code>hostProperty</code> is
-   * <code>null</code>, <code>addressProperty</code> will be used. This
-   * is useful for cases where we want to differentiate between host
-   * bind address and address clients should use to establish connection.
-   *
-   * @param hostProperty bind host property name.
-   * @param addressProperty address property name.
-   * @param defaultAddressValue the default value
-   * @param defaultPort the default port
-   * @return InetSocketAddress
-   */
-  public InetSocketAddress getSocketAddr(
-      String hostProperty,
-      String addressProperty,
-      String defaultAddressValue,
-      int defaultPort) {
-
-    InetSocketAddress bindAddr = getSocketAddr(
-      addressProperty, defaultAddressValue, defaultPort);
-
-    final String host = get(hostProperty);
-
-    if (host == null || host.isEmpty()) {
-      return bindAddr;
-    }
-
-    return NetUtils.createSocketAddr(
-        host, bindAddr.getPort(), hostProperty);
-  }
-
-  /**
-   * Get the socket address for <code>name</code> property as a
-   * <code>InetSocketAddress</code>.
-   * @param name property name.
-   * @param defaultAddress the default value
-   * @param defaultPort the default port
-   * @return InetSocketAddress
-   */
-  public InetSocketAddress getSocketAddr(
-      String name, String defaultAddress, int defaultPort) {
-    final String address = getTrimmed(name, defaultAddress);
-    return NetUtils.createSocketAddr(address, defaultPort, name);
-  }
-
-  /**
-   * Set the socket address for the <code>name</code> property as
-   * a <code>host:port</code>.
-   */
-  public void setSocketAddr(String name, InetSocketAddress addr) {
-    set(name, NetUtils.getHostPortString(addr));
-  }
-
-  /**
-   * Set the socket address a client can use to connect for the
-   * <code>name</code> property as a <code>host:port</code>.  The wildcard
-   * address is replaced with the local host's address. If the host and address
-   * properties are configured the host component of the address will be combined
-   * with the port component of the addr to generate the address.  This is to allow
-   * optional control over which host name is used in multi-home bind-host
-   * cases where a host can have multiple names
-   * @param hostProperty the bind-host configuration name
-   * @param addressProperty the service address configuration name
-   * @param defaultAddressValue the service default address configuration value
-   * @param addr InetSocketAddress of the service listener
-   * @return InetSocketAddress for clients to connect
-   */
-  public InetSocketAddress updateConnectAddr(
-      String hostProperty,
-      String addressProperty,
-      String defaultAddressValue,
-      InetSocketAddress addr) {
-
-    final String host = get(hostProperty);
-    final String connectHostPort = getTrimmed(addressProperty, defaultAddressValue);
-
-    if (host == null || host.isEmpty() || connectHostPort == null || connectHostPort.isEmpty()) {
-      //not our case, fall back to original logic
-      return updateConnectAddr(addressProperty, addr);
-    }
-
-    final String connectHost = connectHostPort.split(":")[0];
-    // Create connect address using client address hostname and server port.
-    return updateConnectAddr(addressProperty, NetUtils.createSocketAddrForHost(
-        connectHost, addr.getPort()));
-  }
-  
-  /**
-   * Set the socket address a client can use to connect for the
-   * <code>name</code> property as a <code>host:port</code>.  The wildcard
-   * address is replaced with the local host's address.
-   * @param name property name.
-   * @param addr InetSocketAddress of a listener to store in the given property
-   * @return InetSocketAddress for clients to connect
-   */
-  public InetSocketAddress updateConnectAddr(String name,
-                                             InetSocketAddress addr) {
-    final InetSocketAddress connectAddr = NetUtils.getConnectAddress(addr);
-    setSocketAddr(name, connectAddr);
-    return connectAddr;
-  }
-  
-  /**
-   * Load a class by name.
-   * 
-   * @param name the class name.
-   * @return the class object.
-   * @throws ClassNotFoundException if the class is not found.
-   */
-  public Class<?> getClassByName(String name) throws ClassNotFoundException {
-    Class<?> ret = getClassByNameOrNull(name);
-    if (ret == null) {
-      throw new ClassNotFoundException("Class " + name + " not found");
-    }
-    return ret;
-  }
-  
-  /**
-   * Load a class by name, returning null rather than throwing an exception
-   * if it couldn't be loaded. This is to avoid the overhead of creating
-   * an exception.
-   * 
-   * @param name the class name
-   * @return the class object, or null if it could not be found.
-   */
-  public Class<?> getClassByNameOrNull(String name) {
-    Map<String, WeakReference<Class<?>>> map;
-    
-    synchronized (CACHE_CLASSES) {
-      map = CACHE_CLASSES.get(classLoader);
-      if (map == null) {
-        map = Collections.synchronizedMap(
-          new WeakHashMap<String, WeakReference<Class<?>>>());
-        CACHE_CLASSES.put(classLoader, map);
-      }
-    }
-
-    Class<?> clazz = null;
-    WeakReference<Class<?>> ref = map.get(name); 
-    if (ref != null) {
-       clazz = ref.get();
-    }
-     
-    if (clazz == null) {
-      try {
-        clazz = Class.forName(name, true, classLoader);
-      } catch (ClassNotFoundException e) {
-        // Leave a marker that the class isn't found
-        map.put(name, new WeakReference<Class<?>>(NEGATIVE_CACHE_SENTINEL));
-        return null;
-      }
-      // two putters can race here, but they'll put the same class
-      map.put(name, new WeakReference<Class<?>>(clazz));
-      return clazz;
-    } else if (clazz == NEGATIVE_CACHE_SENTINEL) {
-      return null; // not found
-    } else {
-      // cache hit
-      return clazz;
-    }
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property
-   * as an array of <code>Class</code>.
-   * The value of the property specifies a list of comma separated class names.  
-   * If no such property is specified, then <code>defaultValue</code> is 
-   * returned.
-   * 
-   * @param name the property name.
-   * @param defaultValue default value.
-   * @return property value as a <code>Class[]</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public Class<?>[] getClasses(String name, Class<?> ... defaultValue) {
-    String[] classnames = getTrimmedStrings(name);
-    if (classnames == null)
-      return defaultValue;
-    try {
-      Class<?>[] classes = new Class<?>[classnames.length];
-      for(int i = 0; i < classnames.length; i++) {
-        classes[i] = getClassByName(classnames[i]);
-      }
-      return classes;
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property as a <code>Class</code>.  
-   * If no such property is specified, then <code>defaultValue</code> is 
-   * returned.
-   * 
-   * @param name the class name.
-   * @param defaultValue default value.
-   * @return property value as a <code>Class</code>, 
-   *         or <code>defaultValue</code>. 
-   */
-  public Class<?> getClass(String name, Class<?> defaultValue) {
-    String valueString = getTrimmed(name);
-    if (valueString == null)
-      return defaultValue;
-    try {
-      return getClassByName(valueString);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /** 
-   * Get the value of the <code>name</code> property as a <code>Class</code>
-   * implementing the interface specified by <code>xface</code>.
-   *   
-   * If no such property is specified, then <code>defaultValue</code> is 
-   * returned.
-   * 
-   * An exception is thrown if the returned class does not implement the named
-   * interface. 
-   * 
-   * @param name the class name.
-   * @param defaultValue default value.
-   * @param xface the interface implemented by the named class.
-   * @return property value as a <code>Class</code>, 
-   *         or <code>defaultValue</code>.
-   */
-  public <U> Class<? extends U> getClass(String name, 
-                                         Class<? extends U> defaultValue, 
-                                         Class<U> xface) {
-    try {
-      Class<?> theClass = getClass(name, defaultValue);
-      if (theClass != null && !xface.isAssignableFrom(theClass))
-        throw new RuntimeException(theClass+" not "+xface.getName());
-      else if (theClass != null)
-        return theClass.asSubclass(xface);
-      else
-        return null;
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  /**
-   * Get the value of the <code>name</code> property as a <code>List</code>
-   * of objects implementing the interface specified by <code>xface</code>.
-   * 
-   * An exception is thrown if any of the classes does not exist, or if it does
-   * not implement the named interface.
-   * 
-   * @param name the property name.
-   * @param xface the interface implemented by the classes named by
-   *        <code>name</code>.
-   * @return a <code>List</code> of objects implementing <code>xface</code>.
-   */
-  @SuppressWarnings("unchecked")
-  public <U> List<U> getInstances(String name, Class<U> xface) {
-    List<U> ret = new ArrayList<U>();
-    Class<?>[] classes = getClasses(name);
-    for (Class<?> cl: classes) {
-      if (!xface.isAssignableFrom(cl)) {
-        throw new RuntimeException(cl + " does not implement " + xface);
-      }
-      ret.add((U)ReflectionUtils.newInstance(cl, this));
-    }
-    return ret;
-  }
-
-  /** 
-   * Set the value of the <code>name</code> property to the name of a 
-   * <code>theClass</code> implementing the given interface <code>xface</code>.
-   * 
-   * An exception is thrown if <code>theClass</code> does not implement the 
-   * interface <code>xface</code>. 
-   * 
-   * @param name property name.
-   * @param theClass property value.
-   * @param xface the interface implemented by the named class.
-   */
-  public void setClass(String name, Class<?> theClass, Class<?> xface) {
-    if (!xface.isAssignableFrom(theClass))
-      throw new RuntimeException(theClass+" not "+xface.getName());
-    set(name, theClass.getName());
-  }
-
-  /** 
-   * Get a local file under a directory named by <i>dirsProp</i> with
-   * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
-   * then one is chosen based on <i>path</i>'s hash code.  If the selected
-   * directory does not exist, an attempt is made to create it.
-   * 
-   * @param dirsProp directory in which to locate the file.
-   * @param path file-path.
-   * @return local file under the directory with the given path.
-   */
-  public Path getLocalPath(String dirsProp, String path)
-    throws IOException {
-    String[] dirs = getTrimmedStrings(dirsProp);
-    int hashCode = path.hashCode();
-    FileSystem fs = FileSystem.getLocal(this);
-    for (int i = 0; i < dirs.length; i++) {  // try each local dir
-      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
-      Path file = new Path(dirs[index], path);
-      Path dir = file.getParent();
-      if (fs.mkdirs(dir) || fs.exists(dir)) {
-        return file;
-      }
-    }
-    LOG.warn("Could not make " + path + 
-             " in local directories from " + dirsProp);
-    for(int i=0; i < dirs.length; i++) {
-      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
-      LOG.warn(dirsProp + "[" + index + "]=" + dirs[index]);
-    }
-    throw new IOException("No valid local directories in property: "+dirsProp);
-  }
-
-  /** 
-   * Get a local file name under a directory named in <i>dirsProp</i> with
-   * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
-   * then one is chosen based on <i>path</i>'s hash code.  If the selected
-   * directory does not exist, an attempt is made to create it.
-   * 
-   * @param dirsProp directory in which to locate the file.
-   * @param path file-path.
-   * @return local file under the directory with the given path.
-   */
-  public File getFile(String dirsProp, String path)
-    throws IOException {
-    String[] dirs = getTrimmedStrings(dirsProp);
-    int hashCode = path.hashCode();
-    for (int i = 0; i < dirs.length; i++) {  // try each local dir
-      int index = (hashCode+i & Integer.MAX_VALUE) % dirs.length;
-      File file = new File(dirs[index], path);
-      File dir = file.getParentFile();
-      if (dir.exists() || dir.mkdirs()) {
-        return file;
-      }
-    }
-    throw new IOException("No valid local directories in property: "+dirsProp);
-  }
-
-  /** 
-   * Get the {@link URL} for the named resource.
-   * 
-   * @param name resource name.
-   * @return the url for the named resource.
-   */
-  public URL getResource(String name) {
-    return classLoader.getResource(name);
-  }
-  
-  /** 
-   * Get an input stream attached to the configuration resource with the
-   * given <code>name</code>.
-   * 
-   * @param name configuration resource name.
-   * @return an input stream attached to the resource.
-   */
-  public InputStream getConfResourceAsInputStream(String name) {
-    try {
-      URL url= getResource(name);
-
-      if (url == null) {
-        LOG.info(name + " not found");
-        return null;
-      } else {
-        LOG.info("found resource " + name + " at " + url);
-      }
-
-      return url.openStream();
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  /** 
-   * Get a {@link Reader} attached to the configuration resource with the
-   * given <code>name</code>.
-   * 
-   * @param name configuration resource name.
-   * @return a reader attached to the resource.
-   */
-  public Reader getConfResourceAsReader(String name) {
-    try {
-      URL url= getResource(name);
-
-      if (url == null) {
-        LOG.info(name + " not found");
-        return null;
-      } else {
-        LOG.info("found resource " + name + " at " + url);
-      }
-
-      return new InputStreamReader(url.openStream(), Charsets.UTF_8);
-    } catch (Exception e) {
-      return null;
-    }
-  }
-
-  /**
-   * Get the set of parameters marked final.
-   *
-   * @return final parameter set.
-   */
-  public Set<String> getFinalParameters() {
-    Set<String> setFinalParams = Collections.newSetFromMap(
-        new ConcurrentHashMap<String, Boolean>());
-    setFinalParams.addAll(finalParameters);
-    return setFinalParams;
-  }
-
-  protected synchronized Properties getProps() {
-    if (properties == null) {
-      properties = new Properties();
-      Map<String, String[]> backup =
-          new ConcurrentHashMap<String, String[]>(updatingResource);
-      loadResources(properties, resources, quietmode);
-
-      if (overlay != null) {
-        properties.putAll(overlay);
-        for (Map.Entry<Object,Object> item: overlay.entrySet()) {
-          String key = (String)item.getKey();
-          String[] source = backup.get(key);
-          if(source != null) {
-            updatingResource.put(key, source);
-          }
-        }
-      }
-    }
-    return properties;
-  }
-
-  /**
-   * Return the number of keys in the configuration.
-   *
-   * @return number of keys in the configuration.
-   */
-  public int size() {
-    return getProps().size();
-  }
-
-  /**
-   * Clears all keys from the configuration.
-   */
-  public void clear() {
-    getProps().clear();
-    getOverlay().clear();
-  }
-
-  /**
-   * Get an {@link Iterator} to go through the list of <code>String</code> 
-   * key-value pairs in the configuration.
-   * 
-   * @return an iterator over the entries.
-   */
-  @Override
-  public Iterator<Map.Entry<String, String>> iterator() {
-    // Get a copy of just the string to string pairs. After the old object
-    // methods that allow non-strings to be put into configurations are removed,
-    // we could replace properties with a Map<String,String> and get rid of this
-    // code.
-    Map<String,String> result = new HashMap<String,String>();
-    for(Map.Entry<Object,Object> item: getProps().entrySet()) {
-      if (item.getKey() instanceof String &&
-          item.getValue() instanceof String) {
-          result.put((String) item.getKey(), (String) item.getValue());
-      }
-    }
-    return result.entrySet().iterator();
-  }
-
-  /**
-   * Constructs a mapping of configuration and includes all properties that
-   * start with the specified configuration prefix.  Property names in the
-   * mapping are trimmed to remove the configuration prefix.
-   *
-   * @param confPrefix configuration prefix
-   * @return mapping of configuration properties with prefix stripped
-   */
-  public Map<String, String> getPropsWithPrefix(String confPrefix) {
-    Map<String, String> configMap = new HashMap<>();
-    for (Map.Entry<String, String> entry : this) {
-      String name = entry.getKey();
-      if (name.startsWith(confPrefix)) {
-        String value = this.get(name);
-        name = name.substring(confPrefix.length());
-        configMap.put(name, value);
-      }
-    }
-    return configMap;
-  }
-
-  private Document parse(DocumentBuilder builder, URL url)
-      throws IOException, SAXException {
-    if (!quietmode) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("parsing URL " + url);
-      }
-    }
-    if (url == null) {
-      return null;
-    }
-
-    URLConnection connection = url.openConnection();
-    if (connection instanceof JarURLConnection) {
-      // Disable caching for JarURLConnection to avoid sharing JarFile
-      // with other users.
-      connection.setUseCaches(false);
-    }
-    return parse(builder, connection.getInputStream(), url.toString());
-  }
-
-  private Document parse(DocumentBuilder builder, InputStream is,
-      String systemId) throws IOException, SAXException {
-    if (!quietmode) {
-      LOG.debug("parsing input stream " + is);
-    }
-    if (is == null) {
-      return null;
-    }
-    try {
-      return (systemId == null) ? builder.parse(is) : builder.parse(is,
-          systemId);
-    } finally {
-      is.close();
-    }
-  }
-
-  private void loadResources(Properties properties,
-                             ArrayList<Resource> resources,
-                             boolean quiet) {
-    if(loadDefaults) {
-      for (String resource : defaultResources) {
-        loadResource(properties, new Resource(resource), quiet);
-      }
-    
-      //support the hadoop-site.xml as a deprecated case
-      if(getResource("hadoop-site.xml")!=null) {
-        loadResource(properties, new Resource("hadoop-site.xml"), quiet);
-      }
-    }
-    
-    for (int i = 0; i < resources.size(); i++) {
-      Resource ret = loadResource(properties, resources.get(i), quiet);
-      if (ret != null) {
-        resources.set(i, ret);
-      }
-    }
-  }
-  
-  private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
-    String name = UNKNOWN_RESOURCE;
-    try {
-      Object resource = wrapper.getResource();
-      name = wrapper.getName();
-      
-      DocumentBuilderFactory docBuilderFactory 
-        = DocumentBuilderFactory.newInstance();
-      //ignore all comments inside the xml file
-      docBuilderFactory.setIgnoringComments(true);
-
-      //allow includes in the xml file
-      docBuilderFactory.setNamespaceAware(true);
-      try {
-          docBuilderFactory.setXIncludeAware(true);
-      } catch (UnsupportedOperationException e) {
-        LOG.error("Failed to set setXIncludeAware(true) for parser "
-                + docBuilderFactory
-                + ":" + e,
-                e);
-      }
-      DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-      Document doc = null;
-      Element root = null;
-      boolean returnCachedProperties = false;
-      
-      if (resource instanceof URL) {                  // an URL resource
-        doc = parse(builder, (URL)resource);
-      } else if (resource instanceof String) {        // a CLASSPATH resource
-        URL url = getResource((String)resource);
-        doc = parse(builder, url);
-      } else if (resource instanceof Path) {          // a file resource
-        // Can't use FileSystem API or we get an infinite loop
-        // since FileSystem uses Configuration API.  Use java.io.File instead.
-        File file = new File(((Path)resource).toUri().getPath())
-          .getAbsoluteFile();
-        if (file.exists()) {
-          if (!quiet) {
-            LOG.debug("parsing File " + file);
-          }
-          doc = parse(builder, new BufferedInputStream(
-              new FileInputStream(file)), ((Path)resource).toString());
-        }
-      } else if (resource instanceof InputStream) {
-        doc = parse(builder, (InputStream) resource, null);
-        returnCachedProperties = true;
-      } else if (resource instanceof Properties) {
-        overlay(properties, (Properties)resource);
-      } else if (resource instanceof Element) {
-        root = (Element)resource;
-      }
-
-      if (root == null) {
-        if (doc == null) {
-          if (quiet) {
-            return null;
-          }
-          throw new RuntimeException(resource + " not found");
-        }
-        root = doc.getDocumentElement();
-      }
-      Properties toAddTo = properties;
-      if(returnCachedProperties) {
-        toAddTo = new Properties();
-      }
-      if (!"configuration".equals(root.getTagName()))
-        LOG.fatal("bad conf file: top-level element not <configuration>");
-      NodeList props = root.getChildNodes();
-      DeprecationContext deprecations = deprecationContext.get();
-      for (int i = 0; i < props.getLength(); i++) {
-        Node propNode = props.item(i);
-        if (!(propNode instanceof Element))
-          continue;
-        Element prop = (Element)propNode;
-        if ("configuration".equals(prop.getTagName())) {
-          loadResource(toAddTo, new Resource(prop, name), quiet);
-          continue;
-        }
-        if (!"property".equals(prop.getTagName()))
-          LOG.warn("bad conf file: element not <property>");
-        NodeList fields = prop.getChildNodes();
-        String attr = null;
-        String value = null;
-        boolean finalParameter = false;
-        LinkedList<String> source = new LinkedList<String>();
-        for (int j = 0; j < fields.getLength(); j++) {
-          Node fieldNode = fields.item(j);
-          if (!(fieldNode instanceof Element))
-            continue;
-          Element field = (Element)fieldNode;
-          if ("name".equals(field.getTagName()) && field.hasChildNodes())
-            attr = StringInterner.weakIntern(
-                ((Text)field.getFirstChild()).getData().trim());
-          if ("value".equals(field.getTagName()) && field.hasChildNodes())
-            value = StringInterner.weakIntern(
-                ((Text)field.getFirstChild()).getData());
-          if ("final".equals(field.getTagName()) && field.hasChildNodes())
-            finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
-          if ("source".equals(field.getTagName()) && field.hasChildNodes())
-            source.add(StringInterner.weakIntern(
-                ((Text)field.getFirstChild()).getData()));
-        }
-        source.add(name);
-        
-        // Ignore this parameter if it has already been marked as 'final'
-        if (attr != null) {
-          if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
-            DeprecatedKeyInfo keyInfo =
-                deprecations.getDeprecatedKeyMap().get(attr);
-            keyInfo.clearAccessed();
-            for (String key:keyInfo.newKeys) {
-              // update new keys with deprecated key's value 
-              loadProperty(toAddTo, name, key, value, finalParameter, 
-                  source.toArray(new String[source.size()]));
-            }
-          }
-          else {
-            loadProperty(toAddTo, name, attr, value, finalParameter, 
-                source.toArray(new String[source.size()]));
-          }
-        }
-      }
-      
-      if (returnCachedProperties) {
-        overlay(properties, toAddTo);
-        return new Resource(toAddTo, name);
-      }
-      return null;
-    } catch (IOException e) {
-      LOG.fatal("error parsing conf " + name, e);
-      throw new RuntimeException(e);
-    } catch (DOMException e) {
-      LOG.fatal("error parsing conf " + name, e);
-      throw new RuntimeException(e);
-    } catch (SAXException e) {
-      LOG.fatal("error parsing conf " + name, e);
-      throw new RuntimeException(e);
-    } catch (ParserConfigurationException e) {
-      LOG.fatal("error parsing conf " + name , e);
-      throw new RuntimeException(e);
-    }
-  }
-
-  private void overlay(Properties to, Properties from) {
-    for (Entry<Object, Object> entry: from.entrySet()) {
-      to.put(entry.getKey(), entry.getValue());
-    }
-  }
-
-  private void loadProperty(Properties properties, String name, String attr,
-      String value, boolean finalParameter, String[] source) {
-    if (value != null || allowNullValueProperties) {
-      if (value == null) {
-        value = DEFAULT_STRING_CHECK;
-      }
-      if (!finalParameters.contains(attr)) {
-        properties.setProperty(attr, value);
-        if(source != null) {
-          updatingResource.put(attr, source);
-        }
-      } else if (!value.equals(properties.getProperty(attr))) {
-        LOG.warn(name+":an attempt to override final parameter: "+attr
-            +";  Ignoring.");
-      }
-    }
-    if (finalParameter && attr != null) {
-      finalParameters.add(attr);
-    }
-  }
-
-  /** 
-   * Write out the non-default properties in this configuration to the given
-   * {@link OutputStream} using UTF-8 encoding.
-   * 
-   * @param out the output stream to write to.
-   */
-  public void writeXml(OutputStream out) throws IOException {
-    writeXml(new OutputStreamWriter(out, "UTF-8"));
-  }
-
-  /** 
-   * Write out the non-default properties in this configuration to the given
-   * {@link Writer}.
-   * 
-   * @param out the writer to write to.
-   */
-  public void writeXml(Writer out) throws IOException {
-    Document doc = asXmlDocument();
-
-    try {
-      DOMSource source = new DOMSource(doc);
-      StreamResult result = new StreamResult(out);
-      TransformerFactory transFactory = TransformerFactory.newInstance();
-      Transformer transformer = transFactory.newTransformer();
-
-      // Important to not hold Configuration log while writing result, since
-      // 'out' may be an HDFS stream which needs to lock this configuration
-      // from another thread.
-      transformer.transform(source, result);
-    } catch (TransformerException te) {
-      throw new IOException(te);
-    }
-  }
-
-  /**
-   * Return the XML DOM corresponding to this Configuration.
-   */
-  private synchronized Document asXmlDocument() throws IOException {
-    Document doc;
-    try {
-      doc =
-        DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
-    } catch (ParserConfigurationException pe) {
-      throw new IOException(pe);
-    }
-    Element conf = doc.createElement("configuration");
-    doc.appendChild(conf);
-    conf.appendChild(doc.createTextNode("\n"));
-    handleDeprecation(); //ensure properties is set and deprecation is handled
-    for (Enumeration<Object> e = properties.keys(); e.hasMoreElements();) {
-      String name = (String)e.nextElement();
-      Object object = properties.get(name);
-      String value = null;
-      if (object instanceof String) {
-        value = (String) object;
-      }else {
-        continue;
-      }
-      Element propNode = doc.createElement("property");
-      conf.appendChild(propNode);
-
-      Element nameNode = doc.createElement("name");
-      nameNode.appendChild(doc.createTextNode(name));
-      propNode.appendChild(nameNode);
-
-      Element valueNode = doc.createElement("value");
-      valueNode.appendChild(doc.createTextNode(value));
-      propNode.appendChild(valueNode);
-
-      if (updatingResource != null) {
-        String[] sources = updatingResource.get(name);
-        if(sources != null) {
-          for(String s : sources) {
-            Element sourceNode = doc.createElement("source");
-            sourceNode.appendChild(doc.createTextNode(s));
-            propNode.appendChild(sourceNode);
-          }
-        }
-      }
-      
-      conf.appendChild(doc.createTextNode("\n"));
-    }
-    return doc;
-  }
-
-  /**
-   *  Writes out all the parameters and their properties (final and resource) to
-   *  the given {@link Writer}
-   *  The format of the output would be 
-   *  { "properties" : [ {key1,value1,key1.isFinal,key1.resource}, {key2,value2,
-   *  key2.isFinal,key2.resource}... ] } 
-   *  It does not output the parameters of the configuration object which is 
-   *  loaded from an input stream.
-   * @param out the Writer to write to
-   * @throws IOException
-   */
-  public static void dumpConfiguration(Configuration config,
-      Writer out) throws IOException {
-    JsonFactory dumpFactory = new JsonFactory();
-    JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
-    dumpGenerator.writeStartObject();
-    dumpGenerator.writeFieldName("properties");
-    dumpGenerator.writeStartArray();
-    dumpGenerator.flush();
-    synchronized (config) {
-      for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
-        dumpGenerator.writeStartObject();
-        dumpGenerator.writeStringField("key", (String) item.getKey());
-        dumpGenerator.writeStringField("value", 
-                                       config.get((String) item.getKey()));
-        dumpGenerator.writeBooleanField("isFinal",
-                                        config.finalParameters.contains(item.getKey()));
-        String[] resources = config.updatingResource.get(item.getKey());
-        String resource = UNKNOWN_RESOURCE;
-        if(resources != null && resources.length > 0) {
-          resource = resources[0];
-        }
-        dumpGenerator.writeStringField("resource", resource);
-        dumpGenerator.writeEndObject();
-      }
-    }
-    dumpGenerator.writeEndArray();
-    dumpGenerator.writeEndObject();
-    dumpGenerator.flush();
-  }
-  
-  /**
-   * Get the {@link ClassLoader} for this job.
-   * 
-   * @return the correct class loader.
-   */
-  public ClassLoader getClassLoader() {
-    return classLoader;
-  }
-  
-  /**
-   * Set the class loader that will be used to load the various objects.
-   * 
-   * @param classLoader the new class loader.
-   */
-  public void setClassLoader(ClassLoader classLoader) {
-    this.classLoader = classLoader;
-  }
-  
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("Configuration: ");
-    if(loadDefaults) {
-      toString(defaultResources, sb);
-      if(resources.size()>0) {
-        sb.append(", ");
-      }
-    }
-    toString(resources, sb);
-    return sb.toString();
-  }
-  
-  private <T> void toString(List<T> resources, StringBuilder sb) {
-    ListIterator<T> i = resources.listIterator();
-    while (i.hasNext()) {
-      if (i.nextIndex() != 0) {
-        sb.append(", ");
-      }
-      sb.append(i.next());
-    }
-  }
-
-  /** 
-   * Set the quietness-mode. 
-   * 
-   * In the quiet-mode, error and informational messages might not be logged.
-   * 
-   * @param quietmode <code>true</code> to set quiet-mode on, <code>false</code>
-   *              to turn it off.
-   */
-  public synchronized void setQuietMode(boolean quietmode) {
-    this.quietmode = quietmode;
-  }
-
-  synchronized boolean getQuietMode() {
-    return this.quietmode;
-  }
-  
-  /** For debugging.  List non-default properties to the terminal and exit. */
-  public static void main(String[] args) throws Exception {
-    new Configuration().writeXml(System.out);
-  }
-
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    clear();
-    int size = WritableUtils.readVInt(in);
-    for(int i=0; i < size; ++i) {
-      String key = org.apache.hadoop.io.Text.readString(in);
-      String value = org.apache.hadoop.io.Text.readString(in);
-      set(key, value); 
-      String sources[] = WritableUtils.readCompressedStringArray(in);
-      if(sources != null) {
-        updatingResource.put(key, sources);
-      }
-    }
-  }
-
-  //@Override
-  @Override
-  public void write(DataOutput out) throws IOException {
-    Properties props = getProps();
-    WritableUtils.writeVInt(out, props.size());
-    for(Map.Entry<Object, Object> item: props.entrySet()) {
-      org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
-      org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
-      WritableUtils.writeCompressedStringArray(out, 
-          updatingResource.get(item.getKey()));
-    }
-  }
-  
-  /**
-   * get keys matching the the regex 
-   * @param regex
-   * @return Map<String,String> with matching keys
-   */
-  public Map<String,String> getValByRegex(String regex) {
-    Pattern p = Pattern.compile(regex);
-
-    Map<String,String> result = new HashMap<String,String>();
-    Matcher m;
-
-    for(Map.Entry<Object,Object> item: getProps().entrySet()) {
-      if (item.getKey() instanceof String && 
-          item.getValue() instanceof String) {
-        m = p.matcher((String)item.getKey());
-        if(m.find()) { // match
-          result.put((String) item.getKey(),
-              substituteVars(getProps().getProperty((String) item.getKey())));
-        }
-      }
-    }
-    return result;
-  }
-
-  /**
-   * A unique class which is used as a sentinel value in the caching
-   * for getClassByName. {@see Configuration#getClassByNameOrNull(String)}
-   */
-  private static abstract class NegativeCacheSentinel {}
-
-  public static void dumpDeprecatedKeys() {
-    DeprecationContext deprecations = deprecationContext.get();
-    for (Map.Entry<String, DeprecatedKeyInfo> entry :
-        deprecations.getDeprecatedKeyMap().entrySet()) {
-      StringBuilder newKeys = new StringBuilder();
-      for (String newKey : entry.getValue().newKeys) {
-        newKeys.append(newKey).append("\t");
-      }
-      System.out.println(entry.getKey() + "\t" + newKeys.toString());
-    }
-  }
-
-  /**
-   * Returns whether or not a deprecated name has been warned. If the name is not
-   * deprecated then always return false
-   */
-  public static boolean hasWarnedDeprecation(String name) {
-    DeprecationContext deprecations = deprecationContext.get();
-    if(deprecations.getDeprecatedKeyMap().containsKey(name)) {
-      if(deprecations.getDeprecatedKeyMap().get(name).accessed.get()) {
-        return true;
-      }
-    }
-    return false;
-  }
-}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
deleted file mode 100644
index 822da5b..0000000
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.util;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
-/**
- * A helper to load the native hadoop code i.e. libhadoop.so.
- * This handles the fallback to either the bundled libhadoop-Linux-i386-32.so
- * or the default java implementations where appropriate.
- *  
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class NativeCodeLoader {
-
-  private static final Log LOG =
-    LogFactory.getLog(NativeCodeLoader.class);
-  
-  private static boolean nativeCodeLoaded = false;
-  
-  static {
-    LOG.info("Skipping native-hadoop library for flink-s3-fs-hadoop's relocated Hadoop... " +
-             "using builtin-java classes where applicable");
-  }
-
-  /**
-   * Check if native-hadoop code is loaded for this platform.
-   * 
-   * @return <code>true</code> if native-hadoop is loaded, 
-   *         else <code>false</code>
-   */
-  public static boolean isNativeCodeLoaded() {
-    return nativeCodeLoaded;
-  }
-
-  /**
-   * Returns true only if this build was compiled with support for snappy.
-   */
-  public static native boolean buildSupportsSnappy();
-  
-  /**
-   * Returns true only if this build was compiled with support for openssl.
-   */
-  public static native boolean buildSupportsOpenssl();
-
-  public static native String getLibraryName();
-
-  /**
-   * Return if native hadoop libraries, if present, can be used for this job.
-   * @param conf configuration
-   * 
-   * @return <code>true</code> if native hadoop libraries, if present, can be 
-   *         used for this job; <code>false</code> otherwise.
-   */
-  public boolean getLoadNativeLibraries(Configuration conf) {
-    return conf.getBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, 
-                           CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
-  }
-  
-  /**
-   * Set if native hadoop libraries, if present, can be used for this job.
-   * 
-   * @param conf configuration
-   * @param loadNativeLibraries can native hadoop libraries be loaded
-   */
-  public void setLoadNativeLibraries(Configuration conf, 
-                                     boolean loadNativeLibraries) {
-    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
-                    loadNativeLibraries);
-  }
-
-}
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index ef478aa..0000000
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,98 +0,0 @@
-This project includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
--------------------------------------------------------------
-
-This project bundles the following dependencies under
-the Apache Software License 2.0
-
-  - commons-collections : commons-collections version 3.2.2
-  - commons-cli : commons-cli version 1.3.1
-  - commons-codec : commons-codec version 1.10
-  - commons-io : commons-io version 2.4
-  - commons-net : commons-net version 3.1
-  - commons-logging : commons-logging version 1.1.3
-  - commons-lang : commons-lang version 2.6
-  - commons-configuration : commons-configuration version 1.7
-  - commons-digester : commons-digester version 1.8.1
-  - commons-beanutils : commons-beanutils version 1.8.3
-  - org.apache.commons : commons-compress version 1.4.1
-  - org.apache.commons : commons-math3 version 3.5
-  - org.apache.httpcomponents : httpclient version 4.5.3
-  - org.apache.httpcomponents : httpcore version 4.4.6
-  - org.apache.htrace : htrace-core4 version 4.0.1-incubating
-  - org.apache.hadoop : hadoop-client version 2.8.1
-  - org.apache.hadoop : hadoop-common version 2.8.1
-  - org.apache.hadoop : hadoop-auth version 2.8.1
-  - org.apache.hadoop : hadoop-hdfs version 2.8.1
-  - org.apache.hadoop : hadoop-hdfs-client version 2.8.1
-  - org.apache.hadoop : hadoop-annotations version 2.8.1
-  - org.apache.hadoop : hadoop-aws version 2.8.1
-  - com.google.guava : guava version 11.0.2
-  - com.google.protobuf : protobuf-java version 2.5.0
-  - com.google.code.gson : gson version 2.2.4
-  - org.mortbay.jetty : jetty-sslengine version 6.1.26
-  - com.nimbusds : nimbus-jose-jwt version 3.9
-  - net.minidev : json-smart version 1.1.1
-  - joda-time : joda-time version 2.5
-  - com.squareup.okhttp : okhttp version 2.4.0
-  - com.squareup.okio : okio version 1.4.0
-  - com.amazonaws : aws-java-sdk-s3 version 1.10.6
-  - com.amazonaws : aws-java-sdk-kms version 1.10.6
-  - com.amazonaws : aws-java-sdk-core version 1.10.6
-  - org.codehaus.jackson : jackson-core-asl version 1.9.13
-  - org.codehaus.jackson : jackson-mapper-asl version 1.9.13
-  - com.fasterxml.jackson.core : jackson-core version 2.7.4
-  - com.fasterxml.jackson.core : jackson-databind version 2.7.4
-  - com.fasterxml.jackson.core : jackson-annotations version 2.7.4
-  
--------------------------------------------------------------
-
-This project bundles the following dependencies under
-the BSD License
-
-  - xmlenc : xmlenc version 0.52 - Copyright 2003-2005, Ernst de Haan <wf...@gmail.com>
-
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice, this
-   list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
-   this list of conditions and the following disclaimer in the documentation
-   and/or other materials provided with the distribution.
-
-3. Neither the name of the copyright holder nor the names of its contributors
-   may be used to endorse or promote products derived from this software
-   without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
--------------------------------------------
-Notice for  org.tukaani : xz
--------------------------------------------
-
-This Java implementation of XZ has been put into the public domain, thus you can do whatever
-you want with it. All the files in the package have been written by Lasse Collin, but some files
-are heavily based on public domain code written by Igor Pavlov.
-
-
--------------------------------------------
-Notice for  net.jcip : jcip-annotations
--------------------------------------------
-
-This project bundles net.jcip : jcip-annotations version 1.0, which
-has been released to the public domain.
\ No newline at end of file
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/core-default-shaded.xml b/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/core-default-shaded.xml
deleted file mode 100644
index c9dff76..0000000
--- a/flink-filesystems/flink-s3-fs-hadoop/src/main/resources/core-default-shaded.xml
+++ /dev/null
@@ -1,2312 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<!-- Do not modify this file directly.  Instead, copy entries that you -->
-<!-- wish to modify from this file into core-site.xml and change them -->
-<!-- there.  If core-site.xml does not already exist, create it.      -->
-
-<configuration>
-
-<!--- global properties -->
-
-<property>
-  <name>hadoop.common.configuration.version</name>
-  <value>0.23.0</value>
-  <description>version of this configuration file</description>
-</property>
-
-<property>
-  <name>hadoop.tmp.dir</name>
-  <value>/tmp/hadoop-${user.name}</value>
-  <description>A base for other temporary directories.</description>
-</property>
-
-<property>
-  <name>io.native.lib.available</name>
-  <value>true</value>
-  <description>Controls whether to use native libraries for bz2 and zlib
-    compression codecs or not. The property does not control any other native
-    libraries.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.filter.initializers</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.http.lib.StaticUserWebFilter</value>
-  <description>A comma separated list of class names. Each class in the list
-  must extend org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.http.FilterInitializer. The corresponding
-  Filter will be initialized. Then, the Filter will be applied to all user
-  facing jsp and servlet web pages.  The ordering of the list defines the
-  ordering of the filters.</description>
-</property>
-
-<!--- security properties -->
-
-<property>
-  <name>hadoop.security.authorization</name>
-  <value>false</value>
-  <description>Is service-level authorization enabled?</description>
-</property>
-
-<property>
-  <name>hadoop.security.instrumentation.requires.admin</name>
-  <value>false</value>
-  <description>
-    Indicates if administrator ACLs are required to access
-    instrumentation servlets (JMX, METRICS, CONF, STACKS).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.authentication</name>
-  <value>simple</value>
-  <description>Possible values are simple (no authentication), and kerberos
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback</value>
-  <description>
-    Class for user to group mapping (get groups for a given user) for ACL.
-    The default implementation,
-    org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback,
-    will determine if the Java Native Interface (JNI) is available. If JNI is
-    available the implementation will use the API within hadoop to resolve a
-    list of groups for a user. If JNI is not available then the shell
-    implementation, ShellBasedUnixGroupsMapping, is used.  This implementation
-    shells out to the Linux/Unix environment with the
-    <code>bash -c groups</code> command to resolve a list of groups for a user.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.interface</name>
-  <description>
-    The name of the Network Interface from which the service should determine
-    its host name for Kerberos login. e.g. eth2. In a multi-homed environment,
-    the setting can be used to affect the _HOST substitution in the service
-    Kerberos principal. If this configuration value is not set, the service
-    will use its default hostname as returned by
-    InetAddress.getLocalHost().getCanonicalHostName().
-
-    Most clusters will not require this setting.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.nameserver</name>
-  <description>
-    The host name or IP address of the name server (DNS) which a service Node
-    should use to determine its own host name for Kerberos Login. Requires
-    hadoop.security.dns.interface.
-
-    Most clusters will not require this setting.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.log-slow-lookups.enabled</name>
-  <value>false</value>
-  <description>
-    Time name lookups (via SecurityUtil) and log them if they exceed the
-    configured threshold.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.log-slow-lookups.threshold.ms</name>
-  <value>1000</value>
-  <description>
-    If slow lookup logging is enabled, this threshold is used to decide if a
-    lookup is considered slow enough to be logged.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.secs</name>
-  <value>300</value>
-  <description>
-    This is the config controlling the validity of the entries in the cache
-    containing the user->group mapping. When this duration has expired,
-    then the implementation of the group mapping provider is invoked to get
-    the groups of the user and then cached back.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.negative-cache.secs</name>
-  <value>30</value>
-  <description>
-    Expiration time for entries in the negative user-to-group mapping
-    caching, in seconds. This is useful when invalid users are retrying
-    frequently. It is suggested to set a small value for this expiration, since
-    a transient error in group lookup could temporarily lock out a legitimate
-    user.
-
-    Set this to zero or negative value to disable negative user-to-group caching.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.warn.after.ms</name>
-  <value>5000</value>
-  <description>
-    If looking up a single user to group takes longer than this amount of
-    milliseconds, we will log a warning message.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.background.reload</name>
-  <value>false</value>
-  <description>
-    Whether to reload expired user->group mappings using a background thread
-    pool. If set to true, a pool of
-    hadoop.security.groups.cache.background.reload.threads is created to
-    update the cache in the background.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.background.reload.threads</name>
-  <value>3</value>
-  <description>
-    Only relevant if hadoop.security.groups.cache.background.reload is true.
-    Controls the number of concurrent background user->group cache entry
-    refreshes. Pending refresh requests beyond this value are queued and
-    processed when a thread is free.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.connection.timeout.ms</name>
-  <value>60000</value>
-  <description>
-    This property is the connection timeout (in milliseconds) for LDAP
-    operations. If the LDAP provider doesn't establish a connection within the
-    specified period, it will abort the connect attempt. Non-positive value
-    means no LDAP connection timeout is specified in which case it waits for the
-    connection to establish until the underlying network times out.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.read.timeout.ms</name>
-  <value>60000</value>
-  <description>
-    This property is the read timeout (in milliseconds) for LDAP
-    operations. If the LDAP provider doesn't get a LDAP response within the
-    specified period, it will abort the read attempt. Non-positive value
-    means no read timeout is specified in which case it waits for the response
-    infinitely.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.url</name>
-  <value></value>
-  <description>
-    The URL of the LDAP server to use for resolving user groups when using
-    the LdapGroupsMapping user to group mapping.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl</name>
-  <value>false</value>
-  <description>
-    Whether or not to use SSL when connecting to the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore</name>
-  <value></value>
-  <description>
-    File path to the SSL keystore that contains the SSL certificate required
-    by the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the LDAP SSL keystore.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.user</name>
-  <value></value>
-  <description>
-    The distinguished name of the user to bind as when connecting to the LDAP
-    server. This may be left blank if the LDAP server supports anonymous binds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the bind user.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.base</name>
-  <value></value>
-  <description>
-    The search base for the LDAP connection. This is a distinguished name,
-    and will typically be the root of the LDAP directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.user</name>
-  <value>(&amp;(objectClass=user)(sAMAccountName={0}))</value>
-  <description>
-    An additional filter to use when searching for LDAP users. The default will
-    usually be appropriate for Active Directory installations. If connecting to
-    an LDAP server with a non-AD schema, this should be replaced with
-    (&amp;(objectClass=inetOrgPerson)(uid={0})). {0} is a special string used to
-    denote where the username fits into the filter.
-
-    If the LDAP server supports posixGroups, Hadoop can enable the feature by
-    setting the value of this property to "posixAccount" and the value of
-    the hadoop.security.group.mapping.ldap.search.filter.group property to
-    "posixGroup".
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.group</name>
-  <value>(objectClass=group)</value>
-  <description>
-    An additional filter to use when searching for LDAP groups. This should be
-    changed when resolving groups against a non-Active Directory installation.
-
-    See the description of hadoop.security.group.mapping.ldap.search.filter.user
-    to enable posixGroups support.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.member</name>
-  <value>member</value>
-  <description>
-    The attribute of the group object that identifies the users that are
-    members of the group. The default will usually be appropriate for
-    any LDAP installation.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.group.name</name>
-  <value>cn</value>
-  <description>
-    The attribute of the group object that identifies the group name. The
-    default will usually be appropriate for all LDAP systems.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.posix.attr.uid.name</name>
-  <value>uidNumber</value>
-  <description>
-    The attribute of posixAccount to use when resolving groups for membership.
-    Mostly useful for schemas wherein groups have memberUids that use an
-    attribute other than uidNumber.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.posix.attr.gid.name</name>
-  <value>gidNumber</value>
-  <description>
-    The attribute of posixAccount indicating the group id.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.directory.search.timeout</name>
-  <value>10000</value>
-  <description>
-    The attribute applied to the LDAP SearchControl properties to set a
-    maximum time limit when searching and awaiting a result.
-    Set to 0 if infinite wait period is desired.
-    Default is 10 seconds. Units in milliseconds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers</name>
-  <value></value>
-  <description>
-    Comma-separated list of names of other providers to provide user to group
-    mapping. Used by CompositeGroupsMapping.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers.combined</name>
-  <value>true</value>
-  <description>
-    true or false to indicate whether groups from the providers are combined or
-    not. The default value is true. If true, then all the providers will be
-    tried to get groups and all the groups are combined to return as the final
-    results. Otherwise, providers are tried one by one in the configured list
-    order, and if any groups are retrieved from any provider, then the groups
-    will be returned without trying the remaining ones.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.service.user.name.key</name>
-  <value></value>
-  <description>
-    For those cases where the same RPC protocol is implemented by multiple
-    servers, this configuration is required for specifying the principal
-    name to use for the service when the client wishes to make an RPC call.
-  </description>
-</property>
-
-
-<property>
-    <name>hadoop.security.uid.cache.secs</name>
-    <value>14400</value>
-    <description>
-        This is the config controlling the validity of the entries in the cache
-        containing the userId to userName and groupId to groupName used by
-        NativeIO getFstat().
-    </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.protection</name>
-  <value>authentication</value>
-  <description>A comma-separated list of protection values for secured sasl
-      connections. Possible values are authentication, integrity and privacy.
-      authentication means authentication only and no integrity or privacy;
-      integrity implies authentication and integrity are enabled; and privacy
-      implies all of authentication, integrity and privacy are enabled.
-      hadoop.security.saslproperties.resolver.class can be used to override
-      the hadoop.rpc.protection for a connection at the server side.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.saslproperties.resolver.class</name>
-  <value></value>
-  <description>SaslPropertiesResolver used to resolve the QOP used for a
-      connection. If not specified, the full set of values specified in
-      hadoop.rpc.protection is used while determining the QOP used for the
-      connection. If a class is specified, then the QOP values returned by
-      the class will be used while determining the QOP used for the connection.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.sensitive-config-keys</name>
-  <value>secret$,password$,ssl.keystore.pass$,fs.s3.*[Ss]ecret.?[Kk]ey,fs.azure.account.key.*,dfs.webhdfs.oauth2.[a-z]+.token,hadoop.security.sensitive-config-keys</value>
-  <description>A comma-separated list of regular expressions to match against
-      configuration keys that should be redacted where appropriate, for
-      example, when logging modified properties during a reconfiguration,
-      private credentials should not be logged.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.workaround.non.threadsafe.getpwuid</name>
-  <value>true</value>
-  <description>Some operating systems or authentication modules are known to
-  have broken implementations of getpwuid_r and getpwgid_r, such that these
-  calls are not thread-safe. Symptoms of this problem include JVM crashes
-  with a stack trace inside these functions. If your system exhibits this
-  issue, enable this configuration parameter to include a lock around the
-  calls as a workaround.
-
-  An incomplete list of some systems known to have this issue is available
-  at http://wiki.apache.org/hadoop/KnownBrokenPwuidImplementations
-  </description>
-</property>
-
-<property>
-  <name>hadoop.kerberos.kinit.command</name>
-  <value>kinit</value>
-  <description>Used to periodically renew Kerberos credentials when provided
-  to Hadoop. The default setting assumes that kinit is in the PATH of users
-  running the Hadoop client. Change this to the absolute path to kinit if this
-  is not the case.
-  </description>
-</property>
-
-<property>
-    <name>hadoop.kerberos.min.seconds.before.relogin</name>
-    <value>60</value>
-    <description>The minimum time between relogin attempts for Kerberos, in
-    seconds.
-    </description>
-</property>
-
-<property>
-  <name>hadoop.security.auth_to_local</name>
-  <value></value>
-  <description>Maps kerberos principals to local user names</description>
-</property>
-
-<!-- i/o properties -->
-<property>
-  <name>io.file.buffer.size</name>
-  <value>4096</value>
-  <description>The size of buffer for use in sequence files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>io.bytes.per.checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  io.file.buffer.size.</description>
-</property>
-
-<property>
-  <name>io.skip.checksum.errors</name>
-  <value>false</value>
-  <description>If true, when a checksum error is encountered while
-  reading a sequence file, entries are skipped, instead of throwing an
-  exception.</description>
-</property>
-
-<property>
-  <name>io.compression.codecs</name>
-  <value></value>
-  <description>A comma-separated list of the compression codec classes that can
-  be used for compression/decompression. In addition to any classes specified
-  with this property (which take precedence), codec classes on the classpath
-  are discovered using a Java ServiceLoader.</description>
-</property>
-
-<property>
-  <name>io.compression.codec.bzip2.library</name>
-  <value>system-native</value>
-  <description>The native-code library to be used for compression and
-  decompression by the bzip2 codec.  This library could be specified
-  either by name or by the full pathname.  In the former case, the
-  library is located by the dynamic linker, usually searching the
-  directories specified in the environment variable LD_LIBRARY_PATH.
-
-  The value of "system-native" indicates that the default system
-  library should be used.  To indicate that the algorithm should
-  operate entirely in Java, specify "java-builtin".</description>
-</property>
-
-<property>
-  <name>io.serializations</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.io.serializer.WritableSerialization, org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
-  <description>A list of serialization classes that can be used for
-  obtaining serializers and deserializers.</description>
-</property>
-
-<property>
-  <name>io.seqfile.local.dir</name>
-  <value>${hadoop.tmp.dir}/io/local</value>
-  <description>The local directory where sequence file stores intermediate
-  data files during merge.  May be a comma-separated list of
-  directories on different devices in order to spread disk i/o.
-  Directories that do not exist are ignored.
-  </description>
-</property>
-
-<property>
-  <name>io.map.index.skip</name>
-  <value>0</value>
-  <description>Number of index entries to skip between each entry.
-  Zero by default. Setting this to values larger than zero can
-  facilitate opening large MapFiles using less memory.</description>
-</property>
-
-<property>
-  <name>io.map.index.interval</name>
-  <value>128</value>
-  <description>
-    A MapFile consists of two files - a data file (tuples) and an index file
-    (keys). For every io.map.index.interval records written in the
-    data file, an entry (record-key, data-file-position) is written
-    in the index file. This is to allow for doing binary search later
-    within the index file to look up records by their keys and get their
-    closest positions in the data file.
-  </description>
-</property>
-
-<!-- file system properties -->
-
-<property>
-  <name>fs.defaultFS</name>
-  <value>file:///</value>
-  <description>The name of the default file system.  A URI whose
-  scheme and authority determine the FileSystem implementation.  The
-  uri's scheme determines the config property (fs.SCHEME.impl) naming
-  the FileSystem implementation class.  The uri's authority is used to
-  determine the host, port, etc. for a filesystem.</description>
-</property>
-
-<property>
-  <name>fs.default.name</name>
-  <value>file:///</value>
-  <description>Deprecated. Use (fs.defaultFS) property
-  instead</description>
-</property>
-
-<property>
-  <name>fs.trash.interval</name>
-  <value>0</value>
-  <description>Number of minutes after which the checkpoint
-  gets deleted.  If zero, the trash feature is disabled.
-  This option may be configured both on the server and the
-  client. If trash is disabled server side then the client
-  side configuration is checked. If trash is enabled on the
-  server side then the value configured on the server is
-  used and the client configuration value is ignored.
-  </description>
-</property>
-
-<property>
-  <name>fs.trash.checkpoint.interval</name>
-  <value>0</value>
-  <description>Number of minutes between trash checkpoints.
-  Should be smaller or equal to fs.trash.interval. If zero,
-  the value is set to the value of fs.trash.interval.
-  Every time the checkpointer runs it creates a new checkpoint
-  out of current and removes checkpoints created more than
-  fs.trash.interval minutes ago.
-  </description>
-</property>
-
-<property>
-  <name>fs.protected.directories</name>
-  <value></value>
-  <description>A comma-separated list of directories which cannot
-    be deleted even by the superuser unless they are empty. This
-    setting can be used to guard important system directories
-    against accidental deletion due to administrator error.
-  </description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.file.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.local.LocalFs</value>
-  <description>The AbstractFileSystem for file: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.har.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.HarFs</value>
-  <description>The AbstractFileSystem for har: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.hdfs.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.Hdfs</value>
-  <description>The FileSystem for hdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.viewfs.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.viewfs.ViewFs</value>
-  <description>The AbstractFileSystem for view file system for viewfs: uris
-  (ie client side mount table:).</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.ftp.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.ftp.FtpFs</value>
-  <description>The FileSystem for Ftp: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.webhdfs.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.WebHdfs</value>
-  <description>The FileSystem for webhdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.swebhdfs.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.SWebHdfs</value>
-  <description>The FileSystem for swebhdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.ftp.host</name>
-  <value>0.0.0.0</value>
-  <description>FTP filesystem connects to this server</description>
-</property>
-
-<property>
-  <name>fs.ftp.host.port</name>
-  <value>21</value>
-  <description>
-    FTP filesystem connects to fs.ftp.host on this port
-  </description>
-</property>
-
-<property>
-  <name>fs.df.interval</name>
-  <value>60000</value>
-  <description>Disk usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.du.interval</name>
-  <value>600000</value>
-  <description>File space usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.s3.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when writing files to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3</value>
-  <description>Determines where on the local filesystem the s3:/s3n: filesystem
-  should store files before sending them to S3
-  (or after retrieving them from S3).
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.maxRetries</name>
-  <value>4</value>
-  <description>The maximum number of retries for reading or writing files to S3,
-  before we signal failure to the application.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.sleepTimeSeconds</name>
-  <value>10</value>
-  <description>The number of seconds to sleep between each S3 retry.
-  </description>
-</property>
-
-<property>
-  <name>fs.automatic.close</name>
-  <value>true</value>
-  <description>By default, FileSystem instances are automatically closed at program
-  exit using a JVM shutdown hook. Setting this property to false disables this
-  behavior. This is an advanced option that should only be used by server applications
-  requiring a more carefully orchestrated shutdown sequence.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when reading files using the native S3
-  filesystem (s3n: URIs).</description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.enabled</name>
-  <value>false</value>
-  <description>Setting this property to true enables multiple uploads to
-  native S3 filesystem. When uploading a file, it is split into blocks
-  if the size is larger than fs.s3n.multipart.uploads.block.size.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.block.size</name>
-  <value>67108864</value>
-  <description>The block size for multipart uploads to native S3 filesystem.
-  Default size is 64MB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.copy.block.size</name>
-  <value>5368709120</value>
-  <description>The block size for multipart copy in native S3 filesystem.
-  Default size is 5GB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.server-side-encryption-algorithm</name>
-  <value></value>
-  <description>Specify a server-side encryption algorithm for S3.
-  Unset by default, and the only other currently allowable value is AES256.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.access.key</name>
-  <description>AWS access key ID used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.secret.key</name>
-  <description>AWS secret key used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.aws.credentials.provider</name>
-  <description>
-	  Comma-separated class names of credential provider classes which implement
-	  com.amazonaws.auth.AWSCredentialsProvider.
-
-	  These are loaded and queried in sequence for a valid set of credentials.
-	  Each listed class must implement one of the following means of
-	  construction, which are attempted in order:
-	  1. a public constructor accepting java.net.URI and
-	  org.apache.hadoop.conf.Configuration,
-	  2. a public static method named getInstance that accepts no
-	  arguments and returns an instance of
-	  com.amazonaws.auth.AWSCredentialsProvider, or
-	  3. a public default constructor.
-
-	  Specifying
-	  org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider
-	  allows
-	  anonymous access to a publicly accessible S3 bucket without any credentials.
-	  Please note that allowing anonymous access to an S3 bucket compromises
-	  security and therefore is unsuitable for most use cases. It can be useful
-	  for accessing public data sets without requiring AWS credentials.
-
-	  If unspecified, then the default list of credential provider classes,
-	  queried in sequence, is:
-	  1. org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider:
-	  supports static
-	  configuration of AWS access key ID and secret access key. See also
-	  fs.s3a.access.key and fs.s3a.secret.key.
-	  2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports
-	  configuration of AWS access key ID and secret access key in
-	  environment variables named AWS_ACCESS_KEY_ID and
-	  AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK.
-	  3.
-	  org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider:
-	  a shared instance of
-	  com.amazonaws.auth.InstanceProfileCredentialsProvider from the AWS
-	  SDK, which supports use of instance profile credentials if running
-	  in an EC2 VM. Using this shared instance potentially reduces load
-	  on the EC2 instance metadata service for multi-threaded
-	  applications.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.session.token</name>
-  <description>Session token, when using org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
-    as one of the providers.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.security.credential.provider.path</name>
-  <value />
-  <description>
-    Optional comma separated list of credential providers, a list
-    which is prepended to that set in hadoop.security.credential.provider.path
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.maximum</name>
-  <value>15</value>
-  <description>Controls the maximum number of simultaneous connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.ssl.enabled</name>
-  <value>true</value>
-  <description>Enables or disables SSL connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.endpoint</name>
-  <description>AWS S3 endpoint to connect to. An up-to-date list is
-    provided in the AWS Documentation: regions and endpoints. Without this
-    property, the standard region (s3.amazonaws.com) is assumed.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.path.style.access</name>
-  <value>false</value>
-  <description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
-    Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.host</name>
-  <description>Hostname of the (optional) proxy server for S3 connections.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.port</name>
-  <description>Proxy server port. If this property is not set
-    but fs.s3a.proxy.host is, port 80 or 443 is assumed (consistent with
-    the value of fs.s3a.connection.ssl.enabled).</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.username</name>
-  <description>Username for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.password</name>
-  <description>Password for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.domain</name>
-  <description>Domain for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.workstation</name>
-  <description>Workstation for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.attempts.maximum</name>
-  <value>20</value>
-  <description>How many times we should retry commands on transient errors.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.establish.timeout</name>
-  <value>5000</value>
-  <description>Socket connection setup timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.timeout</name>
-  <value>200000</value>
-  <description>Socket connection timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.socket.send.buffer</name>
-  <value>8192</value>
-  <description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
-</property>
-
-<property>
-  <name>fs.s3a.socket.recv.buffer</name>
-  <value>8192</value>
-  <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
-</property>
-
-<property>
-  <name>fs.s3a.paging.maximum</name>
-  <value>5000</value>
-  <description>How many keys to request from S3 when doing
-     directory listings at a time.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.max</name>
-  <value>10</value>
-  <description>The total number of threads available in the filesystem for data
-    uploads *or any other queued filesystem operation*.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.keepalivetime</name>
-  <value>60</value>
-  <description>Number of seconds a thread can be idle before being
-    terminated.</description>
-</property>
-
-<property>
-  <name>fs.s3a.max.total.tasks</name>
-  <value>5</value>
-  <description>The number of operations which can be queued for execution</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.size</name>
-  <value>100M</value>
-  <description>How big (in bytes) to split upload or copy operations up into.
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.threshold</name>
-  <value>2147483647</value>
-  <description>Size threshold (in bytes) above which upload or copy operations are split into multipart operations.
-    This also controls the partition size in renamed files, as rename() involves
-    copying the source file(s).
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multiobjectdelete.enable</name>
-  <value>true</value>
-  <description>When enabled, multiple single-object delete requests are replaced by
-    a single 'delete multiple objects'-request, reducing the number of requests.
-    Beware: legacy S3-compatible object stores might not support this request.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.acl.default</name>
-  <description>Set a canned ACL for newly created and copied objects. Value may be Private,
-      PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
-      or BucketOwnerFullControl.</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge</name>
-  <value>false</value>
-  <description>True if you want to purge existing multipart uploads that may not have been
-    completed/aborted correctly. The corresponding purge age is defined in
-    fs.s3a.multipart.purge.age.
-    If set, when the filesystem is instantiated then all outstanding uploads
-    older than the purge age will be terminated - across the entire bucket.
-    This will impact multipart uploads by other applications and users, so it should
-    be used sparingly, with an age value chosen to stop failed uploads, without
-    breaking ongoing operations.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge.age</name>
-  <value>86400</value>
-  <description>Minimum age in seconds of multipart uploads to purge.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
-  <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.signing-algorithm</name>
-  <description>Override the default signing algorithm so legacy
-    implementations can still be used</description>
-</property>
-
-<property>
-  <name>fs.s3a.block.size</name>
-  <value>32M</value>
-  <description>Block size to use when reading files using s3a: file system.
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3a</value>
-  <description>Comma separated list of directories that will be used to buffer file
-    uploads to.</description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload</name>
-  <value>false</value>
-  <description>
-    Use the incremental block-based fast upload mechanism with
-    the buffering mechanism set in fs.s3a.fast.upload.buffer.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload.buffer</name>
-  <value>disk</value>
-  <description>
-    The buffering mechanism to use when using S3A fast upload
-    (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
-    This configuration option has no effect if fs.s3a.fast.upload is false.
-
-    "disk" will use the directories listed in fs.s3a.buffer.dir as
-    the location(s) to save data prior to being uploaded.
-
-    "array" uses arrays in the JVM heap
-
-    "bytebuffer" uses off-heap memory within the JVM.
-
-    Both "array" and "bytebuffer" will consume memory in a single stream up to the number
-    of blocks set by:
-
-        fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
-
-    If using either of these mechanisms, keep this value low
-
-    The total number of threads performing work across all threads is set by
-    fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
-    work items.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload.active.blocks</name>
-  <value>4</value>
-  <description>
-    Maximum number of blocks a single output stream can have
-    active (uploading, or queued to the central FileSystem
-    instance's pool of queued operations).
-
-    This stops a single stream overloading the shared thread pool.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.readahead.range</name>
-  <value>64K</value>
-  <description>Bytes to read ahead during a seek() before closing and
-  re-opening the S3 HTTP connection. This option will be overridden if
-  any call to setReadahead() is made to an open stream.
-  A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.user.agent.prefix</name>
-  <value></value>
-  <description>
-    Sets a custom value that will be prepended to the User-Agent header sent in
-    HTTP requests to the S3 back-end by S3AFileSystem.  The User-Agent header
-    always includes the Hadoop version number followed by a string generated by
-    the AWS SDK.  An example is "User-Agent: Hadoop 2.8.0, aws-sdk-java/1.10.6".
-    If this optional property is set, then its value is prepended to create a
-    customized User-Agent.  For example, if this configuration property was set
-    to "MyApp", then an example of the resulting User-Agent would be
-    "User-Agent: MyApp, Hadoop 2.8.0, aws-sdk-java/1.10.6".
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem</value>
-  <description>The implementation class of the S3A Filesystem</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.s3a.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.fs.s3a.S3A</value>
-  <description>The implementation class of the S3A AbstractFileSystem.</description>
-</property>
-
-<property>
-  <name>io.seqfile.compress.blocksize</name>
-  <value>1000000</value>
-  <description>The minimum block size for compression in block compressed
-          SequenceFiles.
-  </description>
-</property>
-
- <property>
-  <name>io.mapfile.bloom.size</name>
-  <value>1048576</value>
-  <description>The size of BloomFilter-s used in BloomMapFile. Each time this many
-  keys are appended the next BloomFilter will be created (inside a DynamicBloomFilter).
-  Larger values minimize the number of filters, which slightly increases the performance,
-  but may waste too much space if the total number of keys is usually much smaller
-  than this number.
-  </description>
-</property>
-
-<property>
-  <name>io.mapfile.bloom.error.rate</name>
-  <value>0.005</value>
-  <description>The rate of false positives in BloomFilter-s used in BloomMapFile.
-  As this value decreases, the size of BloomFilter-s increases exponentially. This
-  value is the probability of encountering false positives (default is 0.5%).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.util.hash.type</name>
-  <value>murmur</value>
-  <description>The default implementation of Hash. Currently this can take one of the
-  two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash.
-  </description>
-</property>
-
-
-<!-- ipc properties -->
-
-<property>
-  <name>ipc.client.idlethreshold</name>
-  <value>4000</value>
-  <description>Defines the threshold number of connections after which
-               connections will be inspected for idleness.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.kill.max</name>
-  <value>10</value>
-  <description>Defines the maximum number of clients to disconnect in one go.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connection.maxidletime</name>
-  <value>10000</value>
-  <description>The maximum time in msec after which a client will bring down the
-               connection to the server.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries</name>
-  <value>10</value>
-  <description>Indicates the number of retries a client will make to establish
-               a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.retry.interval</name>
-  <value>1000</value>
-  <description>Indicates the number of milliseconds a client will wait for
-    before retrying to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.timeout</name>
-  <value>20000</value>
-  <description>Indicates the number of milliseconds a client will wait for the
-               socket to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries.on.timeouts</name>
-  <value>45</value>
-  <description>Indicates the number of retries a client will make on socket timeout
-               to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.tcpnodelay</name>
-  <value>true</value>
-  <description>Use TCP_NODELAY flag to bypass Nagle's algorithm transmission delays.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.low-latency</name>
-  <value>false</value>
-  <description>Use low-latency QoS markers for IPC connections.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.ping</name>
-  <value>true</value>
-  <description>Send a ping to the server when timeout on reading the response,
-  if set to true. If no failure is detected, the client retries until at least
-  a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
-  </description>
-</property>
-
-<property>
-  <name>ipc.ping.interval</name>
-  <value>60000</value>
-  <description>Timeout on waiting response from server, in milliseconds.
-  The client will send ping when the interval is passed without receiving bytes,
-  if ipc.client.ping is set to true.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.rpc-timeout.ms</name>
-  <value>0</value>
-  <description>Timeout on waiting response from server, in milliseconds.
-  If ipc.client.ping is set to true and this rpc-timeout is greater than
-  the value of ipc.ping.interval, the effective value of the rpc-timeout is
-  rounded up to multiple of ipc.ping.interval.
-  </description>
-</property>
-
-<property>
-  <name>ipc.server.listen.queue.size</name>
-  <value>128</value>
-  <description>Indicates the length of the listen queue for servers accepting
-               client connections.
-  </description>
-</property>
-
-<property>
-    <name>ipc.server.log.slow.rpc</name>
-    <value>false</value>
-    <description>This setting is useful to troubleshoot performance issues for
-     various services. If this value is set to true then we log requests that
-     fall into 99th percentile as well as increment RpcSlowCalls counter.
-    </description>
-</property>
-
-<property>
-  <name>ipc.maximum.data.length</name>
-  <value>67108864</value>
-  <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the server. Messages larger than this value are rejected by the
-    server immediately to avoid possible OOMs. This setting should rarely need to be
-    changed.
-  </description>
-</property>
-
-<property>
-  <name>ipc.maximum.response.length</name>
-  <value>134217728</value>
-  <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the client. Messages larger than this value are rejected
-    immediately to avoid possible OOMs. This setting should rarely need to be
-    changed.  Set to 0 to disable.
-  </description>
-</property>
-
-<!-- Proxy Configuration -->
-
-<property>
-  <name>hadoop.security.impersonation.provider.class</name>
-  <value></value>
-  <description>A class which implements ImpersonationProvider interface, used to
-       authorize whether one user can impersonate a specific user.
-       If not specified, the DefaultImpersonationProvider will be used.
-       If a class is specified, then that class will be used to determine
-       the impersonation capability.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.default</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.net.StandardSocketFactory</value>
-  <description> Default SocketFactory to use. This parameter is expected to be
-    formatted as "package.FactoryClassName".
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
-  <value></value>
-  <description> SocketFactory to use to connect to a DFS. If null or empty, use
-    hadoop.rpc.socket.factory.class.default. This socket factory is also used by
-    DFSClient to create sockets to DataNodes.
-  </description>
-</property>
-
-
-
-<property>
-  <name>hadoop.socks.server</name>
-  <value></value>
-  <description> Address (host:port) of the SOCKS server to be used by the
-    SocksSocketFactory.
-  </description>
-</property>
-
-<!-- Topology Configuration -->
-<property>
-  <name>net.topology.node.switch.mapping.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.net.ScriptBasedMapping</value>
-  <description> The default implementation of the DNSToSwitchMapping. It
-    invokes a script specified in net.topology.script.file.name to resolve
-    node names. If the value for net.topology.script.file.name is not set, the
-    default value of DEFAULT_RACK is returned for all node names.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.impl</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.net.NetworkTopology</value>
-  <description> The default implementation of NetworkTopology, which is the classic three-layer one.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.file.name</name>
-  <value></value>
-  <description> The script name that should be invoked to resolve DNS names to
-    NetworkTopology names. Example: the script would take host.foo.bar as an
-    argument, and return /rack1 as the output.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.number.args</name>
-  <value>100</value>
-  <description> The max number of args that the script configured with
-    net.topology.script.file.name should be run with. Each arg is an
-    IP address.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.table.file.name</name>
-  <value></value>
-  <description> The file name for a topology file, which is used when the
-    net.topology.node.switch.mapping.impl property is set to
-    org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.net.TableMapping. The file format is a two column text
-    file, with columns separated by whitespace. The first column is a DNS or
-    IP address and the second column specifies the rack where the address maps.
-    If no entry corresponding to a host in the cluster is found, then
-    /default-rack is assumed.
-  </description>
-</property>
-
-<!-- Local file system -->
-<property>
-  <name>file.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>file.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  file.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>file.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>file.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>file.replication</name>
-  <value>1</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3 File System -->
-
-<property>
-  <name>s3.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3native File System -->
-
-<property>
-  <name>s3native.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3native.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3native.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3native.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3native.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3native.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- FTP file system -->
-<property>
-  <name>ftp.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>ftp.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  ftp.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>ftp.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>ftp.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>ftp.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- Tfile -->
-
-<property>
-  <name>tfile.io.chunk.size</name>
-  <value>1048576</value>
-  <description>
-    Value chunk size in bytes. Defaults to
-    1MB. Values whose length is less than the chunk size are
-    guaranteed to have a known value length at read time (see also
-    TFile.Reader.Scanner.Entry.isValueLengthKnown()).
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.output.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataOutputStream in bytes.
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.input.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataInputStream in bytes.
-  </description>
-</property>
-
-<!-- HTTP web-consoles Authentication -->
-
-<property>
-  <name>hadoop.http.authentication.type</name>
-  <value>simple</value>
-  <description>
-    Defines authentication used for Oozie HTTP endpoint.
-    Supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.token.validity</name>
-  <value>36000</value>
-  <description>
-    Indicates how long (in seconds) an authentication token is valid before it has
-    to be renewed.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.signature.secret.file</name>
-  <value>${user.home}/hadoop-http-auth-signature-secret</value>
-  <description>
-    The signature secret for signing the authentication tokens.
-    The same secret should be used for JT/NN/DN/TT configurations.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.cookie.domain</name>
-  <value></value>
-  <description>
-    The domain to use for the HTTP cookie that stores the authentication token.
-    In order for authentication to work correctly across all Hadoop nodes' web-consoles
-    the domain must be correctly set.
-    IMPORTANT: when using IP addresses, browsers ignore cookies with domain settings.
-    For this setting to work properly all nodes in the cluster must be configured
-    to generate URLs with hostname.domain names on it.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.simple.anonymous.allowed</name>
-  <value>true</value>
-  <description>
-    Indicates if anonymous requests are allowed when using 'simple' authentication.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.principal</name>
-  <value>HTTP/_HOST@LOCALHOST</value>
-  <description>
-    Indicates the Kerberos principal to be used for HTTP endpoint.
-    The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.keytab</name>
-  <value>${user.home}/hadoop.keytab</value>
-  <description>
-    Location of the keytab file with the credentials for the principal.
-    Referring to the same keytab file Oozie uses for its Kerberos credentials for Hadoop.
-  </description>
-</property>
-
-<!-- HTTP CORS support -->
-<property>
-  <description>Enable/disable the cross-origin (CORS) filter.</description>
-  <name>hadoop.http.cross-origin.enabled</name>
-  <value>false</value>
-</property>
-
-<property>
-  <description>Comma separated list of origins that are allowed for web
-    services needing cross-origin (CORS) support. Wildcards (*) and patterns
-    allowed</description>
-  <name>hadoop.http.cross-origin.allowed-origins</name>
-  <value>*</value>
-</property>
-
-<property>
-  <description>Comma separated list of methods that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-methods</name>
-  <value>GET,POST,HEAD</value>
-</property>
-
-<property>
-  <description>Comma separated list of headers that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-headers</name>
-  <value>X-Requested-With,Content-Type,Accept,Origin</value>
-</property>
-
-<property>
-  <description>The number of seconds a pre-flighted request can be cached
-    for web services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.max-age</name>
-  <value>1800</value>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.methods</name>
-  <value></value>
-  <description>
-    List of fencing methods to use for service fencing. May contain
-    builtin methods (eg shell and sshfence) or user-defined method.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.connect-timeout</name>
-  <value>30000</value>
-  <description>
-    SSH connection timeout, in milliseconds, to use with the builtin
-    sshfence fencer.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.private-key-files</name>
-  <value></value>
-  <description>
-    The SSH private key files to use with the builtin sshfence fencer.
-  </description>
-</property>
-
-
-<!-- Static Web User Filter properties. -->
-<property>
-  <description>
-    The user name to filter as, on static web filters
-    while rendering content. An example use is the HDFS
-    web UI (user to be used for browsing files).
-  </description>
-  <name>hadoop.http.staticuser.user</name>
-  <value>dr.who</value>
-</property>
-
-<property>
-  <name>ha.zookeeper.quorum</name>
-  <description>
-    A list of ZooKeeper server addresses, separated by commas, that are
-    to be used by the ZKFailoverController in automatic failover.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.session-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    The session timeout to use when the ZKFC connects to ZooKeeper.
-    Setting this value to a lower value implies that server crashes
-    will be detected more quickly, but risks triggering failover too
-    aggressively in the case of a transient error or network blip.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.parent-znode</name>
-  <value>/hadoop-ha</value>
-  <description>
-    The ZooKeeper znode under which the ZK failover controller stores
-    its information. Note that the nameservice ID is automatically
-    appended to this znode, so it is not normally necessary to
-    configure this, even in a federated environment.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.acl</name>
-  <value>world:anyone:rwcda</value>
-  <description>
-    A comma-separated list of ZooKeeper ACLs to apply to the znodes
-    used by automatic failover. These ACLs are specified in the same
-    format as used by the ZooKeeper CLI.
-
-    If the ACL itself contains secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.auth</name>
-  <value></value>
-  <description>
-    A comma-separated list of ZooKeeper authentications to add when
-    connecting to ZooKeeper. These are specified in the same format
-    as used by the &quot;addauth&quot; command in the ZK CLI. It is
-    important that the authentications specified here are sufficient
-    to access znodes with the ACL specified in ha.zookeeper.acl.
-
-    If the auths contain secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<!-- SSLFactory configuration -->
-
-<property>
-  <name>hadoop.ssl.keystores.factory.class</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
-  <description>
-    The keystores factory to use for retrieving certificates.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.require.client.cert</name>
-  <value>false</value>
-  <description>Whether client certificates are required</description>
-</property>
-
-<property>
-  <name>hadoop.ssl.hostname.verifier</name>
-  <value>DEFAULT</value>
-  <description>
-    The hostname verifier to provide for HttpsURLConnections.
-    Valid values are: DEFAULT, STRICT, STRICT_I6, DEFAULT_AND_LOCALHOST and
-    ALLOW_ALL
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.server.conf</name>
-  <value>ssl-server.xml</value>
-  <description>
-    Resource file from which ssl server keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.client.conf</name>
-  <value>ssl-client.xml</value>
-  <description>
-    Resource file from which ssl client keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled</name>
-  <value>false</value>
-  <description>
-    Deprecated. Use dfs.http.policy and yarn.http.policy instead.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled.protocols</name>
-  <value>TLSv1</value>
-  <description>
-    Protocols supported by the ssl.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.jetty.logs.serve.aliases</name>
-  <value>true</value>
-  <description>
-    Enable/Disable aliases serving from jetty
-  </description>
-</property>
-
-<property>
-  <name>fs.permissions.umask-mode</name>
-  <value>022</value>
-  <description>
-    The umask used when creating files and directories.
-    Can be in octal or in symbolic. Examples are:
-    "022" (octal for u=rwx,g=r-x,o=r-x in symbolic),
-    or "u=rwx,g=rwx,o=" (symbolic for 007 in octal).
-  </description>
-</property>
-
-<!-- ha properties -->
-
-<property>
-  <name>ha.health-monitor.connect-retry-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to retry connecting to the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.check-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to check the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.sleep-after-disconnect.ms</name>
-  <value>1000</value>
-  <description>
-    How long to sleep after an unexpected RPC error.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.rpc-timeout.ms</name>
-  <value>45000</value>
-  <description>
-    Timeout for the actual monitorHealth() calls.
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.new-active.rpc-timeout.ms</name>
-  <value>60000</value>
-  <description>
-    Timeout that the FC waits for the new active to become active
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.rpc-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    Timeout that the FC waits for the old active to go to standby
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.connection.retries</name>
-  <value>1</value>
-  <description>
-    FC connection retries for graceful fencing
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
-  <value>20000</value>
-  <description>
-    Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.fallback-to-simple-auth-allowed</name>
-  <value>false</value>
-  <description>
-    When a client is configured to attempt a secure connection, but attempts to
-    connect to an insecure server, that server may instruct the client to
-    switch to SASL SIMPLE (unsecure) authentication. This setting controls
-    whether or not the client will accept this instruction from the server.
-    When false (the default), the client will not allow the fallback to SIMPLE
-    authentication, and will abort the connection.
-  </description>
-</property>
-
-<property>
-  <name>fs.client.resolve.remote.symlinks</name>
-  <value>true</value>
-  <description>
-      Whether to resolve symlinks when accessing a remote Hadoop filesystem.
-      Setting this to false causes an exception to be thrown upon encountering
-      a symlink. This setting does not apply to local filesystems, which
-      automatically resolve local symlinks.
-  </description>
-</property>
-
-<property>
-  <name>nfs.exports.allowed.hosts</name>
-  <value>* rw</value>
-  <description>
-    By default, the export can be mounted by any client. The value string
-    contains machine name and access privilege, separated by whitespace
-    characters. The machine name format can be a single host, a Java regular
-    expression, or an IPv4 address. The access privilege uses rw or ro to
-    specify read/write or read-only access of the machines to exports. If the
-    access privilege is not provided, the default is read-only. Entries are separated by ";".
-    For example: "192.168.0.0/22 rw ; host.*\.example\.com ; host1.test.org ro;".
-    Only the NFS gateway needs to restart after this property is updated.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.user.group.static.mapping.overrides</name>
-  <value>dr.who=;</value>
-  <description>
-    Static mapping of user to groups. This will override the groups if
-    available in the system for the specified user. In other words, groups
-    look-up will not happen for these users, instead groups mapped in this
-    configuration will be used.
-    Mapping should be in this format.
-    user1=group1,group2;user2=;user3=group2;
-    Default, "dr.who=;" will consider "dr.who" as user without groups.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.quantile.enable</name>
-  <value>false</value>
-  <description>
-    Setting this property to true and rpc.metrics.percentiles.intervals
-    to a comma-separated list of the granularity in seconds, the
-    50/75/90/95/99th percentile latency for rpc queue/processing time in
-    milliseconds are added to rpc metrics.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.percentiles.intervals</name>
-  <value></value>
-  <description>
-    A comma-separated list of the granularity in seconds for the metrics which
-    describe the 50/75/90/95/99th percentile latency for rpc queue/processing
-    time. The metrics are outputted if rpc.metrics.quantile.enable is set to
-    true.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE</name>
-  <value></value>
-  <description>
-    The prefix for a given crypto codec, contains a comma-separated
-    list of implementation classes for a given crypto codec (eg EXAMPLECIPHERSUITE).
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
-  <value>org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
-  <description>
-    Comma-separated list of crypto codec implementations for AES/CTR/NoPadding.
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.cipher.suite</name>
-  <value>AES/CTR/NoPadding</value>
-  <description>
-    Cipher suite for crypto codec.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.jce.provider</name>
-  <value></value>
-  <description>
-    The JCE provider name used in CryptoCodec.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.buffer.size</name>
-  <value>8192</value>
-  <description>
-    The buffer size used by CryptoInputStream and CryptoOutputStream.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.java.secure.random.algorithm</name>
-  <value>SHA1PRNG</value>
-  <description>
-    The java secure random algorithm.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.secure.random.impl</name>
-  <value></value>
-  <description>
-    Implementation of secure random.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.random.device.file.path</name>
-  <value>/dev/urandom</value>
-  <description>
-    OS security random device file path.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.key.provider.path</name>
-  <description>
-    The KeyProvider to use when managing zone keys, and interacting with
-    encryption keys when reading and writing to an encryption zone.
-  </description>
-</property>
-
-<property>
-  <name>fs.har.impl.disable.cache</name>
-  <value>true</value>
-  <description>Don't cache 'har' filesystem instances.</description>
-</property>
-
-<!--- KMSClientProvider configurations -->
-<property>
-  <name>hadoop.security.kms.client.authentication.retry-count</name>
-  <value>1</value>
-  <description>
-    Number of times to retry connecting to KMS on authentication failure
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.size</name>
-  <value>500</value>
-  <description>
-    Size of the EncryptedKeyVersion cache Queue for each key
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.low-watermark</name>
-  <value>0.3f</value>
-  <description>
-    If size of the EncryptedKeyVersion cache Queue falls below the
-    low watermark, this cache queue will be scheduled for a refill
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.num.refill.threads</name>
-  <value>2</value>
-  <description>
-    Number of threads to use for refilling depleted EncryptedKeyVersion
-    cache Queues
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.expiry</name>
-  <value>43200000</value>
-  <description>
-    Cache expiry time for a Key, after which the cache Queue for this
-    key will be dropped. Default = 12hrs
-  </description>
-</property>
-
- <property>
-  <name>ipc.server.max.connections</name>
-  <value>0</value>
-  <description>The maximum number of concurrent connections a server is allowed
-    to accept. If this limit is exceeded, incoming connections will first fill
-    the listen queue and then may go to an OS-specific listen overflow queue.
-    The client may fail or timeout, but the server can avoid running out of file
-    descriptors using this feature. 0 means no limit.
-  </description>
-</property>
-
-
-  <!-- YARN registry -->
-
-  <property>
-    <description>
-      Is the registry enabled in the YARN Resource Manager?
-
-      If true, the YARN RM will, as needed,
-      create the user and system paths, and purge
-      service records when containers, application attempts
-      and applications complete.
-
-      If false, the paths must be created by other means,
-      and no automatic cleanup of service records will take place.
-    </description>
-    <name>hadoop.registry.rm.enabled</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      The root zookeeper node for the registry
-    </description>
-    <name>hadoop.registry.zk.root</name>
-    <value>/registry</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper session timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.session.timeout.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.connection.timeout.ms</name>
-    <value>15000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection retry count before failing
-    </description>
-    <name>hadoop.registry.zk.retry.times</name>
-    <value>5</value>
-  </property>
-
-  <property>
-    <description>
-    </description>
-    <name>hadoop.registry.zk.retry.interval.ms</name>
-    <value>1000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper retry limit in milliseconds, during
-      exponential backoff.
-
-      This places a limit even
-      if the retry times and interval limit, combined
-      with the backoff policy, result in a long retry
-      period
-    </description>
-    <name>hadoop.registry.zk.retry.ceiling.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      List of hostname:port pairs defining the
-      zookeeper quorum binding for the registry
-    </description>
-    <name>hadoop.registry.zk.quorum</name>
-    <value>localhost:2181</value>
-  </property>
-
-  <property>
-    <description>
-      Key to set if the registry is secure. Turning it on
-      changes the permissions policy from "open access"
-      to restrictions on kerberos with the option of
-      a user adding one or more auth key pairs down their
-      own tree.
-    </description>
-    <name>hadoop.registry.secure</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      A comma separated list of Zookeeper ACL identifiers with
-      system access to the registry in a secure cluster.
-
-      These are given full access to all entries.
-
-      If there is an "@" at the end of a SASL entry it
-      instructs the registry client to append the default kerberos domain.
-    </description>
-    <name>hadoop.registry.system.acls</name>
-    <value>sasl:yarn@, sasl:mapred@, sasl:hdfs@</value>
-  </property>
-
-  <property>
-    <description>
-      The kerberos realm: used to set the realm of
-      system principals which do not declare their realm,
-      and any other accounts that need the value.
-
-      If empty, the default realm of the running process
-      is used.
-
-      If neither are known and the realm is needed, then the registry
-      service/client will fail.
-    </description>
-    <name>hadoop.registry.kerberos.realm</name>
-    <value></value>
-  </property>
-
-  <property>
-    <description>
-      Key to define the JAAS context. Used in secure
-      mode
-    </description>
-    <name>hadoop.registry.jaas.context</name>
-    <value>Client</value>
-  </property>
-
-  <property>
-    <description>
-      Enable hdfs shell commands to display warnings if (fs.defaultFS) property
-      is not set.
-    </description>
-    <name>hadoop.shell.missing.defaultFs.warning</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <name>hadoop.shell.safely.delete.limit.num.files</name>
-    <value>100</value>
-    <description>Used by -safely option of hadoop fs shell -rm command to avoid
-      accidental deletion of large directories. When enabled, the -rm command
-      requires confirmation if the number of files to be deleted is greater than
-      this limit.  The default limit is 100 files. The warning is disabled if
-      the limit is 0 or the -safely is not specified in -rm command.
-    </description>
-  </property>
-
-  <property>
-    <name>fs.client.htrace.sampler.classes</name>
-    <value></value>
-    <description>The class names of the HTrace Samplers to use for Hadoop
-      filesystem clients.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.htrace.span.receiver.classes</name>
-    <value></value>
-    <description>The class names of the Span Receivers to use for Hadoop.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.caller.context.enabled</name>
-    <value>false</value>
-    <description>When the feature is enabled, additional fields are written into
-      name-node audit log records for auditing coarse granularity operations.
-    </description>
-  </property>
-  <property>
-    <name>hadoop.caller.context.max.size</name>
-    <value>128</value>
-    <description>The maximum bytes a caller context string can have. If the
-      passed caller context is longer than this maximum bytes, client will
-      truncate it before sending to server. Note that the server may have a
-      different maximum size, and will truncate the caller context to the
-      maximum size it allows.
-    </description>
-  </property>
-  <property>
-    <name>hadoop.caller.context.signature.max.size</name>
-    <value>40</value>
-    <description>
-      The caller's signature (optional) is for offline validation. If the
-      signature exceeds the maximum allowed bytes in server, the caller context
-      will be abandoned, in which case the caller context will not be recorded
-      in audit logs.
-    </description>
-  </property>
-
-</configuration>
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
index 8672ca4..6dbdac5 100644
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
+++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemITCase.java
@@ -48,7 +48,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Unit tests for the S3 file system support via Hadoop's {@link org.apache.hadoop.fs.s3a.S3AFileSystem}.
@@ -59,6 +58,7 @@ import static org.junit.Assert.fail;
  */
 @RunWith(Parameterized.class)
 public class HadoopS3FileSystemITCase extends TestLogger {
+
 	@Parameterized.Parameter
 	public String scheme;
 
@@ -98,32 +98,31 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 		// directory must not yet exist
 		assertFalse(fs.exists(directory));
 
-		// reset configuration
-		FileSystem.initialize(new Configuration());
-
 		skipTest = false;
 	}
 
 	@AfterClass
 	public static void cleanUp() throws IOException, InterruptedException {
-		if (!skipTest) {
-			final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
-			// initialize configuration with valid credentials
-			final Configuration conf = new Configuration();
-			conf.setString("s3.access.key", ACCESS_KEY);
-			conf.setString("s3.secret.key", SECRET_KEY);
-			FileSystem.initialize(conf);
-
-			final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
-			final FileSystem fs = directory.getFileSystem();
-
-			// clean up
-			fs.delete(directory, true);
-
-			// now directory must be gone
-			checkPathEventualExistence(fs, directory, false, deadline);
-
-			// reset configuration
+		try {
+			if (!skipTest) {
+				final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
+				// initialize configuration with valid credentials
+				final Configuration conf = new Configuration();
+				conf.setString("s3.access.key", ACCESS_KEY);
+				conf.setString("s3.secret.key", SECRET_KEY);
+				FileSystem.initialize(conf);
+
+				final Path directory = new Path("s3://" + BUCKET + '/' + TEST_DATA_DIR);
+				final FileSystem fs = directory.getFileSystem();
+
+				// clean up
+				fs.delete(directory, true);
+
+				// now directory must be gone
+				checkPathEventualExistence(fs, directory, false, deadline);
+			}
+		}
+		finally {
 			FileSystem.initialize(new Configuration());
 		}
 	}
@@ -136,16 +135,6 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 	public void testConfigKeysForwarding() throws Exception {
 		final Path path = new Path(getBasePath());
 
-		// access without credentials should fail
-		{
-			FileSystem.initialize(new Configuration());
-
-			try {
-				path.getFileSystem();
-				fail("should fail with an exception");
-			} catch (IOException ignored) {}
-		}
-
 		// standard Hadoop-style credential keys
 		{
 			Configuration conf = new Configuration();
@@ -175,14 +164,6 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 			FileSystem.initialize(conf);
 			path.getFileSystem();
 		}
-
-		// see that setting a blank config again clears everything
-		// and access without credentials should fail
-		FileSystem.initialize(new Configuration());
-		try {
-			path.getFileSystem();
-			fail("should fail with an exception");
-		} catch (IOException ignored) {}
 	}
 
 	@Test
@@ -279,5 +260,4 @@ public class HadoopS3FileSystemITCase extends TestLogger {
 		// now directory must be gone (this is eventually-consistent, though!)
 		checkPathEventualExistence(fs, directory, false, deadline);
 	}
-
 }
diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/resources/core-site.xml b/flink-filesystems/flink-s3-fs-hadoop/src/test/resources/core-site.xml
deleted file mode 100644
index b875e97..0000000
--- a/flink-filesystems/flink-s3-fs-hadoop/src/test/resources/core-site.xml
+++ /dev/null
@@ -1,2312 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<!-- Do not modify this file directly.  Instead, copy entries that you -->
-<!-- wish to modify from this file into core-site.xml and change them -->
-<!-- there.  If core-site.xml does not already exist, create it.      -->
-
-<configuration>
-
-<!--- global properties -->
-
-<property>
-  <name>hadoop.common.configuration.version</name>
-  <value>0.23.0</value>
-  <description>version of this configuration file</description>
-</property>
-
-<property>
-  <name>hadoop.tmp.dir</name>
-  <value>/tmp/hadoop-${user.name}</value>
-  <description>A base for other temporary directories.</description>
-</property>
-
-<property>
-  <name>io.native.lib.available</name>
-  <value>true</value>
-  <description>Controls whether to use native libraries for bz2 and zlib
-    compression codecs or not. The property does not control any other native
-    libraries.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.filter.initializers</name>
-  <value>org.apache.hadoop.http.lib.StaticUserWebFilter</value>
-  <description>A comma separated list of class names. Each class in the list
-  must extend org.apache.hadoop.http.FilterInitializer. The corresponding
-  Filter will be initialized. Then, the Filter will be applied to all user
-  facing jsp and servlet web pages.  The ordering of the list defines the
-  ordering of the filters.</description>
-</property>
-
-<!--- security properties -->
-
-<property>
-  <name>hadoop.security.authorization</name>
-  <value>false</value>
-  <description>Is service-level authorization enabled?</description>
-</property>
-
-<property>
-  <name>hadoop.security.instrumentation.requires.admin</name>
-  <value>false</value>
-  <description>
-    Indicates if administrator ACLs are required to access
-    instrumentation servlets (JMX, METRICS, CONF, STACKS).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.authentication</name>
-  <value>simple</value>
-  <description>Possible values are simple (no authentication), and kerberos
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping</name>
-  <value>org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback</value>
-  <description>
-    Class for user to group mapping (get groups for a given user) for ACL.
-    The default implementation,
-    org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback,
-    will determine if the Java Native Interface (JNI) is available. If JNI is
-    available the implementation will use the API within hadoop to resolve a
-    list of groups for a user. If JNI is not available then the shell
-    implementation, ShellBasedUnixGroupsMapping, is used.  This implementation
-    shells out to the Linux/Unix environment with the
-    <code>bash -c groups</code> command to resolve a list of groups for a user.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.interface</name>
-  <description>
-    The name of the Network Interface from which the service should determine
-    its host name for Kerberos login. e.g. eth2. In a multi-homed environment,
-    the setting can be used to affect the _HOST substitution in the service
-    Kerberos principal. If this configuration value is not set, the service
-    will use its default hostname as returned by
-    InetAddress.getLocalHost().getCanonicalHostName().
-
-    Most clusters will not require this setting.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.nameserver</name>
-  <description>
-    The host name or IP address of the name server (DNS) which a service Node
-    should use to determine its own host name for Kerberos Login. Requires
-    hadoop.security.dns.interface.
-
-    Most clusters will not require this setting.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.log-slow-lookups.enabled</name>
-  <value>false</value>
-  <description>
-    Time name lookups (via SecurityUtil) and log them if they exceed the
-    configured threshold.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.dns.log-slow-lookups.threshold.ms</name>
-  <value>1000</value>
-  <description>
-    If slow lookup logging is enabled, this threshold is used to decide if a
-    lookup is considered slow enough to be logged.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.secs</name>
-  <value>300</value>
-  <description>
-    This is the config controlling the validity of the entries in the cache
-    containing the user->group mapping. When this duration has expired,
-    then the implementation of the group mapping provider is invoked to get
-    the groups of the user and then cached back.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.negative-cache.secs</name>
-  <value>30</value>
-  <description>
-    Expiration time for entries in the negative user-to-group mapping
-    caching, in seconds. This is useful when invalid users are retrying
-    frequently. It is suggested to set a small value for this expiration, since
-    a transient error in group lookup could temporarily lock out a legitimate
-    user.
-
-    Set this to zero or negative value to disable negative user-to-group caching.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.warn.after.ms</name>
-  <value>5000</value>
-  <description>
-    If looking up a single user to group takes longer than this amount of
-    milliseconds, we will log a warning message.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.background.reload</name>
-  <value>false</value>
-  <description>
-    Whether to reload expired user->group mappings using a background thread
-    pool. If set to true, a pool of
-    hadoop.security.groups.cache.background.reload.threads is created to
-    update the cache in the background.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.background.reload.threads</name>
-  <value>3</value>
-  <description>
-    Only relevant if hadoop.security.groups.cache.background.reload is true.
-    Controls the number of concurrent background user->group cache entry
-    refreshes. Pending refresh requests beyond this value are queued and
-    processed when a thread is free.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.connection.timeout.ms</name>
-  <value>60000</value>
-  <description>
-    This property is the connection timeout (in milliseconds) for LDAP
-    operations. If the LDAP provider doesn't establish a connection within the
-    specified period, it will abort the connect attempt. Non-positive value
-    means no LDAP connection timeout is specified in which case it waits for the
-    connection to establish until the underlying network times out.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.read.timeout.ms</name>
-  <value>60000</value>
-  <description>
-    This property is the read timeout (in milliseconds) for LDAP
-    operations. If the LDAP provider doesn't get a LDAP response within the
-    specified period, it will abort the read attempt. Non-positive value
-    means no read timeout is specified in which case it waits for the response
-    infinitely.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.url</name>
-  <value></value>
-  <description>
-    The URL of the LDAP server to use for resolving user groups when using
-    the LdapGroupsMapping user to group mapping.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl</name>
-  <value>false</value>
-  <description>
-    Whether or not to use SSL when connecting to the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore</name>
-  <value></value>
-  <description>
-    File path to the SSL keystore that contains the SSL certificate required
-    by the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the LDAP SSL keystore.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.user</name>
-  <value></value>
-  <description>
-    The distinguished name of the user to bind as when connecting to the LDAP
-    server. This may be left blank if the LDAP server supports anonymous binds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the bind user.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.base</name>
-  <value></value>
-  <description>
-    The search base for the LDAP connection. This is a distinguished name,
-    and will typically be the root of the LDAP directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.user</name>
-  <value>(&amp;(objectClass=user)(sAMAccountName={0}))</value>
-  <description>
-    An additional filter to use when searching for LDAP users. The default will
-    usually be appropriate for Active Directory installations. If connecting to
-    an LDAP server with a non-AD schema, this should be replaced with
-    (&amp;(objectClass=inetOrgPerson)(uid={0})). {0} is a special string used to
-    denote where the username fits into the filter.
-
-    If the LDAP server supports posixGroups, Hadoop can enable the feature by
-    setting the value of this property to "posixAccount" and the value of
-    the hadoop.security.group.mapping.ldap.search.filter.group property to
-    "posixGroup".
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.group</name>
-  <value>(objectClass=group)</value>
-  <description>
-    An additional filter to use when searching for LDAP groups. This should be
-    changed when resolving groups against a non-Active Directory installation.
-
-    See the description of hadoop.security.group.mapping.ldap.search.filter.user
-    to enable posixGroups support.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.member</name>
-  <value>member</value>
-  <description>
-    The attribute of the group object that identifies the users that are
-    members of the group. The default will usually be appropriate for
-    any LDAP installation.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.group.name</name>
-  <value>cn</value>
-  <description>
-    The attribute of the group object that identifies the group name. The
-    default will usually be appropriate for all LDAP systems.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.posix.attr.uid.name</name>
-  <value>uidNumber</value>
-  <description>
-    The attribute of posixAccount to use when resolving group membership.
-    Mostly useful for schemas wherein groups have memberUids that use an
-    attribute other than uidNumber.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.posix.attr.gid.name</name>
-  <value>gidNumber</value>
-  <description>
-    The attribute of posixAccount indicating the group id.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.directory.search.timeout</name>
-  <value>10000</value>
-  <description>
-    The attribute applied to the LDAP SearchControl properties to set a
-    maximum time limit when searching and awaiting a result.
-    Set to 0 if infinite wait period is desired.
-    Default is 10 seconds. Units in milliseconds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers</name>
-  <value></value>
-  <description>
-    Comma-separated list of names of other providers to provide user to group
-    mapping. Used by CompositeGroupsMapping.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers.combined</name>
-  <value>true</value>
-  <description>
-    true or false to indicate whether groups from the providers are combined or
-    not. The default value is true. If true, then all the providers will be
-    tried to get groups and all the groups are combined to return as the final
-    results. Otherwise, providers are tried one by one in the configured list
-    order, and if any groups are retrieved from any provider, then the groups
-    will be returned without trying the remaining ones.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.service.user.name.key</name>
-  <value></value>
-  <description>
-    For those cases where the same RPC protocol is implemented by multiple
-    servers, this configuration is required for specifying the principal
-    name to use for the service when the client wishes to make an RPC call.
-  </description>
-</property>
-
-
-<property>
-    <name>hadoop.security.uid.cache.secs</name>
-    <value>14400</value>
-    <description>
-        This is the config controlling the validity of the entries in the cache
-        containing the userId to userName and groupId to groupName used by
-        NativeIO getFstat().
-    </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.protection</name>
-  <value>authentication</value>
-  <description>A comma-separated list of protection values for secured sasl
-      connections. Possible values are authentication, integrity and privacy.
-      authentication means authentication only and no integrity or privacy;
-      integrity implies authentication and integrity are enabled; and privacy
-      implies all of authentication, integrity and privacy are enabled.
-      hadoop.security.saslproperties.resolver.class can be used to override
-      the hadoop.rpc.protection for a connection at the server side.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.saslproperties.resolver.class</name>
-  <value></value>
-  <description>SaslPropertiesResolver used to resolve the QOP used for a
-      connection. If not specified, the full set of values specified in
-      hadoop.rpc.protection is used while determining the QOP used for the
-      connection. If a class is specified, then the QOP values returned by
-      the class will be used while determining the QOP used for the connection.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.sensitive-config-keys</name>
-  <value>secret$,password$,ssl.keystore.pass$,fs.s3.*[Ss]ecret.?[Kk]ey,fs.azure.account.key.*,dfs.webhdfs.oauth2.[a-z]+.token,hadoop.security.sensitive-config-keys</value>
-  <description>A comma-separated list of regular expressions to match against
-      configuration keys that should be redacted where appropriate, for
-      example, when logging modified properties during a reconfiguration,
-      private credentials should not be logged.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.workaround.non.threadsafe.getpwuid</name>
-  <value>true</value>
-  <description>Some operating systems or authentication modules are known to
-  have broken implementations of getpwuid_r and getpwgid_r, such that these
-  calls are not thread-safe. Symptoms of this problem include JVM crashes
-  with a stack trace inside these functions. If your system exhibits this
-  issue, enable this configuration parameter to include a lock around the
-  calls as a workaround.
-
-  An incomplete list of some systems known to have this issue is available
-  at http://wiki.apache.org/hadoop/KnownBrokenPwuidImplementations
-  </description>
-</property>
-
-<property>
-  <name>hadoop.kerberos.kinit.command</name>
-  <value>kinit</value>
-  <description>Used to periodically renew Kerberos credentials when provided
-  to Hadoop. The default setting assumes that kinit is in the PATH of users
-  running the Hadoop client. Change this to the absolute path to kinit if this
-  is not the case.
-  </description>
-</property>
-
-<property>
-    <name>hadoop.kerberos.min.seconds.before.relogin</name>
-    <value>60</value>
-    <description>The minimum time between relogin attempts for Kerberos, in
-    seconds.
-    </description>
-</property>
-
-<property>
-  <name>hadoop.security.auth_to_local</name>
-  <value></value>
-  <description>Maps kerberos principals to local user names</description>
-</property>
-
-<!-- i/o properties -->
-<property>
-  <name>io.file.buffer.size</name>
-  <value>4096</value>
-  <description>The size of buffer for use in sequence files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>io.bytes.per.checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  io.file.buffer.size.</description>
-</property>
-
-<property>
-  <name>io.skip.checksum.errors</name>
-  <value>false</value>
-  <description>If true, when a checksum error is encountered while
-  reading a sequence file, entries are skipped, instead of throwing an
-  exception.</description>
-</property>
-
-<property>
-  <name>io.compression.codecs</name>
-  <value></value>
-  <description>A comma-separated list of the compression codec classes that can
-  be used for compression/decompression. In addition to any classes specified
-  with this property (which take precedence), codec classes on the classpath
-  are discovered using a Java ServiceLoader.</description>
-</property>
-
-<property>
-  <name>io.compression.codec.bzip2.library</name>
-  <value>system-native</value>
-  <description>The native-code library to be used for compression and
-  decompression by the bzip2 codec.  This library could be specified
-  either by name or the full pathname.  In the former case, the
-  library is located by the dynamic linker, usually searching the
-  directories specified in the environment variable LD_LIBRARY_PATH.
-
-  The value of "system-native" indicates that the default system
-  library should be used.  To indicate that the algorithm should
-  operate entirely in Java, specify "java-builtin".</description>
-</property>
-
-<property>
-  <name>io.serializations</name>
-  <value>org.apache.hadoop.io.serializer.WritableSerialization, org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization, org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
-  <description>A list of serialization classes that can be used for
-  obtaining serializers and deserializers.</description>
-</property>
-
-<property>
-  <name>io.seqfile.local.dir</name>
-  <value>${hadoop.tmp.dir}/io/local</value>
-  <description>The local directory where sequence file stores intermediate
-  data files during merge.  May be a comma-separated list of
-  directories on different devices in order to spread disk i/o.
-  Directories that do not exist are ignored.
-  </description>
-</property>
-
-<property>
-  <name>io.map.index.skip</name>
-  <value>0</value>
-  <description>Number of index entries to skip between each entry.
-  Zero by default. Setting this to values larger than zero can
-  facilitate opening large MapFiles using less memory.</description>
-</property>
-
-<property>
-  <name>io.map.index.interval</name>
-  <value>128</value>
-  <description>
-    A MapFile consists of two files - data file (tuples) and index file
-    (keys). For every io.map.index.interval records written in the
-    data file, an entry (record-key, data-file-position) is written
-    in the index file. This is to allow for doing binary search later
-    within the index file to look up records by their keys and get their
-    closest positions in the data file.
-  </description>
-</property>
-
-<!-- file system properties -->
-
-<property>
-  <name>fs.defaultFS</name>
-  <value>file:///</value>
-  <description>The name of the default file system.  A URI whose
-  scheme and authority determine the FileSystem implementation.  The
-  uri's scheme determines the config property (fs.SCHEME.impl) naming
-  the FileSystem implementation class.  The uri's authority is used to
-  determine the host, port, etc. for a filesystem.</description>
-</property>
-
-<property>
-  <name>fs.default.name</name>
-  <value>file:///</value>
-  <description>Deprecated. Use (fs.defaultFS) property
-  instead</description>
-</property>
-
-<property>
-  <name>fs.trash.interval</name>
-  <value>0</value>
-  <description>Number of minutes after which the checkpoint
-  gets deleted.  If zero, the trash feature is disabled.
-  This option may be configured both on the server and the
-  client. If trash is disabled server side then the client
-  side configuration is checked. If trash is enabled on the
-  server side then the value configured on the server is
-  used and the client configuration value is ignored.
-  </description>
-</property>
-
-<property>
-  <name>fs.trash.checkpoint.interval</name>
-  <value>0</value>
-  <description>Number of minutes between trash checkpoints.
-  Should be smaller or equal to fs.trash.interval. If zero,
-  the value is set to the value of fs.trash.interval.
-  Every time the checkpointer runs it creates a new checkpoint
-  out of current and removes checkpoints created more than
-  fs.trash.interval minutes ago.
-  </description>
-</property>
-
-<property>
-  <name>fs.protected.directories</name>
-  <value></value>
-  <description>A comma-separated list of directories which cannot
-    be deleted even by the superuser unless they are empty. This
-    setting can be used to guard important system directories
-    against accidental deletion due to administrator error.
-  </description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.file.impl</name>
-  <value>org.apache.hadoop.fs.local.LocalFs</value>
-  <description>The AbstractFileSystem for file: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.har.impl</name>
-  <value>org.apache.hadoop.fs.HarFs</value>
-  <description>The AbstractFileSystem for har: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.hdfs.impl</name>
-  <value>org.apache.hadoop.fs.Hdfs</value>
-  <description>The AbstractFileSystem for hdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.viewfs.impl</name>
-  <value>org.apache.hadoop.fs.viewfs.ViewFs</value>
-  <description>The AbstractFileSystem for view file system for viewfs: uris
-  (ie client side mount table:).</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.ftp.impl</name>
-  <value>org.apache.hadoop.fs.ftp.FtpFs</value>
-  <description>The AbstractFileSystem for ftp: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.webhdfs.impl</name>
-  <value>org.apache.hadoop.fs.WebHdfs</value>
-  <description>The AbstractFileSystem for webhdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.swebhdfs.impl</name>
-  <value>org.apache.hadoop.fs.SWebHdfs</value>
-  <description>The AbstractFileSystem for swebhdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.ftp.host</name>
-  <value>0.0.0.0</value>
-  <description>FTP filesystem connects to this server</description>
-</property>
-
-<property>
-  <name>fs.ftp.host.port</name>
-  <value>21</value>
-  <description>
-    FTP filesystem connects to fs.ftp.host on this port
-  </description>
-</property>
-
-<property>
-  <name>fs.df.interval</name>
-  <value>60000</value>
-  <description>Disk usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.du.interval</name>
-  <value>600000</value>
-  <description>File space usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.s3.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when writing files to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3</value>
-  <description>Determines where on the local filesystem the s3:/s3n: filesystem
-  should store files before sending them to S3
-  (or after retrieving them from S3).
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.maxRetries</name>
-  <value>4</value>
-  <description>The maximum number of retries for reading or writing files to S3,
-  before we signal failure to the application.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.sleepTimeSeconds</name>
-  <value>10</value>
-  <description>The number of seconds to sleep between each S3 retry.
-  </description>
-</property>
-
-<property>
-  <name>fs.automatic.close</name>
-  <value>true</value>
-  <description>By default, FileSystem instances are automatically closed at program
-  exit using a JVM shutdown hook. Setting this property to false disables this
-  behavior. This is an advanced option that should only be used by server applications
-  requiring a more carefully orchestrated shutdown sequence.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when reading files using the native S3
-  filesystem (s3n: URIs).</description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.enabled</name>
-  <value>false</value>
-  <description>Setting this property to true enables multipart uploads to
-  native S3 filesystem. When uploading a file, it is split into blocks
-  if the size is larger than fs.s3n.multipart.uploads.block.size.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.block.size</name>
-  <value>67108864</value>
-  <description>The block size for multipart uploads to native S3 filesystem.
-  Default size is 64MB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.copy.block.size</name>
-  <value>5368709120</value>
-  <description>The block size for multipart copy in native S3 filesystem.
-  Default size is 5GB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.server-side-encryption-algorithm</name>
-  <value></value>
-  <description>Specify a server-side encryption algorithm for S3.
-  Unset by default, and the only other currently allowable value is AES256.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.access.key</name>
-  <description>AWS access key ID used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.secret.key</name>
-  <description>AWS secret key used by S3A file system. Omit for IAM role-based or provider-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.aws.credentials.provider</name>
-  <description>
-	  Comma-separated class names of credential provider classes which implement
-	  com.amazonaws.auth.AWSCredentialsProvider.
-
-	  These are loaded and queried in sequence for a valid set of credentials.
-	  Each listed class must implement one of the following means of
-	  construction, which are attempted in order:
-	  1. a public constructor accepting java.net.URI and
-	  org.apache.hadoop.conf.Configuration,
-	  2. a public static method named getInstance that accepts no
-	  arguments and returns an instance of
-	  com.amazonaws.auth.AWSCredentialsProvider, or
-	  3. a public default constructor.
-
-	  Specifying
-	  org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider
-	  allows
-	  anonymous access to a publicly accessible S3 bucket without any credentials.
-	  Please note that allowing anonymous access to an S3 bucket compromises
-	  security and therefore is unsuitable for most use cases. It can be useful
-	  for accessing public data sets without requiring AWS credentials.
-
-	  If unspecified, then the default list of credential provider classes,
-	  queried in sequence, is:
-	  1. org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider:
-	  supports static
-	  configuration of AWS access key ID and secret access key. See also
-	  fs.s3a.access.key and fs.s3a.secret.key.
-	  2. com.amazonaws.auth.EnvironmentVariableCredentialsProvider: supports
-	  configuration of AWS access key ID and secret access key in
-	  environment variables named AWS_ACCESS_KEY_ID and
-	  AWS_SECRET_ACCESS_KEY, as documented in the AWS SDK.
-	  3.
-	  org.apache.hadoop.fs.s3a.SharedInstanceProfileCredentialsProvider:
-	  a shared instance of
-	  com.amazonaws.auth.InstanceProfileCredentialsProvider from the AWS
-	  SDK, which supports use of instance profile credentials if running
-	  in an EC2 VM. Using this shared instance potentially reduces load
-	  on the EC2 instance metadata service for multi-threaded
-	  applications.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.session.token</name>
-  <description>Session token, when using org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider
-    as one of the providers.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.security.credential.provider.path</name>
-  <value />
-  <description>
-    Optional comma separated list of credential providers, a list
-    which is prepended to that set in hadoop.security.credential.provider.path
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.maximum</name>
-  <value>15</value>
-  <description>Controls the maximum number of simultaneous connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.ssl.enabled</name>
-  <value>true</value>
-  <description>Enables or disables SSL connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.endpoint</name>
-  <description>AWS S3 endpoint to connect to. An up-to-date list is
-    provided in the AWS Documentation: regions and endpoints. Without this
-    property, the standard region (s3.amazonaws.com) is assumed.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.path.style.access</name>
-  <value>false</value>
-  <description>Enable S3 path style access ie disabling the default virtual hosting behaviour.
-    Useful for S3A-compliant storage providers as it removes the need to set up DNS for virtual hosting.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.host</name>
-  <description>Hostname of the (optional) proxy server for S3 connections.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.port</name>
-  <description>Proxy server port. If this property is not set
-    but fs.s3a.proxy.host is, port 80 or 443 is assumed (consistent with
-    the value of fs.s3a.connection.ssl.enabled).</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.username</name>
-  <description>Username for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.password</name>
-  <description>Password for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.domain</name>
-  <description>Domain for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.workstation</name>
-  <description>Workstation for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.attempts.maximum</name>
-  <value>20</value>
-  <description>How many times we should retry commands on transient errors.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.establish.timeout</name>
-  <value>5000</value>
-  <description>Socket connection setup timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.timeout</name>
-  <value>200000</value>
-  <description>Socket connection timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.socket.send.buffer</name>
-  <value>8192</value>
-  <description>Socket send buffer hint to amazon connector. Represented in bytes.</description>
-</property>
-
-<property>
-  <name>fs.s3a.socket.recv.buffer</name>
-  <value>8192</value>
-  <description>Socket receive buffer hint to amazon connector. Represented in bytes.</description>
-</property>
-
-<property>
-  <name>fs.s3a.paging.maximum</name>
-  <value>5000</value>
-  <description>How many keys to request from S3 when doing
-     directory listings at a time.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.max</name>
-  <value>10</value>
-  <description>The total number of threads available in the filesystem for data
-    uploads *or any other queued filesystem operation*.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.keepalivetime</name>
-  <value>60</value>
-  <description>Number of seconds a thread can be idle before being
-    terminated.</description>
-</property>
-
-<property>
-  <name>fs.s3a.max.total.tasks</name>
-  <value>5</value>
-  <description>The number of operations which can be queued for execution</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.size</name>
-  <value>100M</value>
-  <description>How big (in bytes) to split upload or copy operations up into.
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.threshold</name>
-  <value>2147483647</value>
-  <description>How big (in bytes) to split upload or copy operations up into.
-    This also controls the partition size in renamed files, as rename() involves
-    copying the source file(s).
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multiobjectdelete.enable</name>
-  <value>true</value>
-  <description>When enabled, multiple single-object delete requests are replaced by
-    a single 'delete multiple objects'-request, reducing the number of requests.
-    Beware: legacy S3-compatible object stores might not support this request.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.acl.default</name>
-  <description>Set a canned ACL for newly created and copied objects. Value may be Private,
-      PublicRead, PublicReadWrite, AuthenticatedRead, LogDeliveryWrite, BucketOwnerRead,
-      or BucketOwnerFullControl.</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge</name>
-  <value>false</value>
-  <description>True if you want to purge existing multipart uploads that may not have been
-    completed/aborted correctly. The corresponding purge age is defined in
-    fs.s3a.multipart.purge.age.
-    If set, when the filesystem is instantiated then all outstanding uploads
-    older than the purge age will be terminated across the entire bucket.
-    This will impact multipart uploads by other applications and users, so it should
-    be used sparingly, with an age value chosen to stop failed uploads, without
-    breaking ongoing operations.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge.age</name>
-  <value>86400</value>
-  <description>Minimum age in seconds of multipart uploads to purge.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.server-side-encryption-algorithm</name>
-  <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.signing-algorithm</name>
-  <description>Override the default signing algorithm so legacy
-    implementations can still be used</description>
-</property>
-
-<property>
-  <name>fs.s3a.block.size</name>
-  <value>32M</value>
-  <description>Block size to use when reading files using s3a: file system.
-    A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3a</value>
-  <description>Comma separated list of directories that will be used to buffer file
-    uploads to.</description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload</name>
-  <value>false</value>
-  <description>
-    Use the incremental block-based fast upload mechanism with
-    the buffering mechanism set in fs.s3a.fast.upload.buffer.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload.buffer</name>
-  <value>disk</value>
-  <description>
-    The buffering mechanism to use when using S3A fast upload
-    (fs.s3a.fast.upload=true). Values: disk, array, bytebuffer.
-    This configuration option has no effect if fs.s3a.fast.upload is false.
-
-    "disk" will use the directories listed in fs.s3a.buffer.dir as
-    the location(s) to save data prior to being uploaded.
-
-    "array" uses arrays in the JVM heap
-
-    "bytebuffer" uses off-heap memory within the JVM.
-
-    Both "array" and "bytebuffer" will consume memory in a single stream up to the number
-    of blocks set by:
-
-        fs.s3a.multipart.size * fs.s3a.fast.upload.active.blocks.
-
-    If using either of these mechanisms, keep this value low
-
-    The total number of threads performing work across all threads is set by
-    fs.s3a.threads.max, with fs.s3a.max.total.tasks values setting the number of queued
-    work items.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload.active.blocks</name>
-  <value>4</value>
-  <description>
-    Maximum number of blocks a single output stream can have
-    active (uploading, or queued to the central FileSystem
-    instance's pool of queued operations).
-
-    This stops a single stream overloading the shared thread pool.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.readahead.range</name>
-  <value>64K</value>
-  <description>Bytes to read ahead during a seek() before closing and
-  re-opening the S3 HTTP connection. This option will be overridden if
-  any call to setReadahead() is made to an open stream.
-  A suffix from the set {K,M,G,T,P} may be used to scale the numeric value.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.user.agent.prefix</name>
-  <value></value>
-  <description>
-    Sets a custom value that will be prepended to the User-Agent header sent in
-    HTTP requests to the S3 back-end by S3AFileSystem.  The User-Agent header
-    always includes the Hadoop version number followed by a string generated by
-    the AWS SDK.  An example is "User-Agent: Hadoop 2.8.0, aws-sdk-java/1.10.6".
-    If this optional property is set, then its value is prepended to create a
-    customized User-Agent.  For example, if this configuration property was set
-    to "MyApp", then an example of the resulting User-Agent would be
-    "User-Agent: MyApp, Hadoop 2.8.0, aws-sdk-java/1.10.6".
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.impl</name>
-  <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
-  <description>The implementation class of the S3A Filesystem</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.s3a.impl</name>
-  <value>org.apache.hadoop.fs.s3a.S3A</value>
-  <description>The implementation class of the S3A AbstractFileSystem.</description>
-</property>
-
-<property>
-  <name>io.seqfile.compress.blocksize</name>
-  <value>1000000</value>
-  <description>The minimum block size for compression in block compressed
-          SequenceFiles.
-  </description>
-</property>
-
- <property>
-  <name>io.mapfile.bloom.size</name>
-  <value>1048576</value>
-  <description>The size of BloomFilter-s used in BloomMapFile. Each time this many
-  keys are appended, the next BloomFilter will be created (inside a DynamicBloomFilter).
-  Larger values minimize the number of filters, which slightly increases the performance,
-  but may waste too much space if the total number of keys is usually much smaller
-  than this number.
-  </description>
-</property>
-
-<property>
-  <name>io.mapfile.bloom.error.rate</name>
-  <value>0.005</value>
-  <description>The rate of false positives in BloomFilter-s used in BloomMapFile.
-  As this value decreases, the size of BloomFilter-s increases exponentially. This
-  value is the probability of encountering false positives (default is 0.5%).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.util.hash.type</name>
-  <value>murmur</value>
-  <description>The default implementation of Hash. Currently this can take one of the
-  two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash.
-  </description>
-</property>
-
-
-<!-- ipc properties -->
-
-<property>
-  <name>ipc.client.idlethreshold</name>
-  <value>4000</value>
-  <description>Defines the threshold number of connections after which
-               connections will be inspected for idleness.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.kill.max</name>
-  <value>10</value>
-  <description>Defines the maximum number of clients to disconnect in one go.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connection.maxidletime</name>
-  <value>10000</value>
-  <description>The maximum time in msec after which a client will bring down the
-               connection to the server.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries</name>
-  <value>10</value>
-  <description>Indicates the number of retries a client will make to establish
-               a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.retry.interval</name>
-  <value>1000</value>
-  <description>Indicates the number of milliseconds a client will wait
-    before retrying to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.timeout</name>
-  <value>20000</value>
-  <description>Indicates the number of milliseconds a client will wait for the
-               socket to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries.on.timeouts</name>
-  <value>45</value>
-  <description>Indicates the number of retries a client will make on socket timeout
-               to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.tcpnodelay</name>
-  <value>true</value>
-  <description>Use TCP_NODELAY flag to bypass Nagle's algorithm transmission delays.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.low-latency</name>
-  <value>false</value>
-  <description>Use low-latency QoS markers for IPC connections.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.ping</name>
-  <value>true</value>
-  <description>Send a ping to the server when a timeout occurs while reading the response,
-  if set to true. If no failure is detected, the client retries until at least
-  a byte is read or the time given by ipc.client.rpc-timeout.ms is passed.
-  </description>
-</property>
-
-<property>
-  <name>ipc.ping.interval</name>
-  <value>60000</value>
-  <description>Timeout while waiting for a response from the server, in milliseconds.
-  The client will send ping when the interval is passed without receiving bytes,
-  if ipc.client.ping is set to true.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.rpc-timeout.ms</name>
-  <value>0</value>
-  <description>Timeout while waiting for a response from the server, in milliseconds.
-  If ipc.client.ping is set to true and this rpc-timeout is greater than
-  the value of ipc.ping.interval, the effective value of the rpc-timeout is
-  rounded up to multiple of ipc.ping.interval.
-  </description>
-</property>
-
-<property>
-  <name>ipc.server.listen.queue.size</name>
-  <value>128</value>
-  <description>Indicates the length of the listen queue for servers accepting
-               client connections.
-  </description>
-</property>
-
-<property>
-    <name>ipc.server.log.slow.rpc</name>
-    <value>false</value>
-    <description>This setting is useful to troubleshoot performance issues for
-     various services. If this value is set to true then we log requests that
-     fall into 99th percentile as well as increment RpcSlowCalls counter.
-    </description>
-</property>
-
-<property>
-  <name>ipc.maximum.data.length</name>
-  <value>67108864</value>
-  <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the server. Messages larger than this value are rejected
-    immediately to avoid possible OOMs. This setting should rarely need to be
-    changed.
-  </description>
-</property>
-
-<property>
-  <name>ipc.maximum.response.length</name>
-  <value>134217728</value>
-  <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the client. Messages larger than this value are rejected
-    immediately to avoid possible OOMs. This setting should rarely need to be
-    changed.  Set to 0 to disable.
-  </description>
-</property>
-
-<!-- Proxy Configuration -->
-
-<property>
-  <name>hadoop.security.impersonation.provider.class</name>
-  <value></value>
-  <description>A class which implements ImpersonationProvider interface, used to
-       authorize whether one user can impersonate a specific user.
-       If not specified, the DefaultImpersonationProvider will be used.
-       If a class is specified, then that class will be used to determine
-       the impersonation capability.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.default</name>
-  <value>org.apache.hadoop.net.StandardSocketFactory</value>
-  <description> Default SocketFactory to use. This parameter is expected to be
-    formatted as "package.FactoryClassName".
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
-  <value></value>
-  <description> SocketFactory to use to connect to a DFS. If null or empty, use
-    hadoop.rpc.socket.factory.class.default. This socket factory is also used by
-    DFSClient to create sockets to DataNodes.
-  </description>
-</property>
-
-
-
-<property>
-  <name>hadoop.socks.server</name>
-  <value></value>
-  <description> Address (host:port) of the SOCKS server to be used by the
-    SocksSocketFactory.
-  </description>
-</property>
-
-<!-- Topology Configuration -->
-<property>
-  <name>net.topology.node.switch.mapping.impl</name>
-  <value>org.apache.hadoop.net.ScriptBasedMapping</value>
-  <description> The default implementation of the DNSToSwitchMapping. It
-    invokes a script specified in net.topology.script.file.name to resolve
-    node names. If the value for net.topology.script.file.name is not set, the
-    default value of DEFAULT_RACK is returned for all node names.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.impl</name>
-  <value>org.apache.hadoop.net.NetworkTopology</value>
-  <description> The default implementation of NetworkTopology, which is the classic three-layer one.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.file.name</name>
-  <value></value>
-  <description> The script name that should be invoked to resolve DNS names to
-    NetworkTopology names. Example: the script would take host.foo.bar as an
-    argument, and return /rack1 as the output.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.number.args</name>
-  <value>100</value>
-  <description> The max number of args that the script configured with
-    net.topology.script.file.name should be run with. Each arg is an
-    IP address.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.table.file.name</name>
-  <value></value>
-  <description> The file name for a topology file, which is used when the
-    net.topology.node.switch.mapping.impl property is set to
-    org.apache.hadoop.net.TableMapping. The file format is a two column text
-    file, with columns separated by whitespace. The first column is a DNS or
-    IP address and the second column specifies the rack where the address maps.
-    If no entry corresponding to a host in the cluster is found, then
-    /default-rack is assumed.
-  </description>
-</property>
-
-<!-- Local file system -->
-<property>
-  <name>file.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>file.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  file.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>file.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>file.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>file.replication</name>
-  <value>1</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3 File System -->
-
-<property>
-  <name>s3.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3native File System -->
-
-<property>
-  <name>s3native.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3native.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3native.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3native.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3native.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3native.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- FTP file system -->
-<property>
-  <name>ftp.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>ftp.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  ftp.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>ftp.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>ftp.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>ftp.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- Tfile -->
-
-<property>
-  <name>tfile.io.chunk.size</name>
-  <value>1048576</value>
-  <description>
-    Value chunk size in bytes. Defaults to
-    1MB. Values of length less than the chunk size are
-    guaranteed to have a known value length at read time (See also
-    TFile.Reader.Scanner.Entry.isValueLengthKnown()).
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.output.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataOutputStream in bytes.
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.input.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataInputStream in bytes.
-  </description>
-</property>
-
-<!-- HTTP web-consoles Authentication -->
-
-<property>
-  <name>hadoop.http.authentication.type</name>
-  <value>simple</value>
-  <description>
-    Defines authentication used for the Hadoop HTTP web-consoles.
-    Supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.token.validity</name>
-  <value>36000</value>
-  <description>
-    Indicates how long (in seconds) an authentication token is valid before it has
-    to be renewed.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.signature.secret.file</name>
-  <value>${user.home}/hadoop-http-auth-signature-secret</value>
-  <description>
-    The signature secret for signing the authentication tokens.
-    The same secret should be used for JT/NN/DN/TT configurations.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.cookie.domain</name>
-  <value></value>
-  <description>
-    The domain to use for the HTTP cookie that stores the authentication token.
-    In order for authentication to work correctly across all Hadoop nodes' web-consoles
-    the domain must be correctly set.
-    IMPORTANT: when using IP addresses, browsers ignore cookies with domain settings.
-    For this setting to work properly all nodes in the cluster must be configured
-    to generate URLs with hostname.domain names on it.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.simple.anonymous.allowed</name>
-  <value>true</value>
-  <description>
-    Indicates if anonymous requests are allowed when using 'simple' authentication.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.principal</name>
-  <value>HTTP/_HOST@LOCALHOST</value>
-  <description>
-    Indicates the Kerberos principal to be used for HTTP endpoint.
-    The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.keytab</name>
-  <value>${user.home}/hadoop.keytab</value>
-  <description>
-    Location of the keytab file with the credentials for the principal.
-    Referring to the same keytab file Oozie uses for its Kerberos credentials for Hadoop.
-  </description>
-</property>
-
-<!-- HTTP CORS support -->
-<property>
-  <description>Enable/disable the cross-origin (CORS) filter.</description>
-  <name>hadoop.http.cross-origin.enabled</name>
-  <value>false</value>
-</property>
-
-<property>
-  <description>Comma separated list of origins that are allowed for web
-    services needing cross-origin (CORS) support. Wildcards (*) and patterns
-    allowed</description>
-  <name>hadoop.http.cross-origin.allowed-origins</name>
-  <value>*</value>
-</property>
-
-<property>
-  <description>Comma separated list of methods that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-methods</name>
-  <value>GET,POST,HEAD</value>
-</property>
-
-<property>
-  <description>Comma separated list of headers that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-headers</name>
-  <value>X-Requested-With,Content-Type,Accept,Origin</value>
-</property>
-
-<property>
-  <description>The number of seconds a pre-flighted request can be cached
-    for web services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.max-age</name>
-  <value>1800</value>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.methods</name>
-  <value></value>
-  <description>
-    List of fencing methods to use for service fencing. May contain
-    builtin methods (eg shell and sshfence) or user-defined method.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.connect-timeout</name>
-  <value>30000</value>
-  <description>
-    SSH connection timeout, in milliseconds, to use with the builtin
-    sshfence fencer.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.private-key-files</name>
-  <value></value>
-  <description>
-    The SSH private key files to use with the builtin sshfence fencer.
-  </description>
-</property>
-
-
-<!-- Static Web User Filter properties. -->
-<property>
-  <description>
-    The user name to filter as, on static web filters
-    while rendering content. An example use is the HDFS
-    web UI (user to be used for browsing files).
-  </description>
-  <name>hadoop.http.staticuser.user</name>
-  <value>dr.who</value>
-</property>
-
-<property>
-  <name>ha.zookeeper.quorum</name>
-  <description>
-    A list of ZooKeeper server addresses, separated by commas, that are
-    to be used by the ZKFailoverController in automatic failover.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.session-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    The session timeout to use when the ZKFC connects to ZooKeeper.
-    Setting this value to a lower value implies that server crashes
-    will be detected more quickly, but risks triggering failover too
-    aggressively in the case of a transient error or network blip.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.parent-znode</name>
-  <value>/hadoop-ha</value>
-  <description>
-    The ZooKeeper znode under which the ZK failover controller stores
-    its information. Note that the nameservice ID is automatically
-    appended to this znode, so it is not normally necessary to
-    configure this, even in a federated environment.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.acl</name>
-  <value>world:anyone:rwcda</value>
-  <description>
-    A comma-separated list of ZooKeeper ACLs to apply to the znodes
-    used by automatic failover. These ACLs are specified in the same
-    format as used by the ZooKeeper CLI.
-
-    If the ACL itself contains secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.auth</name>
-  <value></value>
-  <description>
-    A comma-separated list of ZooKeeper authentications to add when
-    connecting to ZooKeeper. These are specified in the same format
-    as used by the &quot;addauth&quot; command in the ZK CLI. It is
-    important that the authentications specified here are sufficient
-    to access znodes with the ACL specified in ha.zookeeper.acl.
-
-    If the auths contain secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<!-- SSLFactory configuration -->
-
-<property>
-  <name>hadoop.ssl.keystores.factory.class</name>
-  <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
-  <description>
-    The keystores factory to use for retrieving certificates.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.require.client.cert</name>
-  <value>false</value>
-  <description>Whether client certificates are required</description>
-</property>
-
-<property>
-  <name>hadoop.ssl.hostname.verifier</name>
-  <value>DEFAULT</value>
-  <description>
-    The hostname verifier to provide for HttpsURLConnections.
-    Valid values are: DEFAULT, STRICT, STRICT_I6, DEFAULT_AND_LOCALHOST and
-    ALLOW_ALL
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.server.conf</name>
-  <value>ssl-server.xml</value>
-  <description>
-    Resource file from which ssl server keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.client.conf</name>
-  <value>ssl-client.xml</value>
-  <description>
-    Resource file from which ssl client keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled</name>
-  <value>false</value>
-  <description>
-    Deprecated. Use dfs.http.policy and yarn.http.policy instead.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled.protocols</name>
-  <value>TLSv1</value>
-  <description>
-    Protocols supported by the ssl.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.jetty.logs.serve.aliases</name>
-  <value>true</value>
-  <description>
-    Enable/Disable aliases serving from jetty
-  </description>
-</property>
-
-<property>
-  <name>fs.permissions.umask-mode</name>
-  <value>022</value>
-  <description>
-    The umask used when creating files and directories.
-    Can be in octal or in symbolic. Examples are:
-    "022" (octal for u=rwx,g=r-x,o=r-x in symbolic),
-    or "u=rwx,g=rwx,o=" (symbolic for 007 in octal).
-  </description>
-</property>
-
-<!-- ha properties -->
-
-<property>
-  <name>ha.health-monitor.connect-retry-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to retry connecting to the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.check-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to check the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.sleep-after-disconnect.ms</name>
-  <value>1000</value>
-  <description>
-    How long to sleep after an unexpected RPC error.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.rpc-timeout.ms</name>
-  <value>45000</value>
-  <description>
-    Timeout for the actual monitorHealth() calls.
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.new-active.rpc-timeout.ms</name>
-  <value>60000</value>
-  <description>
-    Timeout that the FC waits for the new active to become active
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.rpc-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    Timeout that the FC waits for the old active to go to standby
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.connection.retries</name>
-  <value>1</value>
-  <description>
-    FC connection retries for graceful fencing
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
-  <value>20000</value>
-  <description>
-    Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.fallback-to-simple-auth-allowed</name>
-  <value>false</value>
-  <description>
-    When a client is configured to attempt a secure connection, but attempts to
-    connect to an insecure server, that server may instruct the client to
-    switch to SASL SIMPLE (unsecure) authentication. This setting controls
-    whether or not the client will accept this instruction from the server.
-    When false (the default), the client will not allow the fallback to SIMPLE
-    authentication, and will abort the connection.
-  </description>
-</property>
-
-<property>
-  <name>fs.client.resolve.remote.symlinks</name>
-  <value>true</value>
-  <description>
-      Whether to resolve symlinks when accessing a remote Hadoop filesystem.
-      Setting this to false causes an exception to be thrown upon encountering
-      a symlink. This setting does not apply to local filesystems, which
-      automatically resolve local symlinks.
-  </description>
-</property>
-
-<property>
-  <name>nfs.exports.allowed.hosts</name>
-  <value>* rw</value>
-  <description>
-    By default, the export can be mounted by any client. The value string
-    contains machine name and access privilege, separated by whitespace
-    characters. The machine name format can be a single host, a Java regular
-    expression, or an IPv4 address. The access privilege uses rw or ro to
-    specify read/write or read-only access of the machines to exports. If the
-    access privilege is not provided, the default is read-only. Entries are separated by ";".
-    For example: "192.168.0.0/22 rw ; host.*\.example\.com ; host1.test.org ro;".
-    Only the NFS gateway needs to restart after this property is updated.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.user.group.static.mapping.overrides</name>
-  <value>dr.who=;</value>
-  <description>
-    Static mapping of user to groups. This will override the groups if
-    available in the system for the specified user. In other words, groups
-    look-up will not happen for these users, instead groups mapped in this
-    configuration will be used.
-    Mapping should be in this format.
-    user1=group1,group2;user2=;user3=group2;
-    Default, "dr.who=;" will consider "dr.who" as user without groups.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.quantile.enable</name>
-  <value>false</value>
-  <description>
-    Setting this property to true and rpc.metrics.percentiles.intervals
-    to a comma-separated list of the granularity in seconds, the
-    50/75/90/95/99th percentile latency for rpc queue/processing time in
-    milliseconds are added to rpc metrics.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.percentiles.intervals</name>
-  <value></value>
-  <description>
-    A comma-separated list of the granularity in seconds for the metrics which
-    describe the 50/75/90/95/99th percentile latency for rpc queue/processing
-    time. The metrics are outputted if rpc.metrics.quantile.enable is set to
-    true.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE</name>
-  <value></value>
-  <description>
-    The prefix for a given crypto codec, contains a comma-separated
-    list of implementation classes for a given crypto codec (eg EXAMPLECIPHERSUITE).
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
-  <value>org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec, org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
-  <description>
-    Comma-separated list of crypto codec implementations for AES/CTR/NoPadding.
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.cipher.suite</name>
-  <value>AES/CTR/NoPadding</value>
-  <description>
-    Cipher suite for crypto codec.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.jce.provider</name>
-  <value></value>
-  <description>
-    The JCE provider name used in CryptoCodec.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.buffer.size</name>
-  <value>8192</value>
-  <description>
-    The buffer size used by CryptoInputStream and CryptoOutputStream.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.java.secure.random.algorithm</name>
-  <value>SHA1PRNG</value>
-  <description>
-    The java secure random algorithm.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.secure.random.impl</name>
-  <value></value>
-  <description>
-    Implementation of secure random.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.random.device.file.path</name>
-  <value>/dev/urandom</value>
-  <description>
-    OS security random device file path.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.key.provider.path</name>
-  <description>
-    The KeyProvider to use when managing zone keys, and interacting with
-    encryption keys when reading and writing to an encryption zone.
-  </description>
-</property>
-
-<property>
-  <name>fs.har.impl.disable.cache</name>
-  <value>true</value>
-  <description>Don't cache 'har' filesystem instances.</description>
-</property>
-
-<!--- KMSClientProvider configurations -->
-<property>
-  <name>hadoop.security.kms.client.authentication.retry-count</name>
-  <value>1</value>
-  <description>
-    Number of times to retry connecting to KMS on authentication failure
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.size</name>
-  <value>500</value>
-  <description>
-    Size of the EncryptedKeyVersion cache Queue for each key
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.low-watermark</name>
-  <value>0.3f</value>
-  <description>
-    If size of the EncryptedKeyVersion cache Queue falls below the
-    low watermark, this cache queue will be scheduled for a refill
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.num.refill.threads</name>
-  <value>2</value>
-  <description>
-    Number of threads to use for refilling depleted EncryptedKeyVersion
-    cache Queues
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.expiry</name>
-  <value>43200000</value>
-  <description>
-    Cache expiry time for a Key, after which the cache Queue for this
-    key will be dropped. Default = 12hrs
-  </description>
-</property>
-
- <property>
-  <name>ipc.server.max.connections</name>
-  <value>0</value>
-  <description>The maximum number of concurrent connections a server is allowed
-    to accept. If this limit is exceeded, incoming connections will first fill
-    the listen queue and then may go to an OS-specific listen overflow queue.
-    The client may fail or timeout, but the server can avoid running out of file
-    descriptors using this feature. 0 means no limit.
-  </description>
-</property>
-
-
-  <!-- YARN registry -->
-
-  <property>
-    <description>
-      Is the registry enabled in the YARN Resource Manager?
-
-      If true, the YARN RM will, as needed,
-      create the user and system paths, and purge
-      service records when containers, application attempts
-      and applications complete.
-
-      If false, the paths must be created by other means,
-      and no automatic cleanup of service records will take place.
-    </description>
-    <name>hadoop.registry.rm.enabled</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      The root zookeeper node for the registry
-    </description>
-    <name>hadoop.registry.zk.root</name>
-    <value>/registry</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper session timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.session.timeout.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.connection.timeout.ms</name>
-    <value>15000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection retry count before failing
-    </description>
-    <name>hadoop.registry.zk.retry.times</name>
-    <value>5</value>
-  </property>
-
-  <property>
-    <description>Zookeeper connection retry interval in milliseconds
-    </description>
-    <name>hadoop.registry.zk.retry.interval.ms</name>
-    <value>1000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper retry limit in milliseconds, during
-      exponential backoff.
-
-      This places a limit even
-      if the retry times and interval limit, combined
-      with the backoff policy, result in a long retry
-      period
-    </description>
-    <name>hadoop.registry.zk.retry.ceiling.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      List of hostname:port pairs defining the
-      zookeeper quorum binding for the registry
-    </description>
-    <name>hadoop.registry.zk.quorum</name>
-    <value>localhost:2181</value>
-  </property>
-
-  <property>
-    <description>
-      Key to set if the registry is secure. Turning it on
-      changes the permissions policy from "open access"
-      to restrictions on kerberos with the option of
-      a user adding one or more auth key pairs down their
-      own tree.
-    </description>
-    <name>hadoop.registry.secure</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      A comma separated list of Zookeeper ACL identifiers with
-      system access to the registry in a secure cluster.
-
-      These are given full access to all entries.
-
-      If there is an "@" at the end of a SASL entry it
-      instructs the registry client to append the default kerberos domain.
-    </description>
-    <name>hadoop.registry.system.acls</name>
-    <value>sasl:yarn@, sasl:mapred@, sasl:hdfs@</value>
-  </property>
-
-  <property>
-    <description>
-      The kerberos realm: used to set the realm of
-      system principals which do not declare their realm,
-      and any other accounts that need the value.
-
-      If empty, the default realm of the running process
-      is used.
-
-      If neither are known and the realm is needed, then the registry
-      service/client will fail.
-    </description>
-    <name>hadoop.registry.kerberos.realm</name>
-    <value></value>
-  </property>
-
-  <property>
-    <description>
-      Key to define the JAAS context. Used in secure
-      mode
-    </description>
-    <name>hadoop.registry.jaas.context</name>
-    <value>Client</value>
-  </property>
-
-  <property>
-    <description>
-      Enable hdfs shell commands to display warnings if (fs.defaultFS) property
-      is not set.
-    </description>
-    <name>hadoop.shell.missing.defaultFs.warning</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <name>hadoop.shell.safely.delete.limit.num.files</name>
-    <value>100</value>
-    <description>Used by -safely option of hadoop fs shell -rm command to avoid
-      accidental deletion of large directories. When enabled, the -rm command
-      requires confirmation if the number of files to be deleted is greater than
-      this limit.  The default limit is 100 files. The warning is disabled if
-      the limit is 0 or the -safely is not specified in -rm command.
-    </description>
-  </property>
-
-  <property>
-    <name>fs.client.htrace.sampler.classes</name>
-    <value></value>
-    <description>The class names of the HTrace Samplers to use for Hadoop
-      filesystem clients.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.htrace.span.receiver.classes</name>
-    <value></value>
-    <description>The class names of the Span Receivers to use for Hadoop.
-    </description>
-  </property>
-
-  <property>
-    <name>hadoop.caller.context.enabled</name>
-    <value>false</value>
-    <description>When the feature is enabled, additional fields are written into
-      name-node audit log records for auditing coarse granularity operations.
-    </description>
-  </property>
-  <property>
-    <name>hadoop.caller.context.max.size</name>
-    <value>128</value>
-    <description>The maximum bytes a caller context string can have. If the
-      passed caller context is longer than this maximum bytes, client will
-      truncate it before sending to server. Note that the server may have a
-      different maximum size, and will truncate the caller context to the
-      maximum size it allows.
-    </description>
-  </property>
-  <property>
-    <name>hadoop.caller.context.signature.max.size</name>
-    <value>40</value>
-    <description>
-      The caller's signature (optional) is for offline validation. If the
-      signature exceeds the maximum allowed bytes in server, the caller context
-      will be abandoned, in which case the caller context will not be recorded
-      in audit logs.
-    </description>
-  </property>
-
-</configuration>