You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/09/19 07:16:12 UTC

[flink] 04/06: [FLINK-10366] [s3] Adjust Presto-based S3 adapter to use common S3 base

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c4983e7b094358538c53c088dac830d86231973
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Sep 13 20:49:15 2018 +0200

    [FLINK-10366] [s3] Adjust Presto-based S3 adapter to use common S3 base
---
 flink-filesystems/flink-s3-fs-presto/pom.xml       |  118 +-
 .../flink/fs/s3presto/S3FileSystemFactory.java     |    8 +-
 .../java/org/apache/hadoop/conf/Configuration.java | 2951 --------------------
 .../org/apache/hadoop/util/NativeCodeLoader.java   |   94 -
 .../src/main/resources/META-INF/NOTICE             |   82 -
 .../src/main/resources/core-default-shaded.xml     | 1978 -------------
 6 files changed, 43 insertions(+), 5188 deletions(-)

diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml
index 8720217..cf35a04 100644
--- a/flink-filesystems/flink-s3-fs-presto/pom.xml
+++ b/flink-filesystems/flink-s3-fs-presto/pom.xml
@@ -33,14 +33,12 @@ under the License.
 	<packaging>jar</packaging>
 
 	<properties>
-		<!-- Do not change this without updating the copied Configuration class from the included Hadoop! -->
 		<presto.version>0.185</presto.version>
 	</properties>
 
 	<dependencies>
 
-		<!-- Flink core -->
-
+		<!-- Flink's file system abstraction (compiled against, not bundled) -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-core</artifactId>
@@ -48,30 +46,30 @@ under the License.
 			<scope>provided</scope>
 		</dependency>
 
-		<!-- File system builds on the Hadoop file system support -->
-
+		<!-- S3 base (bundled) -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-hadoop-fs</artifactId>
+			<artifactId>flink-s3-fs-base</artifactId>
 			<version>${project.version}</version>
-
-			<!-- this exclusion is only needed to run tests in the IDE, pre shading,
-				because the optional Hadoop dependency is also pulled in for tests -->
-			<exclusions>
-				<exclusion>
-					<groupId>org.apache.flink</groupId>
-					<artifactId>flink-shaded-hadoop2</artifactId>
-				</exclusion>
-			</exclusions>
 		</dependency>
 
-		<!-- Presto's S3 file system -->
-		
+		<!-- Presto's S3 file system (bundled) -->
 		<dependency>
 			<groupId>com.facebook.presto</groupId>
 			<artifactId>presto-hive</artifactId>
 			<version>${presto.version}</version>
 			<exclusions>
+				<!-- use our AWS dependencies instead -->
+				<exclusion>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-core</artifactId>
+				</exclusion>
+				<exclusion>
+					<groupId>com.amazonaws</groupId>
+					<artifactId>aws-java-sdk-s3</artifactId>
+				</exclusion>
+
+				<!-- lots of unneeded stuff for the S3 file system -->
 				<exclusion>
 					<groupId>com.facebook.hive</groupId>
 					<artifactId>hive-dwrf</artifactId>
@@ -194,7 +192,6 @@ under the License.
 		</dependency>
 	</dependencies>
 
-
 	<build>
 		<plugins>
 			<plugin>
@@ -218,10 +215,17 @@ under the License.
 								</excludes>
 							</artifactSet>
 							<relocations>
+								<!-- relocate the references to Hadoop to match the pre-shaded hadoop artifact -->
+								<relocation>
+									<pattern>org.apache.hadoop</pattern>
+									<shadedPattern>org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop</shadedPattern>
+								</relocation>
+								<!-- relocate the AWS dependencies -->
 								<relocation>
-									<pattern>com.amazonaws</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.amazonaws</shadedPattern>
+									<pattern>com.amazon</pattern>
+									<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
 								</relocation>
+								<!-- relocate S3 presto and dependencies -->
 								<relocation>
 									<pattern>com.facebook</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.facebook</shadedPattern>
@@ -231,40 +235,9 @@ under the License.
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.fasterxml</shadedPattern>
 								</relocation>
 								<relocation>
-									<pattern>com.google</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
-									<excludes>
-										<!-- provided -->
-										<exclude>com.google.code.findbugs.**</exclude>
-									</excludes>
-								</relocation>
-								<relocation>
 									<pattern>io.airlift</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.io.airlift</shadedPattern>
 								</relocation>
-
-								<!-- relocate everything from the flink-hadoop-fs project -->
-								<relocation>
-									<pattern>org.apache.flink.runtime.fs.hdfs</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.fs.hdfs</shadedPattern>
-								</relocation>
-								<relocation>
-									<pattern>org.apache.flink.runtime.util</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.apache.flink.runtime.util</shadedPattern>
-									<includes>
-										<include>org.apache.flink.runtime.util.**Hadoop*</include>
-									</includes>
-								</relocation>
-
-								<relocation>
-									<pattern>org.apache</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.apache</shadedPattern>
-									<excludes>
-										<!-- keep all other classes of flink as they are (exceptions above) -->
-										<exclude>org.apache.flink.**</exclude>
-										<exclude>org.apache.log4j.**</exclude> <!-- provided -->
-									</excludes>
-								</relocation>
 								<relocation>
 									<pattern>org.HdrHistogram</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.HdrHistogram</shadedPattern>
@@ -274,47 +247,36 @@ under the License.
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.joda</shadedPattern>
 								</relocation>
 								<relocation>
-									<pattern>org.openjdk</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.openjdk</shadedPattern>
-								</relocation>
-								<relocation>
 									<pattern>org.weakref</pattern>
 									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.weakref</shadedPattern>
 								</relocation>
 								<relocation>
-									<pattern>software.amazon</pattern>
-									<shadedPattern>org.apache.flink.fs.s3presto.shaded.software.amazon</shadedPattern>
+									<pattern>org.openjdk</pattern>
+									<shadedPattern>org.apache.flink.fs.s3presto.shaded.org.openjdk</shadedPattern>
+								</relocation>
+								<relocation>
+									<pattern>com.google</pattern>
+									<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
 								</relocation>
 							</relocations>
 							<filters>
 								<filter>
 									<artifact>*</artifact>
 									<excludes>
-										<exclude>log4j.properties</exclude>
-										<exclude>core-default.xml</exclude>
-										<exclude>core-site.xml</exclude>
-										<exclude>hdfs-default.xml</exclude>
-										<exclude>mapred-default.xml</exclude>
-										<exclude>mime.types</exclude>
-										<exclude>mozilla/**</exclude>
-										<exclude>nativelib/**</exclude>
-										<exclude>META-INF/native/**</exclude>
+										<exclude>META-INF/maven/org.weakref/**</exclude>
+										<exclude>META-INF/maven/org.hdrhistogram/**</exclude>
+										<exclude>META-INF/maven/joda-time/**</exclude>
+										<exclude>META-INF/maven/io.airlift/**</exclude>
 										<exclude>META-INF/maven/com*/**</exclude>
-										<exclude>META-INF/maven/io*/**</exclude>
-										<exclude>META-INF/maven/org.w*/**</exclude>
-										<exclude>META-INF/maven/org.h*/**</exclude>
-										<exclude>META-INF/maven/software*/**</exclude>
-										<exclude>META-INF/maven/joda*/**</exclude>
-										<exclude>META-INF/maven/org.mortbay.jetty/**</exclude>
-										<exclude>META-INF/maven/org.apache.h*/**</exclude>
-										<exclude>META-INF/maven/org.apache.flink/flink-hadoop-fs/**</exclude>
 										<exclude>META-INF/maven/org.apache.flink/force-shading/**</exclude>
-										<!-- we use our own "shaded" core-default.xml: core-default-shaded.xml -->
-										<exclude>core-default.xml</exclude>
-										<!-- we only add a core-site.xml with unshaded classnames for the unit tests -->
-										<exclude>core-site.xml</exclude>
 									</excludes>
 								</filter>
+								<filter>
+									<artifact>com.facebook.presto.hadoop:hadoop-apache2</artifact>
+									<includes>
+										<include>com/facebook/presto/hadoop/**</include>
+									</includes>
+								</filter>
 							</filters>
 						</configuration>
 					</execution>
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
index a04f9c9..d4ad561 100644
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
+++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java
@@ -28,18 +28,16 @@ import com.facebook.presto.hive.PrestoS3FileSystem;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
  * Simple factory for the S3 file system.
  */
 public class S3FileSystemFactory extends AbstractFileSystemFactory {
-	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE =
-		new HashSet<>(Collections.singletonList("com.amazonaws."));
 
-	private static final Set<String> CONFIG_KEYS_TO_SHADE =
-		Collections.unmodifiableSet(new HashSet<>(Collections.singleton("presto.s3.credentials-provider")));
+	private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.singleton("com.amazonaws.");
+
+	private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.singleton("presto.s3.credentials-provider");
 
 	private static final String FLINK_SHADING_PREFIX = "org.apache.flink.fs.s3presto.shaded.";
 
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/conf/Configuration.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/conf/Configuration.java
deleted file mode 100644
index ebf7948..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ /dev/null
@@ -1,2951 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.conf;
-
-import com.facebook.presto.hadoop.$internal.com.google.common.annotations.VisibleForTesting;
-
-import java.io.BufferedInputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.lang.ref.WeakReference;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.WeakHashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.regex.PatternSyntaxException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.transform.Transformer;
-import javax.xml.transform.TransformerException;
-import javax.xml.transform.TransformerFactory;
-import javax.xml.transform.dom.DOMSource;
-import javax.xml.transform.stream.StreamResult;
-
-import com.facebook.presto.hadoop.$internal.com.google.common.base.Charsets;
-import com.facebook.presto.hadoop.$internal.org.apache.commons.collections.map.UnmodifiableMap;
-import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.Log;
-import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.alias.CredentialProvider;
-import org.apache.hadoop.security.alias.CredentialProvider.CredentialEntry;
-import org.apache.hadoop.security.alias.CredentialProviderFactory;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringInterner;
-import org.apache.hadoop.util.StringUtils;
-import com.facebook.presto.hadoop.$internal.org.codehaus.jackson.JsonFactory;
-import com.facebook.presto.hadoop.$internal.org.codehaus.jackson.JsonGenerator;
-import org.w3c.dom.DOMException;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
-import org.xml.sax.SAXException;
-
-import com.facebook.presto.hadoop.$internal.com.google.common.base.Preconditions;
-
-/**
- * Provides access to configuration parameters.
- *
- * <h4 id="Resources">Resources</h4>
- *
- * <p>Configurations are specified by resources. A resource contains a set of
- * name/value pairs as XML data. Each resource is named by either a
- * <code>String</code> or by a {@link Path}. If named by a <code>String</code>,
- * then the classpath is examined for a file with that name.  If named by a
- * <code>Path</code>, then the local filesystem is examined directly, without
- * referring to the classpath.
- *
- * <p>Unless explicitly turned off, Hadoop by default specifies two
- * resources, loaded in-order from the classpath: <ol>
- * <li><tt>
- * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default-shaded.xml">
- * core-default-shaded.xml</a></tt>: Read-only defaults for hadoop.</li>
- * <li><tt>core-site.xml</tt>: Site-specific configuration for a given hadoop
- * installation.</li>
- * </ol>
- * Applications may add additional resources, which are loaded
- * subsequent to these resources in the order they are added.
- *
- * <h4 id="FinalParams">Final Parameters</h4>
- *
- * <p>Configuration parameters may be declared <i>final</i>.
- * Once a resource declares a value final, no subsequently-loaded
- * resource can alter that value.
- * For example, one might define a final parameter with:
- * <tt><pre>
- *  &lt;property&gt;
- *    &lt;name&gt;dfs.hosts.include&lt;/name&gt;
- *    &lt;value&gt;/etc/hadoop/conf/hosts.include&lt;/value&gt;
- *    <b>&lt;final&gt;true&lt;/final&gt;</b>
- *  &lt;/property&gt;</pre></tt>
- *
- * Administrators typically define parameters as final in
- * <tt>core-site.xml</tt> for values that user applications may not alter.
- *
- * <h4 id="VariableExpansion">Variable Expansion</h4>
- *
- * <p>Value strings are first processed for <i>variable expansion</i>. The
- * available properties are:<ol>
- * <li>Other properties defined in this Configuration; and, if a name is
- * undefined here,</li>
- * <li>Properties in {@link System#getProperties()}.</li>
- * </ol>
- *
- * <p>For example, if a configuration resource contains the following property
- * definitions:
- * <tt><pre>
- *  &lt;property&gt;
- *    &lt;name&gt;basedir&lt;/name&gt;
- *    &lt;value&gt;/user/${<i>user.name</i>}&lt;/value&gt;
- *  &lt;/property&gt;
- *
- *  &lt;property&gt;
- *    &lt;name&gt;tempdir&lt;/name&gt;
- *    &lt;value&gt;${<i>basedir</i>}/tmp&lt;/value&gt;
- *  &lt;/property&gt;</pre></tt>
- *
- * When <tt>conf.get("tempdir")</tt> is called, then <tt>${<i>basedir</i>}</tt>
- * will be resolved to another property in this Configuration, while
- * <tt>${<i>user.name</i>}</tt> would then ordinarily be resolved to the value
- * of the System property with that name.
- * By default, warnings will be given to any deprecated configuration
- * parameters and these are suppressible by configuring
- * <tt>log4j.logger.org.apache.hadoop.conf.Configuration.deprecation</tt> in
- * log4j.properties file.
- */
-@InterfaceAudience.Public
-@InterfaceStability.Stable
-public class Configuration implements Iterable<Map.Entry<String,String>>,
-	Writable {
-	private static final Log LOG =
-		LogFactory.getLog(Configuration.class);
-
-	private static final Log LOG_DEPRECATION =
-		LogFactory.getLog("org.apache.hadoop.conf.Configuration.deprecation");
-
-	private boolean quietmode = true;
-
-	private static final String DEFAULT_STRING_CHECK =
-		"testingforemptydefaultvalue";
-
-	private boolean allowNullValueProperties = false;
-
-	private static class Resource {
-		private final Object resource;
-		private final String name;
-
-		public Resource(Object resource) {
-			this(resource, resource.toString());
-		}
-
-		public Resource(Object resource, String name) {
-			this.resource = resource;
-			this.name = name;
-		}
-
-		public String getName(){
-			return name;
-		}
-
-		public Object getResource() {
-			return resource;
-		}
-
-		@Override
-		public String toString() {
-			return name;
-		}
-	}
-
-	/**
-	 * List of configuration resources.
-	 */
-	private ArrayList<Resource> resources = new ArrayList<Resource>();
-
-	/**
-	 * The value reported as the setting resource when a key is set
-	 * by code rather than a file resource by dumpConfiguration.
-	 */
-	static final String UNKNOWN_RESOURCE = "Unknown";
-
-
-	/**
-	 * List of configuration parameters marked <b>final</b>.
-	 */
-	private Set<String> finalParameters = Collections.newSetFromMap(
-		new ConcurrentHashMap<String, Boolean>());
-
-	private boolean loadDefaults = true;
-
-	/**
-	 * Configuration objects
-	 */
-	private static final WeakHashMap<Configuration,Object> REGISTRY =
-		new WeakHashMap<Configuration,Object>();
-
-	/**
-	 * List of default Resources. Resources are loaded in the order of the list
-	 * entries
-	 */
-	private static final CopyOnWriteArrayList<String> defaultResources =
-		new CopyOnWriteArrayList<String>();
-
-	private static final Map<ClassLoader, Map<String, WeakReference<Class<?>>>>
-		CACHE_CLASSES = new WeakHashMap<ClassLoader, Map<String, WeakReference<Class<?>>>>();
-
-	/**
-	 * Sentinel value to store negative cache results in {@link #CACHE_CLASSES}.
-	 */
-	private static final Class<?> NEGATIVE_CACHE_SENTINEL =
-		NegativeCacheSentinel.class;
-
-	/**
-	 * Stores the mapping of key to the resource which modifies or loads
-	 * the key most recently
-	 */
-	private Map<String, String[]> updatingResource;
-
-	/**
-	 * Class to keep the information about the keys which replace the deprecated
-	 * ones.
-	 *
-	 * This class stores the new keys which replace the deprecated keys and also
-	 * gives a provision to have a custom message for each of the deprecated key
-	 * that is being replaced. It also provides method to get the appropriate
-	 * warning message which can be logged whenever the deprecated key is used.
-	 */
-	private static class DeprecatedKeyInfo {
-		private final String[] newKeys;
-		private final String customMessage;
-		private final AtomicBoolean accessed = new AtomicBoolean(false);
-
-		DeprecatedKeyInfo(String[] newKeys, String customMessage) {
-			this.newKeys = newKeys;
-			this.customMessage = customMessage;
-		}
-
-		/**
-		 * Method to provide the warning message. It gives the custom message if
-		 * non-null, and default message otherwise.
-		 * @param key the associated deprecated key.
-		 * @return message that is to be logged when a deprecated key is used.
-		 */
-		private final String getWarningMessage(String key) {
-			String warningMessage;
-			if(customMessage == null) {
-				StringBuilder message = new StringBuilder(key);
-				String deprecatedKeySuffix = " is deprecated. Instead, use ";
-				message.append(deprecatedKeySuffix);
-				for (int i = 0; i < newKeys.length; i++) {
-					message.append(newKeys[i]);
-					if(i != newKeys.length-1) {
-						message.append(", ");
-					}
-				}
-				warningMessage = message.toString();
-			}
-			else {
-				warningMessage = customMessage;
-			}
-			return warningMessage;
-		}
-
-		boolean getAndSetAccessed() {
-			return accessed.getAndSet(true);
-		}
-
-		public void clearAccessed() {
-			accessed.set(false);
-		}
-	}
-
-	/**
-	 * A pending addition to the global set of deprecated keys.
-	 */
-	public static class DeprecationDelta {
-		private final String key;
-		private final String[] newKeys;
-		private final String customMessage;
-
-		DeprecationDelta(String key, String[] newKeys, String customMessage) {
-			Preconditions.checkNotNull(key);
-			Preconditions.checkNotNull(newKeys);
-			Preconditions.checkArgument(newKeys.length > 0);
-			this.key = key;
-			this.newKeys = newKeys;
-			this.customMessage = customMessage;
-		}
-
-		public DeprecationDelta(String key, String newKey, String customMessage) {
-			this(key, new String[] { newKey }, customMessage);
-		}
-
-		public DeprecationDelta(String key, String newKey) {
-			this(key, new String[] { newKey }, null);
-		}
-
-		public String getKey() {
-			return key;
-		}
-
-		public String[] getNewKeys() {
-			return newKeys;
-		}
-
-		public String getCustomMessage() {
-			return customMessage;
-		}
-	}
-
-	/**
-	 * The set of all keys which are deprecated.
-	 *
-	 * DeprecationContext objects are immutable.
-	 */
-	private static class DeprecationContext {
-		/**
-		 * Stores the deprecated keys, the new keys which replace the deprecated keys
-		 * and custom message(if any provided).
-		 */
-		private final Map<String, DeprecatedKeyInfo> deprecatedKeyMap;
-
-		/**
-		 * Stores a mapping from superseding keys to the keys which they deprecate.
-		 */
-		private final Map<String, String> reverseDeprecatedKeyMap;
-
-		/**
-		 * Create a new DeprecationContext by copying a previous DeprecationContext
-		 * and adding some deltas.
-		 *
-		 * @param other   The previous deprecation context to copy, or null to start
-		 *                from nothing.
-		 * @param deltas  The deltas to apply.
-		 */
-		@SuppressWarnings("unchecked")
-		DeprecationContext(DeprecationContext other, DeprecationDelta[] deltas) {
-			HashMap<String, DeprecatedKeyInfo> newDeprecatedKeyMap =
-				new HashMap<String, DeprecatedKeyInfo>();
-			HashMap<String, String> newReverseDeprecatedKeyMap =
-				new HashMap<String, String>();
-			if (other != null) {
-				for (Entry<String, DeprecatedKeyInfo> entry :
-					other.deprecatedKeyMap.entrySet()) {
-					newDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
-				}
-				for (Entry<String, String> entry :
-					other.reverseDeprecatedKeyMap.entrySet()) {
-					newReverseDeprecatedKeyMap.put(entry.getKey(), entry.getValue());
-				}
-			}
-			for (DeprecationDelta delta : deltas) {
-				if (!newDeprecatedKeyMap.containsKey(delta.getKey())) {
-					DeprecatedKeyInfo newKeyInfo =
-						new DeprecatedKeyInfo(delta.getNewKeys(), delta.getCustomMessage());
-					newDeprecatedKeyMap.put(delta.key, newKeyInfo);
-					for (String newKey : delta.getNewKeys()) {
-						newReverseDeprecatedKeyMap.put(newKey, delta.key);
-					}
-				}
-			}
-			this.deprecatedKeyMap =
-				UnmodifiableMap.decorate(newDeprecatedKeyMap);
-			this.reverseDeprecatedKeyMap =
-				UnmodifiableMap.decorate(newReverseDeprecatedKeyMap);
-		}
-
-		Map<String, DeprecatedKeyInfo> getDeprecatedKeyMap() {
-			return deprecatedKeyMap;
-		}
-
-		Map<String, String> getReverseDeprecatedKeyMap() {
-			return reverseDeprecatedKeyMap;
-		}
-	}
-
-	private static DeprecationDelta[] defaultDeprecations =
-		new DeprecationDelta[] {
-			new DeprecationDelta("topology.script.file.name",
-				CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY),
-			new DeprecationDelta("topology.script.number.args",
-				CommonConfigurationKeys.NET_TOPOLOGY_SCRIPT_NUMBER_ARGS_KEY),
-			new DeprecationDelta("hadoop.configured.node.mapping",
-				CommonConfigurationKeys.NET_TOPOLOGY_CONFIGURED_NODE_MAPPING_KEY),
-			new DeprecationDelta("topology.node.switch.mapping.impl",
-				CommonConfigurationKeys.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY),
-			new DeprecationDelta("dfs.df.interval",
-				CommonConfigurationKeys.FS_DF_INTERVAL_KEY),
-			new DeprecationDelta("hadoop.native.lib",
-				CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY),
-			new DeprecationDelta("fs.default.name",
-				CommonConfigurationKeys.FS_DEFAULT_NAME_KEY),
-			new DeprecationDelta("dfs.umaskmode",
-				CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY),
-			new DeprecationDelta("dfs.nfs.exports.allowed.hosts",
-				CommonConfigurationKeys.NFS_EXPORTS_ALLOWED_HOSTS_KEY)
-		};
-
-	/**
-	 * The global DeprecationContext.
-	 */
-	private static AtomicReference<DeprecationContext> deprecationContext =
-		new AtomicReference<DeprecationContext>(
-			new DeprecationContext(null, defaultDeprecations));
-
-	/**
-	 * Adds a set of deprecated keys to the global deprecations.
-	 *
-	 * This method is lockless.  It works by means of creating a new
-	 * DeprecationContext based on the old one, and then atomically swapping in
-	 * the new context.  If someone else updated the context in between us reading
-	 * the old context and swapping in the new one, we try again until we win the
-	 * race.
-	 *
-	 * @param deltas   The deprecations to add.
-	 */
-	public static void addDeprecations(DeprecationDelta[] deltas) {
-		DeprecationContext prev, next;
-		do {
-			prev = deprecationContext.get();
-			next = new DeprecationContext(prev, deltas);
-		} while (!deprecationContext.compareAndSet(prev, next));
-	}
-
-	/**
-	 * Adds the deprecated key to the global deprecation map.
-	 * It does not override any existing entries in the deprecation map.
-	 * This is to be used only by the developers in order to add deprecation of
-	 * keys, and attempts to call this method after loading resources once,
-	 * would lead to <tt>UnsupportedOperationException</tt>
-	 *
-	 * If a key is deprecated in favor of multiple keys, they are all treated as
-	 * aliases of each other, and setting any one of them resets all the others
-	 * to the new value.
-	 *
-	 * If you have multiple deprecation entries to add, it is more efficient to
-	 * use #addDeprecations(DeprecationDelta[] deltas) instead.
-	 *
-	 * @param key
-	 * @param newKeys
-	 * @param customMessage
-	 * @deprecated use {@link #addDeprecation(String key, String newKey,
-		String customMessage)} instead
-	 */
-	@Deprecated
-	public static void addDeprecation(String key, String[] newKeys,
-		String customMessage) {
-		addDeprecations(new DeprecationDelta[] {
-			new DeprecationDelta(key, newKeys, customMessage)
-		});
-	}
-
-	/**
-	 * Adds the deprecated key to the global deprecation map.
-	 * It does not override any existing entries in the deprecation map.
-	 * This is to be used only by the developers in order to add deprecation of
-	 * keys, and attempts to call this method after loading resources once,
-	 * would lead to <tt>UnsupportedOperationException</tt>
-	 *
-	 * If you have multiple deprecation entries to add, it is more efficient to
-	 * use #addDeprecations(DeprecationDelta[] deltas) instead.
-	 *
-	 * @param key
-	 * @param newKey
-	 * @param customMessage
-	 */
-	public static void addDeprecation(String key, String newKey,
-		String customMessage) {
-		addDeprecation(key, new String[] {newKey}, customMessage);
-	}
-
-	/**
-	 * Adds the deprecated key to the global deprecation map when no custom
-	 * message is provided.
-	 * It does not override any existing entries in the deprecation map.
-	 * This is to be used only by the developers in order to add deprecation of
-	 * keys, and attempts to call this method after loading resources once,
-	 * would lead to <tt>UnsupportedOperationException</tt>
-	 *
-	 * If a key is deprecated in favor of multiple keys, they are all treated as
-	 * aliases of each other, and setting any one of them resets all the others
-	 * to the new value.
-	 *
-	 * If you have multiple deprecation entries to add, it is more efficient to
-	 * use #addDeprecations(DeprecationDelta[] deltas) instead.
-	 *
-	 * @param key Key that is to be deprecated
-	 * @param newKeys list of keys that take up the values of deprecated key
-	 * @deprecated use {@link #addDeprecation(String key, String newKey)} instead
-	 */
-	@Deprecated
-	public static void addDeprecation(String key, String[] newKeys) {
-		addDeprecation(key, newKeys, null);
-	}
-
-	/**
-	 * Adds the deprecated key to the global deprecation map when no custom
-	 * message is provided.
-	 * It does not override any existing entries in the deprecation map.
-	 * This is to be used only by the developers in order to add deprecation of
-	 * keys, and attempts to call this method after loading resources once,
-	 * would lead to <tt>UnsupportedOperationException</tt>
-	 *
-	 * If you have multiple deprecation entries to add, it is more efficient to
-	 * use #addDeprecations(DeprecationDelta[] deltas) instead.
-	 *
-	 * @param key Key that is to be deprecated
-	 * @param newKey key that takes up the value of deprecated key
-	 */
-	public static void addDeprecation(String key, String newKey) {
-		addDeprecation(key, new String[] {newKey}, null);
-	}
-
-	/**
-	 * checks whether the given <code>key</code> is deprecated.
-	 *
-	 * @param key the parameter which is to be checked for deprecation
-	 * @return <code>true</code> if the key is deprecated and
-	 *         <code>false</code> otherwise.
-	 */
-	public static boolean isDeprecated(String key) {
-		return deprecationContext.get().getDeprecatedKeyMap().containsKey(key);
-	}
-
-	/**
-	 * Sets all deprecated properties that are not currently set but have a
-	 * corresponding new property that is set. Useful for iterating the
-	 * properties when all deprecated properties for currently set properties
-	 * need to be present.
-	 *
-	 * For every deprecated key that is not yet present in the overlay, the
-	 * value of the first replacement key found in the overlay is copied to the
-	 * deprecated key, in both the loaded properties and the overlay.
-	 */
-	public void setDeprecatedProperties() {
-		DeprecationContext deprecations = deprecationContext.get();
-		Properties props = getProps();
-		Properties overlay = getOverlay();
-		for (Map.Entry<String, DeprecatedKeyInfo> entry :
-			deprecations.getDeprecatedKeyMap().entrySet()) {
-			String depKey = entry.getKey();
-			// Must be containsKey(): Hashtable#contains(Object) searches the
-			// VALUES, so it would test whether some property has the value
-			// depKey instead of whether depKey itself is already set.
-			if (!overlay.containsKey(depKey)) {
-				for (String newKey : entry.getValue().newKeys) {
-					String val = overlay.getProperty(newKey);
-					if (val != null) {
-						props.setProperty(depKey, val);
-						overlay.setProperty(depKey, val);
-						// only the first replacement key that has a value is used
-						break;
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Resolves the property <code>name</code> against the deprecation map.
-	 * The name is trimmed first. If it is a deprecated key, a one-time
-	 * warning is issued and its non-null replacement keys are returned;
-	 * otherwise (or when there are no replacement keys) the result contains
-	 * just the <code>name</code> itself.
-	 *
-	 * As a side effect, for every returned key that is not present in the
-	 * overlay while its deprecated counterpart is, the value of the deprecated
-	 * key is copied to that key in both the loaded properties and the overlay.
-	 *
-	 * @param deprecations the deprecation context to resolve against
-	 * @param name the property name
-	 * @return the list of properties mapping the <code>name</code>, or a
-	 *         one-element array holding the (trimmed) <code>name</code> itself.
-	 */
-	private String[] handleDeprecation(DeprecationContext deprecations,
-		String name) {
-		if (null != name) {
-			name = name.trim();
-		}
-		ArrayList<String > names = new ArrayList<String>();
-		if (isDeprecated(name)) {
-			DeprecatedKeyInfo keyInfo = deprecations.getDeprecatedKeyMap().get(name);
-			// isDeprecated() reads the global context again, while the lookup
-			// above uses the context handed in by the caller. These are two
-			// separate reads, so do not assume the entry exists here.
-			if (keyInfo != null) {
-				warnOnceIfDeprecated(deprecations, name);
-				for (String newKey : keyInfo.newKeys) {
-					if(newKey != null) {
-						names.add(newKey);
-					}
-				}
-			}
-		}
-		if(names.size() == 0) {
-			names.add(name);
-		}
-		for(String n : names) {
-			String deprecatedKey = deprecations.getReverseDeprecatedKeyMap().get(n);
-			// carry a value that was set under the deprecated key over to the new key
-			if (deprecatedKey != null && !getOverlay().containsKey(n) &&
-				getOverlay().containsKey(deprecatedKey)) {
-				getProps().setProperty(n, getOverlay().getProperty(deprecatedKey));
-				getOverlay().setProperty(n, getOverlay().getProperty(deprecatedKey));
-			}
-		}
-		return names.toArray(new String[names.size()]);
-	}
-
-	private void handleDeprecation() {
-		LOG.debug("Handling deprecation for all properties in config...");
-		final DeprecationContext context = deprecationContext.get();
-		// Iterate over a copy of the key set: the per-key handler may add
-		// properties while we are iterating.
-		final Set<Object> snapshot = new HashSet<Object>();
-		snapshot.addAll(getProps().keySet());
-		for (Object entry : snapshot) {
-			final String key = (String) entry;
-			LOG.debug("Handling deprecation for " + key);
-			handleDeprecation(context, key);
-		}
-	}
-
-	static{
-		// Warn when the legacy hadoop-site.xml is still visible on the classpath.
-		// The context class loader is preferred; this class's loader is the fallback.
-		ClassLoader loader = Thread.currentThread().getContextClassLoader();
-		if (loader == null) {
-			loader = Configuration.class.getClassLoader();
-		}
-		final boolean legacySiteFileFound = loader.getResource("hadoop-site.xml") != null;
-		if (legacySiteFileFound) {
-			LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
-				"Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
-				+ "mapred-site.xml and hdfs-site.xml to override properties of " +
-				"core-default-shaded.xml, mapred-default.xml and hdfs-default.xml " +
-				"respectively");
-		}
-		// Register the default resources, in this order.
-		addDefaultResource("core-default-shaded.xml");
-		addDefaultResource("core-site.xml");
-	}
-
-	private Properties properties;
-	private Properties overlay;
-	private ClassLoader classLoader;
-	{
-		// Prefer the thread's context class loader; fall back to the loader
-		// that loaded this class when the thread has none.
-		final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
-		classLoader = (contextLoader != null)
-			? contextLoader
-			: Configuration.class.getClassLoader();
-	}
-
-	/**
-	 * A new configuration that loads the default resources; equivalent to
-	 * {@code new Configuration(true)}.
-	 */
-	public Configuration() {
-		this(true);
-	}
-
-	/** A new configuration where the behavior of reading from the default
-	 * resources can be turned off.
-	 *
-	 * If the parameter {@code loadDefaults} is false, the new instance
-	 * will not load resources from the default files.
-	 * @param loadDefaults specifies whether to load from the default files
-	 */
-	public Configuration(boolean loadDefaults) {
-		updatingResource = new ConcurrentHashMap<String, String[]>();
-		this.loadDefaults = loadDefaults;
-		// Register this instance so that addDefaultResource() can ask it to reload.
-		synchronized (Configuration.class) {
-			REGISTRY.put(this, null);
-		}
-	}
-
-	/**
-	 * A new configuration with the same settings cloned from another.
-	 *
-	 * @param other the configuration from which to clone settings.
-	 */
-	@SuppressWarnings("unchecked")
-	public Configuration(Configuration other) {
-		// Shallow copy of the resource list. Note that this read happens
-		// before the monitor of 'other' is taken below.
-		this.resources = (ArrayList<Resource>) other.resources.clone();
-		synchronized(other) {
-			// 'properties' and 'overlay' may still be null on the source
-			// instance; copy only what exists.
-			if (other.properties != null) {
-				this.properties = (Properties)other.properties.clone();
-			}
-
-			if (other.overlay!=null) {
-				this.overlay = (Properties)other.overlay.clone();
-			}
-
-			this.updatingResource = new ConcurrentHashMap<String, String[]>(
-				other.updatingResource);
-			this.finalParameters = Collections.newSetFromMap(
-				new ConcurrentHashMap<String, Boolean>());
-			this.finalParameters.addAll(other.finalParameters);
-		}
-
-		// Register the copy so that addDefaultResource() can ask it to reload.
-		synchronized(Configuration.class) {
-			REGISTRY.put(this, null);
-		}
-		this.classLoader = other.classLoader;
-		this.loadDefaults = other.loadDefaults;
-		setQuietMode(other.getQuietMode());
-	}
-
-	/**
-	 * Add a default resource. Resources are loaded in the order of the resources
-	 * added.
-	 * @param name file name. File should be present in the classpath.
-	 */
-	public static synchronized void addDefaultResource(String name) {
-		if (defaultResources.contains(name)) {
-			return; // already registered, nothing to do
-		}
-		defaultResources.add(name);
-		// Every registered instance that loads defaults must re-read its resources.
-		for (Configuration registered : REGISTRY.keySet()) {
-			if (registered.loadDefaults) {
-				registered.reloadConfiguration();
-			}
-		}
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * @param name resource to be added, the classpath is examined for a file
-	 *             with that name.
-	 */
-	public void addResource(String name) {
-		// Wrap the classpath name and hand it to the common registration path.
-		final Resource wrapped = new Resource(name);
-		addResourceObject(wrapped);
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * @param url url of the resource to be added, the local filesystem is
-	 *            examined directly to find the resource, without referring to
-	 *            the classpath.
-	 */
-	public void addResource(URL url) {
-		// Wrap the URL and hand it to the common registration path.
-		final Resource wrapped = new Resource(url);
-		addResourceObject(wrapped);
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * @param file file-path of resource to be added, the local filesystem is
-	 *             examined directly to find the resource, without referring to
-	 *             the classpath.
-	 */
-	public void addResource(Path file) {
-		// Wrap the file path and hand it to the common registration path.
-		final Resource wrapped = new Resource(file);
-		addResourceObject(wrapped);
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * WARNING: The contents of the InputStream will be cached, by this method.
-	 * So use this sparingly because it does increase the memory consumption.
-	 *
-	 * @param in InputStream to deserialize the object from. In will be read from
-	 * when a get or set is called next.  After it is read the stream will be
-	 * closed.
-	 */
-	public void addResource(InputStream in) {
-		// Wrap the stream and hand it to the common registration path.
-		final Resource wrapped = new Resource(in);
-		addResourceObject(wrapped);
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * @param in InputStream to deserialize the object from.
-	 * @param name the name of the resource because InputStream.toString is not
-	 * very descriptive some times.
-	 */
-	public void addResource(InputStream in, String name) {
-		// Wrap the stream together with its descriptive name and register it.
-		final Resource wrapped = new Resource(in, name);
-		addResourceObject(wrapped);
-	}
-
-	/**
-	 * Add a configuration resource.
-	 *
-	 * The properties of this resource will override properties of previously
-	 * added resources, unless they were marked <a href="#Final">final</a>.
-	 *
-	 * @param conf Configuration object from which to load properties
-	 */
-	public void addResource(Configuration conf) {
-		// The other configuration contributes its loaded properties as a resource.
-		final Resource wrapped = new Resource(conf.getProps());
-		addResourceObject(wrapped);
-	}
-
-
-
-	/**
-	 * Reload configuration from previously added resources.
-	 *
-	 * This method will clear all the configuration read from the added
-	 * resources, and final parameters. This will make the resources to
-	 * be read again before accessing the values. Values that are added
-	 * via set methods will overlay values read from the resources.
-	 */
-	public synchronized void reloadConfiguration() {
-		properties = null;                            // drop loaded values so the resources are read again
-		finalParameters.clear();                      // forget which parameters were marked final
-	}
-
-	/** Appends the resource to this configuration's resource list and invalidates the loaded values. */
-	private synchronized void addResourceObject(Resource resource) {
-		resources.add(resource);                      // add to resources
-		reloadConfiguration();
-	}
-
-	private static final int MAX_SUBST = 20;
-
-	private static final int SUB_START_IDX = 0;
-	private static final int SUB_END_IDX = SUB_START_IDX + 1;
-
-	/**
-	 * This is a manual implementation of the following regex
-	 * "\\$\\{[^\\}\\$\u0020]+\\}". It can be 15x more efficient than
-	 * a regex matcher as demonstrated by HADOOP-11506. This is noticeable with
-	 * Hadoop apps building on the assumption Configuration#get is an O(1)
-	 * hash table lookup, especially when the eval is a long string.
-	 *
-	 * @param eval a string that may contain variables requiring expansion.
-	 * @return a 2-element int array res such that
-	 * eval.substring(res[0], res[1]) is "var" for the left-most occurrence of
-	 * ${var} in eval. If no variable is found -1, -1 is returned.
-	 */
-	private static int[] findSubVariable(String eval) {
-		// {-1, -1} is the "no variable found" answer
-		int[] result = {-1, -1};
-
-		int matchStart;
-		int leftBrace;
-
-		// scanning for a brace first because it's less frequent than $
-		// that can occur in nested class names
-		//
-		// The search starts at index 1: a '{' at index 0 has no room for
-		// the '$' that must precede it.
-		match_loop:
-		for (matchStart = 1, leftBrace = eval.indexOf('{', matchStart);
-			// minimum left brace position (follows '$')
-			 leftBrace > 0
-				 // right brace of a smallest valid expression "${c}"
-				 && leftBrace + "{c".length() < eval.length();
-			 leftBrace = eval.indexOf('{', matchStart)) {
-			// number of variable-name characters accepted so far
-			int matchedLen = 0;
-			if (eval.charAt(leftBrace - 1) == '$') {
-				int subStart = leftBrace + 1; // after '{'
-				for (int i = subStart; i < eval.length(); i++) {
-					switch (eval.charAt(i)) {
-						case '}':
-							if (matchedLen > 0) { // match
-								result[SUB_START_IDX] = subStart;
-								result[SUB_END_IDX] = subStart + matchedLen;
-								break match_loop;
-							}
-							// empty name "${}": not a variable
-							// fall through to skip 1 char
-						case ' ':
-						case '$':
-							// these characters cannot be part of a variable name:
-							// abandon this candidate and resume the search after them
-							matchStart = i + 1;
-							continue match_loop;
-						default:
-							matchedLen++;
-					}
-				}
-				// scanned from "${"  to the end of eval, and no reset via ' ', '$':
-				//    no match!
-				break match_loop;
-			} else {
-				// not a start of a variable
-				//
-				matchStart = leftBrace + 1;
-			}
-		}
-		return result;
-	}
-
-	/**
-	 * Attempts to repeatedly expand the value {@code expr} by replacing the
-	 * left-most substring of the form "${var}" in the following precedence order
-	 * <ol>
-	 *   <li>by the value of the Java system property "var" if defined</li>
-	 *   <li>by the value of the configuration key "var" if defined</li>
-	 * </ol>
-	 *
-	 * If var is unbound the current state of expansion "prefix${var}suffix" is
-	 * returned.
-	 *
-	 * @param expr the literal value of a config key
-	 * @return null if expr is null, otherwise the value resulting from expanding
-	 * expr using the algorithm above.
-	 * @throws IllegalStateException when more than
-	 * {@link Configuration#MAX_SUBST} replacements are required
-	 */
-	private String substituteVars(String expr) {
-		if (expr == null) {
-			return null;
-		}
-		String eval = expr;
-		// each iteration performs at most one substitution
-		for (int s = 0; s < MAX_SUBST; s++) {
-			final int[] varBounds = findSubVariable(eval);
-			if (varBounds[SUB_START_IDX] == -1) {
-				// nothing left to expand
-				return eval;
-			}
-			final String var = eval.substring(varBounds[SUB_START_IDX],
-				varBounds[SUB_END_IDX]);
-			String val = null;
-			try {
-				val = System.getProperty(var);
-			} catch(SecurityException se) {
-				// not allowed to read system properties: fall back to the configuration
-				LOG.warn("Unexpected SecurityException in Configuration", se);
-			}
-			if (val == null) {
-				val = getRaw(var);
-			}
-			if (val == null) {
-				return eval; // return literal ${var}: var is unbound
-			}
-			// varBounds delimit the bare name; widen to cover "${" and "}"
-			final int dollar = varBounds[SUB_START_IDX] - "${".length();
-			final int afterRightBrace = varBounds[SUB_END_IDX] + "}".length();
-			// substitute
-			eval = eval.substring(0, dollar)
-				+ val
-				+ eval.substring(afterRightBrace);
-		}
-		throw new IllegalStateException("Variable substitution depth too large: "
-			+ MAX_SUBST + " " + expr);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property, <code>null</code> if
-	 * no such property exists. If the key is deprecated, it returns the value of
-	 * the first key which replaces the deprecated key and is not null.
-	 *
-	 * Values are processed for <a href="#VariableExpansion">variable expansion</a>
-	 * before being returned.
-	 *
-	 * @param name the property name, will be trimmed before get value.
-	 * @return the value of the <code>name</code> or its replacing property,
-	 *         or null if no such property exists.
-	 */
-	public String get(String name) {
-		final String[] resolved = handleDeprecation(deprecationContext.get(), name);
-		String value = null;
-		// Every resolved key is looked up in turn; the value obtained for the
-		// last one is what gets returned.
-		for (int i = 0; i < resolved.length; i++) {
-			value = substituteVars(getProps().getProperty(resolved[i]));
-		}
-		return value;
-	}
-
-	/**
-	 * Set Configuration to allow keys without values during setup.  Intended
-	 * for use during testing.
-	 *
-	 * @param val If true, will allow Configuration to store keys without values
-	 */
-	@VisibleForTesting
-	public void setAllowNullValueProperties( boolean val ) {
-		// only records the flag on this instance
-		this.allowNullValueProperties = val;
-	}
-
-	/**
-	 * Return existence of the <code>name</code> property, but only for
-	 * names which have no valid value, usually non-existent or commented
-	 * out in XML.
-	 *
-	 * @param name the property name
-	 * @return true if the property <code>name</code> exists without value
-	 */
-	@VisibleForTesting
-	public boolean onlyKeyExists(String name) {
-		final String[] candidates = handleDeprecation(deprecationContext.get(), name);
-		for (String candidate : candidates) {
-			// The sentinel default comes back when no value is stored under
-			// the key (or when the stored value equals the sentinel).
-			final String stored = getProps().getProperty(candidate, DEFAULT_STRING_CHECK);
-			if (stored.equals(DEFAULT_STRING_CHECK)) {
-				return true;
-			}
-		}
-		return false;
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a trimmed <code>String</code>,
-	 * <code>null</code> if no such property exists.
-	 * If the key is deprecated, it returns the value of
-	 * the first key which replaces the deprecated key and is not null
-	 *
-	 * Values are processed for <a href="#VariableExpansion">variable expansion</a>
-	 * before being returned.
-	 *
-	 * @param name the property name.
-	 * @return the value of the <code>name</code> or its replacing property,
-	 *         or null if no such property exists.
-	 */
-	public String getTrimmed(String name) {
-		// A missing property stays null; anything else is returned trimmed.
-		final String raw = get(name);
-		return (raw == null) ? null : raw.trim();
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a trimmed <code>String</code>,
-	 * <code>defaultValue</code> if no such property exists.
-	 * See {@link Configuration#getTrimmed(String)} for more details.
-	 *
-	 * @param name          the property name.
-	 * @param defaultValue  the property default value.
-	 * @return              the value of the <code>name</code> or defaultValue
-	 *                      if it is not set.
-	 */
-	public String getTrimmed(String name, String defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed != null) {
-			return trimmed;
-		}
-		// property is not set: use the caller's default as given
-		return defaultValue;
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property, without doing
-	 * <a href="#VariableExpansion">variable expansion</a>. If the key is
-	 * deprecated, it returns the value of the first key which replaces
-	 * the deprecated key and is not null.
-	 *
-	 * @param name the property name.
-	 * @return the value of the <code>name</code> property or
-	 *         its replacing property and null if no such property exists.
-	 */
-	public String getRaw(String name) {
-		final String[] resolved = handleDeprecation(deprecationContext.get(), name);
-		String value = null;
-		// No variable expansion here: the stored text is returned as is. The
-		// value found for the last resolved key wins.
-		for (int i = 0; i < resolved.length; i++) {
-			value = getProps().getProperty(resolved[i]);
-		}
-		return value;
-	}
-
-	/**
-	 * Returns alternative names (non-deprecated keys or previously-set deprecated keys)
-	 * for a given non-deprecated key.
-	 * If the given key is deprecated, return null.
-	 *
-	 * @param name property name.
-	 * @return alternative names.
-	 */
-	private String[] getAlternativeNames(String name) {
-		String altNames[] = null;
-		DeprecatedKeyInfo keyInfo = null;
-		DeprecationContext cur = deprecationContext.get();
-		// reverse lookup: which deprecated key, if any, is replaced by 'name'
-		String depKey = cur.getReverseDeprecatedKeyMap().get(name);
-		if(depKey != null) {
-			keyInfo = cur.getDeprecatedKeyMap().get(depKey);
-			if(keyInfo.newKeys.length > 0) {
-				if(getProps().containsKey(depKey)) {
-					//if deprecated key is previously set explicitly
-					// then it is reported too, after the replacement keys
-					List<String> list = new ArrayList<String>();
-					list.addAll(Arrays.asList(keyInfo.newKeys));
-					list.add(depKey);
-					altNames = list.toArray(new String[list.size()]);
-				}
-				else {
-					// NOTE: this returns the key info's own array, not a copy
-					altNames = keyInfo.newKeys;
-				}
-			}
-		}
-		// stays null when no deprecated key maps to 'name'
-		return altNames;
-	}
-
-	/**
-	 * Set the <code>value</code> of the <code>name</code> property. If
-	 * <code>name</code> is deprecated or there is a deprecated name associated to it,
-	 * it sets the value to both names. Name will be trimmed before put into
-	 * configuration.
-	 *
-	 * @param name property name.
-	 * @param value property value.
-	 */
-	public void set(String name, String value) {
-		// no source given: the three-argument overload records it as "programatically"
-		set(name, value, null);
-	}
-
-	/**
-	 * Set the <code>value</code> of the <code>name</code> property. If
-	 * <code>name</code> is deprecated, it also sets the <code>value</code> to
-	 * the keys that replace the deprecated key. Name will be trimmed before put
-	 * into configuration.
-	 *
-	 * @param name property name.
-	 * @param value property value.
-	 * @param source the place that this configuration value came from
-	 * (For debugging).
-	 * @throws IllegalArgumentException when the value or name is null.
-	 */
-	public void set(String name, String value, String source) {
-		Preconditions.checkArgument(
-			name != null,
-			"Property name must not be null");
-		Preconditions.checkArgument(
-			value != null,
-			"The value of property " + name + " must not be null");
-		name = name.trim();
-		DeprecationContext deprecations = deprecationContext.get();
-		if (deprecations.getDeprecatedKeyMap().isEmpty()) {
-			getProps();
-		}
-		// The value is written to the overlay as well as to the loaded
-		// properties, under the trimmed name.
-		getOverlay().setProperty(name, value);
-		getProps().setProperty(name, value);
-		// a null source is recorded under this fixed label
-		String newSource = (source == null ? "programatically" : source);
-
-		if (!isDeprecated(name)) {
-			updatingResource.put(name, new String[] {newSource});
-			// mirror the value onto the other keys tied to this one through
-			// the deprecation table
-			String[] altNames = getAlternativeNames(name);
-			if(altNames != null) {
-				for(String n: altNames) {
-					if(!n.equals(name)) {
-						getOverlay().setProperty(n, value);
-						getProps().setProperty(n, value);
-						updatingResource.put(n, new String[] {newSource});
-					}
-				}
-			}
-		}
-		else {
-			// deprecated key: also store the value under each key that replaces
-			// it, and record the deprecation as the source of those entries
-			String[] names = handleDeprecation(deprecationContext.get(), name);
-			String altSource = "because " + name + " is deprecated";
-			for(String n : names) {
-				getOverlay().setProperty(n, value);
-				getProps().setProperty(n, value);
-				updatingResource.put(n, new String[] {altSource});
-			}
-		}
-	}
-
-	private void warnOnceIfDeprecated(DeprecationContext deprecations, String name) {
-		final DeprecatedKeyInfo info = deprecations.getDeprecatedKeyMap().get(name);
-		if (info == null) {
-			return; // not a deprecated key in this context
-		}
-		// presumably returns the previous "accessed" flag and sets it, which
-		// is what limits the message to one occurrence -- confirm in DeprecatedKeyInfo
-		final boolean alreadyAccessed = info.getAndSetAccessed();
-		if (!alreadyAccessed) {
-			LOG_DEPRECATION.info(info.getWarningMessage(name));
-		}
-	}
-
-	/**
-	 * Unset a previously set property.
-	 */
-	public synchronized void unset(String name) {
-		// Work out every key that has to be removed together with 'name'.
-		final String[] targets;
-		if (isDeprecated(name)) {
-			targets = handleDeprecation(deprecationContext.get(), name);
-		} else {
-			final String[] alternatives = getAlternativeNames(name);
-			targets = (alternatives != null) ? alternatives : new String[]{name};
-		}
-
-		// Remove from the overlay and from the loaded properties alike.
-		for (String target : targets) {
-			getOverlay().remove(target);
-			getProps().remove(target);
-		}
-	}
-
-	/**
-	 * Sets a property if it is currently unset.
-	 * @param name the property name
-	 * @param value the new value
-	 */
-	public synchronized void setIfUnset(String name, String value) {
-		if (get(name) != null) {
-			return; // already has a value, leave it alone
-		}
-		set(name, value);
-	}
-
-	private synchronized Properties getOverlay() {
-		if (overlay != null) {
-			return overlay;
-		}
-		// created lazily on first use
-		overlay = new Properties();
-		return overlay;
-	}
-
-	/**
-	 * Get the value of the <code>name</code>. If the key is deprecated,
-	 * it returns the value of the first key which replaces the deprecated key
-	 * and is not null.
-	 * If no such property exists,
-	 * then <code>defaultValue</code> is returned.
-	 *
-	 * @param name property name, will be trimmed before get value.
-	 * @param defaultValue default value.
-	 * @return property value, or <code>defaultValue</code> if the property
-	 *         doesn't exist.
-	 */
-	public String get(String name, String defaultValue) {
-		final String[] resolved = handleDeprecation(deprecationContext.get(), name);
-		String value = null;
-		// The default is applied per lookup, then variable expansion runs on
-		// whatever came back; the last resolved key determines the result.
-		for (int i = 0; i < resolved.length; i++) {
-			value = substituteVars(getProps().getProperty(resolved[i], defaultValue));
-		}
-		return value;
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as an <code>int</code>.
-	 *
-	 * If no such property exists, the provided default value is returned,
-	 * or if the specified value is not a valid <code>int</code>,
-	 * then an error is thrown.
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @throws NumberFormatException when the value is invalid
-	 * @return property value as an <code>int</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public int getInt(String name, int defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null) {
-			return defaultValue;
-		}
-		// Values written as 0x.../0X... (optionally negative) are parsed as hexadecimal.
-		final String hexDigits = getHexDigits(trimmed);
-		if (hexDigits == null) {
-			return Integer.parseInt(trimmed);
-		}
-		return Integer.parseInt(hexDigits, 16);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a set of comma-delimited
-	 * <code>int</code> values.
-	 *
-	 * If no such property exists, an empty array is returned.
-	 *
-	 * @param name property name
-	 * @return property value interpreted as an array of comma-delimited
-	 *         <code>int</code> values
-	 */
-	public int[] getInts(String name) {
-		final String[] tokens = getTrimmedStrings(name);
-		final int count = tokens.length;
-		final int[] parsed = new int[count];
-		// Each token is parsed as a decimal int.
-		int idx = 0;
-		while (idx < count) {
-			parsed[idx] = Integer.parseInt(tokens[idx]);
-			idx++;
-		}
-		return parsed;
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to an <code>int</code>.
-	 *
-	 * @param name property name.
-	 * @param value <code>int</code> value of the property.
-	 */
-	public void setInt(String name, int value) {
-		// stored in its decimal string form
-		final String text = String.valueOf(value);
-		set(name, text);
-	}
-
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>long</code>.
-	 * If no such property exists, the provided default value is returned,
-	 * or if the specified value is not a valid <code>long</code>,
-	 * then an error is thrown.
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @throws NumberFormatException when the value is invalid
-	 * @return property value as a <code>long</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public long getLong(String name, long defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null) {
-			return defaultValue;
-		}
-		// Values written as 0x.../0X... (optionally negative) are parsed as hexadecimal.
-		final String hexDigits = getHexDigits(trimmed);
-		if (hexDigits == null) {
-			return Long.parseLong(trimmed);
-		}
-		return Long.parseLong(hexDigits, 16);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>long</code> or
-	 * human readable format. If no such property exists, the provided default
-	 * value is returned, or if the specified value is not a valid
-	 * <code>long</code> or human readable format, then an error is thrown. You
-	 * can use the following suffix (case insensitive): k(kilo), m(mega), g(giga),
-	 * t(tera), p(peta), e(exa)
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @throws NumberFormatException when the value is invalid
-	 * @return property value as a <code>long</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public long getLongBytes(String name, long defaultValue) {
-		final String trimmed = getTrimmed(name);
-		// the suffix handling is delegated to TraditionalBinaryPrefix
-		return (trimmed == null)
-			? defaultValue
-			: StringUtils.TraditionalBinaryPrefix.string2long(trimmed);
-	}
-
-	/**
-	 * Extracts the hexadecimal digits from a value of the form
-	 * <code>[-]0x...</code> or <code>[-]0X...</code>.
-	 *
-	 * @param value the text to inspect
-	 * @return the digits after the prefix, with a leading "-" kept for
-	 *         negative values, or null if the value has no hex prefix
-	 */
-	private String getHexDigits(String value) {
-		final boolean negative = value.startsWith("-");
-		final String unsigned = negative ? value.substring(1) : value;
-		if (!unsigned.startsWith("0x") && !unsigned.startsWith("0X")) {
-			return null;
-		}
-		final String digits = unsigned.substring(2);
-		return negative ? "-" + digits : digits;
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to a <code>long</code>.
-	 *
-	 * @param name property name.
-	 * @param value <code>long</code> value of the property.
-	 */
-	public void setLong(String name, long value) {
-		// stored in its decimal string form
-		final String text = String.valueOf(value);
-		set(name, text);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>float</code>.
-	 * If no such property exists, the provided default value is returned,
-	 * or if the specified value is not a valid <code>float</code>,
-	 * then an error is thrown.
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @throws NumberFormatException when the value is invalid
-	 * @return property value as a <code>float</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public float getFloat(String name, float defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null) {
-			return defaultValue;
-		}
-		return Float.parseFloat(trimmed);
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to a <code>float</code>.
-	 *
-	 * @param name property name.
-	 * @param value property value.
-	 */
-	public void setFloat(String name, float value) {
-		// stored in the string form produced for a float
-		final String text = String.valueOf(value);
-		set(name, text);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>double</code>.
-	 * If no such property exists, the provided default value is returned,
-	 * or if the specified value is not a valid <code>double</code>,
-	 * then an error is thrown.
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @throws NumberFormatException when the value is invalid
-	 * @return property value as a <code>double</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public double getDouble(String name, double defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null) {
-			return defaultValue;
-		}
-		return Double.parseDouble(trimmed);
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to a <code>double</code>.
-	 *
-	 * @param name property name.
-	 * @param value property value.
-	 */
-	public void setDouble(String name, double value) {
-		// stored in the string form produced for a double
-		final String text = String.valueOf(value);
-		set(name, text);
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>boolean</code>.
-	 * If no such property is specified, or if the specified value is not a valid
-	 * <code>boolean</code>, then <code>defaultValue</code> is returned.
-	 *
-	 * @param name property name.
-	 * @param defaultValue default value.
-	 * @return property value as a <code>boolean</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public boolean getBoolean(String name, boolean defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null || trimmed.isEmpty()) {
-			return defaultValue;
-		}
-		if (StringUtils.equalsIgnoreCase("true", trimmed)) {
-			return true;
-		}
-		if (StringUtils.equalsIgnoreCase("false", trimmed)) {
-			return false;
-		}
-		// neither "true" nor "false" (ignoring case): treat as unspecified
-		return defaultValue;
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to a <code>boolean</code>.
-	 *
-	 * @param name property name.
-	 * @param value <code>boolean</code> value of the property.
-	 */
-	public void setBoolean(String name, boolean value) {
-		// stored as "true" or "false"
-		final String text = String.valueOf(value);
-		set(name, text);
-	}
-
-	/**
-	 * Set the given property, if it is currently unset.
-	 * @param name property name
-	 * @param value new value
-	 */
-	public void setBooleanIfUnset(String name, boolean value) {
-		// stored as "true" or "false", and only when the property has no value yet
-		final String text = String.valueOf(value);
-		setIfUnset(name, text);
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to the given type. This
-	 * is equivalent to <code>set(&lt;name&gt;, value.toString())</code>.
-	 * @param name property name
-	 * @param value new value
-	 */
-	public <T extends Enum<T>> void setEnum(String name, T value) {
-		// the constant is stored through its toString() form
-		final String text = value.toString();
-		set(name, text);
-	}
-
-	/**
-	 * Return value matching this enumerated type.
-	 * Note that the returned value is trimmed by this method.
-	 * @param name Property name
-	 * @param defaultValue Value returned if no mapping exists
-	 * @throws IllegalArgumentException If mapping is illegal for the type
-	 * provided
-	 */
-	public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
-		final String trimmed = getTrimmed(name);
-		if (trimmed == null) {
-			return defaultValue;
-		}
-		// the enum type is taken from the default value
-		return Enum.valueOf(defaultValue.getDeclaringClass(), trimmed);
-	}
-
-	/**
-	 * Pairs each supported {@link TimeUnit} with the suffix used to encode it
-	 * in a property value, and offers lookups in both directions.
-	 */
-	enum ParsedTimeDuration {
-		NS {
-			TimeUnit unit() { return TimeUnit.NANOSECONDS; }
-			String suffix() { return "ns"; }
-		},
-		US {
-			TimeUnit unit() { return TimeUnit.MICROSECONDS; }
-			String suffix() { return "us"; }
-		},
-		MS {
-			TimeUnit unit() { return TimeUnit.MILLISECONDS; }
-			String suffix() { return "ms"; }
-		},
-		S {
-			TimeUnit unit() { return TimeUnit.SECONDS; }
-			String suffix() { return "s"; }
-		},
-		M {
-			TimeUnit unit() { return TimeUnit.MINUTES; }
-			String suffix() { return "m"; }
-		},
-		H {
-			TimeUnit unit() { return TimeUnit.HOURS; }
-			String suffix() { return "h"; }
-		},
-		D {
-			TimeUnit unit() { return TimeUnit.DAYS; }
-			String suffix() { return "d"; }
-		};
-		/** The time unit this constant stands for. */
-		abstract TimeUnit unit();
-		/** The textual suffix that marks this unit in a property value. */
-		abstract String suffix();
-		/**
-		 * Finds the unit whose suffix the given string ends with.
-		 *
-		 * @param s the property value to inspect
-		 * @return the matching constant, or null if no suffix matches
-		 */
-		static ParsedTimeDuration unitFor(String s) {
-			for (ParsedTimeDuration ptd : values()) {
-				// iteration order is declaration order, so the suffixes "ns",
-				// "us" and "ms" (which also end in "s") are tried before "s"
-				if (s.endsWith(ptd.suffix())) {
-					return ptd;
-				}
-			}
-			return null;
-		}
-		/**
-		 * Finds the constant that stands for the given time unit.
-		 *
-		 * @param unit the time unit to look up
-		 * @return the matching constant, or null if none is declared for it
-		 */
-		static ParsedTimeDuration unitFor(TimeUnit unit) {
-			for (ParsedTimeDuration ptd : values()) {
-				if (ptd.unit() == unit) {
-					return ptd;
-				}
-			}
-			return null;
-		}
-	}
-
-	/**
-	 * Set the value of <code>name</code> to the given time duration. This
-	 * is equivalent to <code>set(&lt;name&gt;, value + &lt;time suffix&gt;)</code>.
-	 * @param name Property name
-	 * @param value Time duration
-	 * @param unit Unit of time
-	 */
-	public void setTimeDuration(String name, long value, TimeUnit unit) {
-		// the number is stored followed directly by the unit's suffix
-		final String suffix = ParsedTimeDuration.unitFor(unit).suffix();
-		set(name, value + suffix);
-	}
-
-	/**
-	 * Return time duration in the given time unit. Valid units are encoded in
-	 * properties as suffixes: nanoseconds (ns), microseconds (us), milliseconds
-	 * (ms), seconds (s), minutes (m), hours (h), and days (d).
-	 * @param name Property name
-	 * @param defaultValue Value returned if no mapping exists.
-	 * @param unit Unit to convert the stored property, if it exists.
-	 * @throws NumberFormatException If the property stripped of its unit is not
-	 *         a number
-	 */
-	public long getTimeDuration(String name, long defaultValue, TimeUnit unit) {
-		final String raw = get(name);
-		if (raw == null) {
-			return defaultValue;
-		}
-		final String trimmed = raw.trim();
-		final ParsedTimeDuration parsedUnit = ParsedTimeDuration.unitFor(trimmed);
-		final ParsedTimeDuration sourceUnit;
-		final String number;
-		if (parsedUnit != null) {
-			// strip the unit suffix, leaving only the numeric part
-			sourceUnit = parsedUnit;
-			number = trimmed.substring(0, trimmed.lastIndexOf(parsedUnit.suffix()));
-		} else {
-			// no suffix: the value is taken to be in the requested unit already
-			LOG.warn("No unit for " + name + "(" + trimmed + ") assuming " + unit);
-			sourceUnit = ParsedTimeDuration.unitFor(unit);
-			number = trimmed;
-		}
-		return unit.convert(Long.parseLong(number), sourceUnit.unit());
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>Pattern</code>.
-	 * If the property is missing or empty, or if its value does not compile
-	 * as a regular expression, <code>defaultValue</code> is returned (a
-	 * compile failure is additionally logged as a warning).
-	 * Note that the returned value is NOT trimmed by this method.
-	 *
-	 * @param name property name
-	 * @param defaultValue default value
-	 * @return property value as a compiled Pattern, or defaultValue
-	 */
-	public Pattern getPattern(String name, Pattern defaultValue) {
-		final String regex = get(name);
-		if (regex != null && !regex.isEmpty()) {
-			try {
-				return Pattern.compile(regex);
-			} catch (PatternSyntaxException pse) {
-				LOG.warn("Regular expression '" + regex + "' for property '" +
-					name + "' not valid. Using default", pse);
-			}
-		}
-		return defaultValue;
-	}
-
-	/**
-	 * Set the given property to the source text of a <code>Pattern</code>,
-	 * i.e. the value of {@link Pattern#pattern()}.
-	 * <p>
-	 * The pattern must not be null: the method asserts this and then
-	 * dereferences the pattern unconditionally, so a null argument fails
-	 * (with an AssertionError when assertions are enabled, otherwise with a
-	 * NullPointerException) rather than storing an empty pattern.
-	 *
-	 * @param name property name
-	 * @param pattern new value; must not be null
-	 */
-	public void setPattern(String name, Pattern pattern) {
-		assert pattern != null : "Pattern cannot be null";
-		set(name, pattern.pattern());
-	}
-
-	/**
-	 * Gets information about why a property was set.  Typically this is the
-	 * path to the resource objects (file, URL, etc.) the property came from, but
-	 * it can also indicate that it was set programmatically, or because of the
-	 * command line.
-	 *
-	 * @param name - The property name to get the source of.
-	 * @return null - If the property or its source wasn't found. Otherwise,
-	 * returns a copy of the list of the sources of the resource.  The older
-	 * sources are the first ones in the list.  So for example if a
-	 * configuration is set from the command line, and then written out to a
-	 * file that is read back in the first entry would indicate that it was set
-	 * from the command line, while the second one would indicate the file that
-	 * the new configuration was read in from.
-	 */
-	@InterfaceStability.Unstable
-	public synchronized String[] getPropertySources(String name) {
-		// A null 'properties' means a resource was added and the cached
-		// props were dropped; asking for the props forces the reload.
-		if (properties == null) {
-			getProps();
-		}
-		// Still nothing loaded, or no resource mapping at all: no answer.
-		if (properties == null || updatingResource == null) {
-			return null;
-		}
-		final String[] sources = updatingResource.get(name);
-		// hand out a copy so callers cannot modify the stored array
-		return (sources == null) ? null : Arrays.copyOf(sources, sources.length);
-	}
-
-	/**
-	 * A class that represents a set of positive integer ranges. It parses
-	 * strings of the form: "2-3,5,7-" where ranges are separated by comma and
-	 * the lower/upper bounds are separated by dash. Either the lower or upper
-	 * bound may be omitted meaning all values up to or over. So the string
-	 * above means 2, 3, 5, and 7, 8, 9, ...
-	 * <p>
-	 * An omitted lower bound is read as 0 and an omitted upper bound as
-	 * <code>Integer.MAX_VALUE</code>.
-	 */
-	public static class IntegerRanges implements Iterable<Integer>{
-		/** One inclusive range <code>[start, end]</code>. */
-		private static class Range {
-			int start;
-			int end;
-		}
-
-		/**
-		 * Iterates over every integer of every range, in the order the ranges
-		 * were parsed.
-		 */
-		private static class RangeNumberIterator implements Iterator<Integer> {
-			Iterator<Range> internal;
-			// The cursor is a long on purpose: with an int cursor, stepping past a
-			// range that ends at Integer.MAX_VALUE wraps around to
-			// Integer.MIN_VALUE, which still compares <= end, so iteration would
-			// continue with negative numbers instead of finishing the range.
-			long at;
-			long end;
-
-			public RangeNumberIterator(List<Range> ranges) {
-				if (ranges != null) {
-					internal = ranges.iterator();
-				}
-				// at > end marks "no current range"
-				at = -1;
-				end = -2;
-			}
-
-			@Override
-			public boolean hasNext() {
-				if (at <= end) {
-					return true;
-				}
-				return internal != null && internal.hasNext();
-			}
-
-			/**
-			 * @return the next integer of the current range, moving on to the
-			 *         next range when the current one is used up
-			 * @throws java.util.NoSuchElementException if no values remain
-			 */
-			@Override
-			public Integer next() {
-				if (at > end) {
-					// current range used up: move on to the next one
-					if (internal == null) {
-						throw new java.util.NoSuchElementException();
-					}
-					// internal.next() itself throws when no range is left
-					Range found = internal.next();
-					if (found == null) {
-						throw new java.util.NoSuchElementException();
-					}
-					at = found.start;
-					end = found.end;
-				}
-				long current = at;
-				at++;
-				// always within int range because start <= current <= end
-				return (int) current;
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		}
-
-		List<Range> ranges = new ArrayList<Range>();
-
-		/** Create an empty set of ranges. */
-		public IntegerRanges() {
-		}
-
-		/**
-		 * Parse a comma separated list of ranges.
-		 * @param newValue the text to parse, e.g. "2-3,5,7-"
-		 * @throws IllegalArgumentException if a range has more than one dash or
-		 *         its start is greater than its end
-		 * @throws NumberFormatException if a bound is not an integer
-		 */
-		public IntegerRanges(String newValue) {
-			StringTokenizer itr = new StringTokenizer(newValue, ",");
-			while (itr.hasMoreTokens()) {
-				String rng = itr.nextToken().trim();
-				// limit 3 so that a second dash shows up as a third part
-				String[] parts = rng.split("-", 3);
-				if (parts.length < 1 || parts.length > 2) {
-					throw new IllegalArgumentException("integer range badly formed: " +
-						rng);
-				}
-				Range r = new Range();
-				r.start = convertToInt(parts[0], 0);
-				if (parts.length == 2) {
-					r.end = convertToInt(parts[1], Integer.MAX_VALUE);
-				} else {
-					// a single number is the range [n, n]
-					r.end = r.start;
-				}
-				if (r.start > r.end) {
-					throw new IllegalArgumentException("IntegerRange from " + r.start +
-						" to " + r.end + " is invalid");
-				}
-				ranges.add(r);
-			}
-		}
-
-		/**
-		 * Convert a string to an int treating empty strings as the default value.
-		 * @param value the string value
-		 * @param defaultValue the value for if the string is empty
-		 * @return the desired integer
-		 */
-		private static int convertToInt(String value, int defaultValue) {
-			String trim = value.trim();
-			if (trim.length() == 0) {
-				return defaultValue;
-			}
-			return Integer.parseInt(trim);
-		}
-
-		/**
-		 * Is the given value in the set of ranges
-		 * @param value the value to check
-		 * @return is the value in the ranges?
-		 */
-		public boolean isIncluded(int value) {
-			for(Range r: ranges) {
-				if (r.start <= value && value <= r.end) {
-					return true;
-				}
-			}
-			return false;
-		}
-
-		/**
-		 * @return true if there are no values in this range, else false.
-		 */
-		public boolean isEmpty() {
-			return ranges == null || ranges.isEmpty();
-		}
-
-		/**
-		 * @return the ranges as comma separated <code>start-end</code> pairs;
-		 *         a single value n is printed as <code>n-n</code>
-		 */
-		@Override
-		public String toString() {
-			StringBuilder result = new StringBuilder();
-			boolean first = true;
-			for(Range r: ranges) {
-				if (first) {
-					first = false;
-				} else {
-					result.append(',');
-				}
-				result.append(r.start);
-				result.append('-');
-				result.append(r.end);
-			}
-			return result.toString();
-		}
-
-		@Override
-		public Iterator<Integer> iterator() {
-			return new RangeNumberIterator(ranges);
-		}
-
-	}
-
-	/**
-	 * Parse the given attribute as a set of integer ranges.
-	 * @param name the attribute name
-	 * @param defaultValue the text to parse if the attribute is not set
-	 * @return a new set of ranges from the configured value
-	 */
-	public IntegerRanges getRange(String name, String defaultValue) {
-		final String rangeText = get(name, defaultValue);
-		return new IntegerRanges(rangeText);
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * a collection of <code>String</code>s.
-	 * If no such property is specified then empty collection is returned.
-	 * <p>
-	 * This is an optimized version of {@link #getStrings(String)}
-	 *
-	 * @param name property name.
-	 * @return property value as a collection of <code>String</code>s.
-	 */
-	public Collection<String> getStringCollection(String name) {
-		return StringUtils.getStringCollection(get(name));
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * an array of <code>String</code>s.
-	 * If no such property is specified then <code>null</code> is returned.
-	 *
-	 * @param name property name.
-	 * @return property value as an array of <code>String</code>s,
-	 *         or <code>null</code>.
-	 */
-	public String[] getStrings(String name) {
-		return StringUtils.getStrings(get(name));
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * an array of <code>String</code>s.
-	 * If no such property is specified then default value is returned.
-	 *
-	 * @param name property name.
-	 * @param defaultValue The default value
-	 * @return property value as an array of <code>String</code>s,
-	 *         or default value.
-	 */
-	public String[] getStrings(String name, String... defaultValue) {
-		final String csv = get(name);
-		return (csv != null) ? StringUtils.getStrings(csv) : defaultValue;
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * a collection of <code>String</code>s, trimmed of the leading and trailing whitespace.
-	 * If no such property is specified then a new, empty <code>Collection</code> is returned.
-	 *
-	 * @param name property name.
-	 * @return property value as a collection of <code>String</code>s, or empty <code>Collection</code>
-	 */
-	public Collection<String> getTrimmedStringCollection(String name) {
-		final String csv = get(name);
-		if (csv != null) {
-			return StringUtils.getTrimmedStringCollection(csv);
-		}
-		return new ArrayList<String>();
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
-	 * If no such property is specified then an empty array is returned.
-	 *
-	 * @param name property name.
-	 * @return property value as an array of trimmed <code>String</code>s,
-	 *         or empty array.
-	 */
-	public String[] getTrimmedStrings(String name) {
-		return StringUtils.getTrimmedStrings(get(name));
-	}
-
-	/**
-	 * Get the comma delimited values of the <code>name</code> property as
-	 * an array of <code>String</code>s, trimmed of the leading and trailing whitespace.
-	 * If no such property is specified then default value is returned.
-	 *
-	 * @param name property name.
-	 * @param defaultValue The default value
-	 * @return property value as an array of trimmed <code>String</code>s,
-	 *         or default value.
-	 */
-	public String[] getTrimmedStrings(String name, String... defaultValue) {
-		final String csv = get(name);
-		return (csv == null) ? defaultValue : StringUtils.getTrimmedStrings(csv);
-	}
-
-	/**
-	 * Set the array of string values for the <code>name</code> property as
-	 * comma delimited values.
-	 *
-	 * @param name property name.
-	 * @param values The values
-	 */
-	public void setStrings(String name, String... values) {
-		final String joined = StringUtils.arrayToString(values);
-		set(name, joined);
-	}
-
-	/**
-	 * Get the value for a known password configuration element.
-	 * In order to enable the elimination of clear text passwords in config,
-	 * this method first attempts to resolve the property name as an alias
-	 * through the CredentialProvider API and, only if that yields nothing,
-	 * falls back to the configuration itself.
-	 * @param name property name
-	 * @return password, or null if neither lookup finds one
-	 * @throws IOException if the credential provider lookup fails
-	 */
-	public char[] getPassword(String name) throws IOException {
-		final char[] fromProviders = getPasswordFromCredentialProviders(name);
-		return (fromProviders != null)
-			? fromProviders
-			: getPasswordFromConfig(name);
-	}
-
-	/**
-	 * Try and resolve the provided element name as a credential provider
-	 * alias. The providers are asked in order and the first one that has an
-	 * entry for the alias supplies the password.
-	 * @param name alias of the provisioned credential
-	 * @return password or null if not found
-	 * @throws IOException if the providers cannot be obtained or one of them
-	 *         fails while looking up the alias
-	 */
-	protected char[] getPasswordFromCredentialProviders(String name)
-		throws IOException {
-		char[] pass = null;
-		try {
-			List<CredentialProvider> providers =
-				CredentialProviderFactory.getProviders(this);
-
-			if (providers != null) {
-				for (CredentialProvider provider : providers) {
-					try {
-						CredentialEntry entry = provider.getCredentialEntry(name);
-						if (entry != null) {
-							pass = entry.getCredential();
-							break;
-						}
-					}
-					catch (IOException ioe) {
-						// Note: this exception is caught again by the outer handler
-						// below and ends up as the cause of its exception.
-						throw new IOException("Can't get key " + name + " from key provider " +
-							"of type: " + provider.getClass().getName() + ".", ioe);
-					}
-				}
-			}
-		}
-		catch (IOException ioe) {
-			throw new IOException("Configuration problem with provider path.", ioe);
-		}
-
-		return pass;
-	}
-
-	/**
-	 * Fallback to clear text passwords in configuration.
-	 * @param name
-	 * @return clear text password or null
-	 */
-	protected char[] getPasswordFromConfig(String name) {
-		char[] pass = null;
-		if (getBoolean(CredentialProvider.CLEAR_TEXT_FALLBACK, true)) {
-			String passStr = get(name);
-			if (passStr != null) {
-				pass = passStr.toCharArray();
-			}
-		}
-		return pass;
-	}
-
-	/**
-	 * Get the socket address for <code>hostProperty</code> as a
-	 * <code>InetSocketAddress</code>. If <code>hostProperty</code> is
-	 * <code>null</code> or empty, the address from
-	 * <code>addressProperty</code> is returned as is; otherwise the host comes
-	 * from <code>hostProperty</code> and the port from
-	 * <code>addressProperty</code>. This is useful for cases where we want to
-	 * differentiate between host bind address and address clients should use
-	 * to establish connection.
-	 *
-	 * @param hostProperty bind host property name.
-	 * @param addressProperty address property name.
-	 * @param defaultAddressValue the default value
-	 * @param defaultPort the default port
-	 * @return InetSocketAddress
-	 */
-	public InetSocketAddress getSocketAddr(
-		String hostProperty,
-		String addressProperty,
-		String defaultAddressValue,
-		int defaultPort) {
-
-		final InetSocketAddress configured = getSocketAddr(
-			addressProperty, defaultAddressValue, defaultPort);
-		final String bindHost = get(hostProperty);
-		final boolean hasBindHost = bindHost != null && !bindHost.isEmpty();
-
-		return hasBindHost
-			? NetUtils.createSocketAddr(bindHost, configured.getPort(), hostProperty)
-			: configured;
-	}
-
-	/**
-	 * Get the socket address for <code>name</code> property as a
-	 * <code>InetSocketAddress</code>. The property value is read trimmed.
-	 * @param name property name.
-	 * @param defaultAddress the default value
-	 * @param defaultPort the default port
-	 * @return InetSocketAddress
-	 */
-	public InetSocketAddress getSocketAddr(
-		String name, String defaultAddress, int defaultPort) {
-		return NetUtils.createSocketAddr(
-			getTrimmed(name, defaultAddress), defaultPort, name);
-	}
-
-	/**
-	 * Set the socket address for the <code>name</code> property as
-	 * a <code>host:port</code>.
-	 *
-	 * @param name property name.
-	 * @param addr the address to store
-	 */
-	public void setSocketAddr(String name, InetSocketAddress addr) {
-		final String hostPort = NetUtils.getHostPortString(addr);
-		set(name, hostPort);
-	}
-
-	/**
-	 * Set the socket address a client can use to connect for the
-	 * <code>name</code> property as a <code>host:port</code>.  The wildcard
-	 * address is replaced with the local host's address. If the host and address
-	 * properties are configured the host component of the address will be combined
-	 * with the port component of the addr to generate the address.  This is to allow
-	 * optional control over which host name is used in multi-home bind-host
-	 * cases where a host can have multiple names
-	 * @param hostProperty the bind-host configuration name
-	 * @param addressProperty the service address configuration name
-	 * @param defaultAddressValue the service default address configuration value
-	 * @param addr InetSocketAddress of the service listener
-	 * @return InetSocketAddress for clients to connect
-	 */
-	public InetSocketAddress updateConnectAddr(
-		String hostProperty,
-		String addressProperty,
-		String defaultAddressValue,
-		InetSocketAddress addr) {
-
-		final String bindHost = get(hostProperty);
-		final String serviceHostPort = getTrimmed(addressProperty, defaultAddressValue);
-
-		final boolean bothConfigured =
-			bindHost != null && !bindHost.isEmpty()
-				&& serviceHostPort != null && !serviceHostPort.isEmpty();
-		if (bothConfigured) {
-			// host name from the configured service address, port from the listener
-			final String serviceHost = serviceHostPort.split(":")[0];
-			final InetSocketAddress combined =
-				NetUtils.createSocketAddrForHost(serviceHost, addr.getPort());
-			return updateConnectAddr(addressProperty, combined);
-		}
-		// not our case, fall back to original logic
-		return updateConnectAddr(addressProperty, addr);
-	}
-
-	/**
-	 * Set the socket address a client can use to connect for the
-	 * <code>name</code> property as a <code>host:port</code>.  The wildcard
-	 * address is replaced with the local host's address.
-	 * @param name property name.
-	 * @param addr InetSocketAddress of a listener to store in the given property
-	 * @return InetSocketAddress for clients to connect
-	 */
-	public InetSocketAddress updateConnectAddr(String name,
-		InetSocketAddress addr) {
-		final InetSocketAddress clientFacing = NetUtils.getConnectAddress(addr);
-		setSocketAddr(name, clientFacing);
-		return clientFacing;
-	}
-
-	/**
-	 * Load a class by name, failing with an exception when it cannot be found.
-	 *
-	 * @param name the class name.
-	 * @return the class object.
-	 * @throws ClassNotFoundException if the class is not found.
-	 */
-	public Class<?> getClassByName(String name) throws ClassNotFoundException {
-		final Class<?> found = getClassByNameOrNull(name);
-		if (found != null) {
-			return found;
-		}
-		throw new ClassNotFoundException("Class " + name + " not found");
-	}
-
-	/**
-	 * Load a class by name, returning null rather than throwing an exception
-	 * if it couldn't be loaded. This is to avoid the overhead of creating
-	 * an exception.
-	 * <p>
-	 * Results, including failed lookups, are remembered in a cache that is
-	 * kept per class loader in <code>CACHE_CLASSES</code>.
-	 *
-	 * @param name the class name
-	 * @return the class object, or null if it could not be found.
-	 */
-	public Class<?> getClassByNameOrNull(String name) {
-		Map<String, WeakReference<Class<?>>> map;
-
-		// Look up, or lazily create, the cache belonging to this configuration's
-		// class loader; done under the CACHE_CLASSES lock so only one map is
-		// ever registered per loader.
-		synchronized (CACHE_CLASSES) {
-			map = CACHE_CLASSES.get(classLoader);
-			if (map == null) {
-				map = Collections.synchronizedMap(
-					new WeakHashMap<String, WeakReference<Class<?>>>());
-				CACHE_CLASSES.put(classLoader, map);
-			}
-		}
-
-		// The cached value is a weak reference, so it may be absent or cleared;
-		// in both cases clazz stays null and the class is loaded again below.
-		Class<?> clazz = null;
-		WeakReference<Class<?>> ref = map.get(name);
-		if (ref != null) {
-			clazz = ref.get();
-		}
-
-		if (clazz == null) {
-			try {
-				clazz = Class.forName(name, true, classLoader);
-			} catch (ClassNotFoundException e) {
-				// Leave a marker that the class isn't found
-				map.put(name, new WeakReference<Class<?>>(NEGATIVE_CACHE_SENTINEL));
-				return null;
-			}
-			// two putters can race here, but they'll put the same class
-			map.put(name, new WeakReference<Class<?>>(clazz));
-			return clazz;
-		} else if (clazz == NEGATIVE_CACHE_SENTINEL) {
-			// an earlier lookup already failed for this name
-			return null; // not found
-		} else {
-			// cache hit
-			return clazz;
-		}
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property
-	 * as an array of <code>Class</code>.
-	 * The value of the property specifies a list of comma separated class names.
-	 * If no such property is specified, then <code>defaultValue</code> is
-	 * returned.
-	 *
-	 * @param name the property name.
-	 * @param defaultValue default value.
-	 * @return property value as a <code>Class[]</code>,
-	 *         or <code>defaultValue</code>.
-	 * @throws RuntimeException wrapping a ClassNotFoundException if one of
-	 *         the named classes cannot be loaded
-	 */
-	public Class<?>[] getClasses(String name, Class<?> ... defaultValue) {
-		// getTrimmedStrings(name) is documented to return an empty array, not
-		// null, when the property is not specified. Checking only its result
-		// therefore never yields defaultValue, so test for absence directly.
-		if (get(name) == null) {
-			return defaultValue;
-		}
-		String[] classnames = getTrimmedStrings(name);
-		if (classnames == null)
-			return defaultValue;
-		try {
-			Class<?>[] classes = new Class<?>[classnames.length];
-			for(int i = 0; i < classnames.length; i++) {
-				classes[i] = getClassByName(classnames[i]);
-			}
-			return classes;
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>Class</code>.
-	 * If no such property is specified, then <code>defaultValue</code> is
-	 * returned.
-	 *
-	 * @param name the class name.
-	 * @param defaultValue default value.
-	 * @return property value as a <code>Class</code>,
-	 *         or <code>defaultValue</code>.
-	 * @throws RuntimeException wrapping a ClassNotFoundException if the
-	 *         named class cannot be loaded
-	 */
-	public Class<?> getClass(String name, Class<?> defaultValue) {
-		final String className = getTrimmed(name);
-		if (className != null) {
-			try {
-				return getClassByName(className);
-			} catch (ClassNotFoundException e) {
-				throw new RuntimeException(e);
-			}
-		}
-		return defaultValue;
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>Class</code>
-	 * implementing the interface specified by <code>xface</code>.
-	 *
-	 * If no such property is specified, then <code>defaultValue</code> is
-	 * returned.
-	 *
-	 * An exception is thrown if the returned class does not implement the named
-	 * interface. Every exception raised during the lookup, including that one,
-	 * reaches the caller wrapped in a <code>RuntimeException</code>.
-	 *
-	 * @param name the class name.
-	 * @param defaultValue default value.
-	 * @param xface the interface implemented by the named class.
-	 * @return property value as a <code>Class</code>,
-	 *         or <code>defaultValue</code>.
-	 */
-	public <U> Class<? extends U> getClass(String name,
-		Class<? extends U> defaultValue,
-		Class<U> xface) {
-		try {
-			final Class<?> resolved = getClass(name, defaultValue);
-			if (resolved == null) {
-				return null;
-			}
-			if (!xface.isAssignableFrom(resolved)) {
-				throw new RuntimeException(resolved+" not "+xface.getName());
-			}
-			return resolved.asSubclass(xface);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Get the value of the <code>name</code> property as a <code>List</code>
-	 * of objects implementing the interface specified by <code>xface</code>.
-	 * One object is created per configured class, in the configured order.
-	 *
-	 * An exception is thrown if any of the classes does not exist, or if it does
-	 * not implement the named interface.
-	 *
-	 * @param name the property name.
-	 * @param xface the interface implemented by the classes named by
-	 *        <code>name</code>.
-	 * @return a <code>List</code> of objects implementing <code>xface</code>.
-	 */
-	@SuppressWarnings("unchecked")
-	public <U> List<U> getInstances(String name, Class<U> xface) {
-		final Class<?>[] configured = getClasses(name);
-		final List<U> instances = new ArrayList<U>();
-		for (int i = 0; i < configured.length; i++) {
-			final Class<?> candidate = configured[i];
-			if (xface.isAssignableFrom(candidate)) {
-				instances.add((U)ReflectionUtils.newInstance(candidate, this));
-			} else {
-				throw new RuntimeException(candidate + " does not implement " + xface);
-			}
-		}
-		return instances;
-	}
-
-	/**
-	 * Set the value of the <code>name</code> property to the name of a
-	 * <code>theClass</code> implementing the given interface <code>xface</code>.
-	 *
-	 * An exception is thrown if <code>theClass</code> does not implement the
-	 * interface <code>xface</code>; in that case nothing is stored.
-	 *
-	 * @param name property name.
-	 * @param theClass property value.
-	 * @param xface the interface implemented by the named class.
-	 */
-	public void setClass(String name, Class<?> theClass, Class<?> xface) {
-		final boolean implementsXface = xface.isAssignableFrom(theClass);
-		if (!implementsXface) {
-			throw new RuntimeException(theClass+" not "+xface.getName());
-		}
-		final String className = theClass.getName();
-		set(name, className);
-	}
-
-	/**
-	 * Get a local file under a directory named by <i>dirsProp</i> with
-	 * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
-	 * the first one tried is chosen based on <i>path</i>'s hash code and the
-	 * others are tried in turn after it.  If the parent directory of the file
-	 * does not exist, an attempt is made to create it.
-	 *
-	 * @param dirsProp directory in which to locate the file.
-	 * @param path file-path.
-	 * @return local file under the directory with the given path.
-	 * @throws IOException if none of the directories can be used
-	 */
-	public Path getLocalPath(String dirsProp, String path)
-		throws IOException {
-		final String[] candidates = getTrimmedStrings(dirsProp);
-		final int seed = path.hashCode();
-		final FileSystem localFs = FileSystem.getLocal(this);
-		final int count = candidates.length;
-		for (int attempt = 0; attempt < count; attempt++) {
-			// mask off the sign bit so the modulo never yields a negative slot
-			final int slot = ((seed + attempt) & Integer.MAX_VALUE) % count;
-			final Path file = new Path(candidates[slot], path);
-			final Path parent = file.getParent();
-			if (localFs.mkdirs(parent) || localFs.exists(parent)) {
-				return file;
-			}
-		}
-		// nothing usable: report every directory in the order it was tried
-		LOG.warn("Could not make " + path +
-			" in local directories from " + dirsProp);
-		for (int attempt = 0; attempt < count; attempt++) {
-			final int slot = ((seed + attempt) & Integer.MAX_VALUE) % count;
-			LOG.warn(dirsProp + "[" + slot + "]=" + candidates[slot]);
-		}
-		throw new IOException("No valid local directories in property: "+dirsProp);
-	}
-
-	/**
-	 * Get a local file name under a directory named in <i>dirsProp</i> with
-	 * the given <i>path</i>.  If <i>dirsProp</i> contains multiple directories,
-	 * the first one tried is chosen based on <i>path</i>'s hash code and the
-	 * others are tried in turn after it.  If the parent directory of the file
-	 * does not exist, an attempt is made to create it.
-	 *
-	 * @param dirsProp directory in which to locate the file.
-	 * @param path file-path.
-	 * @return local file under the directory with the given path.
-	 * @throws IOException if none of the directories can be used
-	 */
-	public File getFile(String dirsProp, String path)
-		throws IOException {
-		final String[] candidates = getTrimmedStrings(dirsProp);
-		final int seed = path.hashCode();
-		final int count = candidates.length;
-		for (int attempt = 0; attempt < count; attempt++) {
-			// mask off the sign bit so the modulo never yields a negative slot
-			final int slot = ((seed + attempt) & Integer.MAX_VALUE) % count;
-			final File file = new File(candidates[slot], path);
-			final File parent = file.getParentFile();
-			if (parent.exists() || parent.mkdirs()) {
-				return file;
-			}
-		}
-		throw new IOException("No valid local directories in property: "+dirsProp);
-	}
-
-	/**
-	 * Get the {@link URL} for the named resource, as resolved by this
-	 * configuration's class loader.
-	 *
-	 * @param name resource name.
-	 * @return the url for the named resource.
-	 */
-	public URL getResource(String name) {
-		final URL location = classLoader.getResource(name);
-		return location;
-	}
-
-	/**
-	 * Get an input stream attached to the configuration resource with the
-	 * given <code>name</code>.
-	 *
-	 * @param name configuration resource name.
-	 * @return an input stream attached to the resource, or null if the
-	 *         resource is not found or any exception occurs while opening it.
-	 */
-	public InputStream getConfResourceAsInputStream(String name) {
-		try {
-			final URL location = getResource(name);
-			if (location != null) {
-				LOG.info("found resource " + name + " at " + location);
-				return location.openStream();
-			}
-			LOG.info(name + " not found");
-			return null;
-		} catch (Exception e) {
-			// any failure is reported to the caller as "no stream"
-			return null;
-		}
-	}
-
-	/**
-	 * Get a {@link Reader} attached to the configuration resource with the
-	 * given <code>name</code>. The resource is decoded as UTF-8.
-	 *
-	 * @param name configuration resource name.
-	 * @return a reader attached to the resource, or null if the resource is
-	 *         not found or any exception occurs while opening it.
-	 */
-	public Reader getConfResourceAsReader(String name) {
-		try {
-			final URL location = getResource(name);
-			if (location != null) {
-				LOG.info("found resource " + name + " at " + location);
-				return new InputStreamReader(location.openStream(), Charsets.UTF_8);
-			}
-			LOG.info(name + " not found");
-			return null;
-		} catch (Exception e) {
-			// any failure is reported to the caller as "no reader"
-			return null;
-		}
-	}
-
-	/**
-	 * Get the set of parameters marked final. The result is a new set filled
-	 * from the internal one, not the internal set itself.
-	 *
-	 * @return final parameter set.
-	 */
-	public Set<String> getFinalParameters() {
-		final Set<String> copy = Collections.newSetFromMap(
-			new ConcurrentHashMap<String, Boolean>());
-		copy.addAll(finalParameters);
-		return copy;
-	}
-
-	/**
-	 * Return the properties of this configuration, building them on first
-	 * use: the resources are loaded and then the overlay, if any, is applied
-	 * on top of them.
-	 *
-	 * @return the loaded properties; this is the internal object, not a copy
-	 */
-	protected synchronized Properties getProps() {
-		if (properties == null) {
-			properties = new Properties();
-			// Snapshot of the property sources taken before the resources are
-			// loaded. NOTE(review): presumably loadResources rewrites entries of
-			// updatingResource, which is why a backup is needed - confirm.
-			Map<String, String[]> backup =
-				new ConcurrentHashMap<String, String[]>(updatingResource);
-			loadResources(properties, resources, quietmode);
-
-			if (overlay != null) {
-				// overlay values take precedence over what the resources provided
-				properties.putAll(overlay);
-				// for overlaid keys, put back the source recorded before the load
-				for (Map.Entry<Object,Object> item: overlay.entrySet()) {
-					String key = (String)item.getKey();
-					String[] source = backup.get(key);
-					if(source != null) {
-						updatingResource.put(key, source);
-					}
-				}
-			}
-		}
-		return properties;
-	}
-
-	/**
-	 * Return the number of keys in the configuration.
-	 *
-	 * @return number of keys in the configuration.
-	 */
-	public int size() {
-		final Properties loaded = getProps();
-		return loaded.size();
-	}
-
-	/**
-	 * Clears all keys from the configuration: first from the loaded
-	 * properties, then from the overlay.
-	 */
-	public void clear() {
-		final Properties loaded = getProps();
-		loaded.clear();
-		final Properties overlaid = getOverlay();
-		overlaid.clear();
-	}
-
-	/**
-	 * Get an {@link Iterator} to go through the list of <code>String</code>
-	 * key-value pairs in the configuration. The iterator runs over a copy
-	 * taken at the time of the call; entries whose key or value is not a
-	 * <code>String</code> are left out.
-	 *
-	 * @return an iterator over the entries.
-	 */
-	@Override
-	public Iterator<Map.Entry<String, String>> iterator() {
-		// Copy just the string to string pairs. Once the old object methods
-		// that allow non-strings to be put into configurations are removed,
-		// properties could become a Map<String,String> and this filtering
-		// would no longer be needed.
-		final Map<String,String> snapshot = new HashMap<String,String>();
-		for (Map.Entry<Object,Object> entry : getProps().entrySet()) {
-			final Object key = entry.getKey();
-			final Object value = entry.getValue();
-			if (!(key instanceof String) || !(value instanceof String)) {
-				continue;
-			}
-			snapshot.put((String) key, (String) value);
-		}
-		return snapshot.entrySet().iterator();
-	}
-
-	/**
-	 * Parse the XML document behind the given URL.
-	 *
-	 * @param builder the document builder to parse with
-	 * @param url location of the document, may be null
-	 * @return the parsed document, or null if <code>url</code> is null
-	 */
-	private Document parse(DocumentBuilder builder, URL url)
-		throws IOException, SAXException {
-		if (!quietmode) {
-			LOG.debug("parsing URL " + url);
-		}
-		return (url == null)
-			? null
-			: parse(builder, url.openStream(), url.toString());
-	}
-
-	/**
-	 * Parse the XML document read from the given stream. The stream is
-	 * closed before returning, whether or not parsing succeeds.
-	 *
-	 * @param builder the document builder to parse with
-	 * @param is the stream to read, may be null
-	 * @param systemId system id handed to the builder, or null for none
-	 * @return the parsed document, or null if <code>is</code> is null
-	 */
-	private Document parse(DocumentBuilder builder, InputStream is,
-		String systemId) throws IOException, SAXException {
-		if (!quietmode) {
-			LOG.debug("parsing input stream " + is);
-		}
-		if (is == null) {
-			return null;
-		}
-		try {
-			if (systemId == null) {
-				return builder.parse(is);
-			}
-			return builder.parse(is, systemId);
-		} finally {
-			is.close();
-		}
-	}
-
-	private void loadResources(Properties properties,
-		ArrayList<Resource> resources,
-		boolean quiet) {
-		if(loadDefaults) {
-			for (String resource : defaultResources) {
-				loadResource(properties, new Resource(resource), quiet);
-			}
-
-			//support the hadoop-site.xml as a deprecated case
-			if(getResource("hadoop-site.xml")!=null) {
-				loadResource(properties, new Resource("hadoop-site.xml"), quiet);
-			}
-		}
-
-		for (int i = 0; i < resources.size(); i++) {
-			Resource ret = loadResource(properties, resources.get(i), quiet);
-			if (ret != null) {
-				resources.set(i, ret);
-			}
-		}
-	}
-
-	private Resource loadResource(Properties properties, Resource wrapper, boolean quiet) {
-		String name = UNKNOWN_RESOURCE;
-		try {
-			Object resource = wrapper.getResource();
-			name = wrapper.getName();
-
-			DocumentBuilderFactory docBuilderFactory
-				= DocumentBuilderFactory.newInstance();
-			//ignore all comments inside the xml file
-			docBuilderFactory.setIgnoringComments(true);
-
-			//allow includes in the xml file
-			docBuilderFactory.setNamespaceAware(true);
-			try {
-				docBuilderFactory.setXIncludeAware(true);
-			} catch (UnsupportedOperationException e) {
-				LOG.error("Failed to set setXIncludeAware(true) for parser "
-						+ docBuilderFactory
-						+ ":" + e,
-					e);
-			}
-			DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-			Document doc = null;
-			Element root = null;
-			boolean returnCachedProperties = false;
-
-			if (resource instanceof URL) {                  // an URL resource
-				doc = parse(builder, (URL)resource);
-			} else if (resource instanceof String) {        // a CLASSPATH resource
-				URL url = getResource((String)resource);
-				doc = parse(builder, url);
-			} else if (resource instanceof Path) {          // a file resource
-				// Can't use FileSystem API or we get an infinite loop
-				// since FileSystem uses Configuration API.  Use java.io.File instead.
-				File file = new File(((Path)resource).toUri().getPath())
-					.getAbsoluteFile();
-				if (file.exists()) {
-					if (!quiet) {
-						LOG.debug("parsing File " + file);
-					}
-					doc = parse(builder, new BufferedInputStream(
-						new FileInputStream(file)), ((Path)resource).toString());
-				}
-			} else if (resource instanceof InputStream) {
-				doc = parse(builder, (InputStream) resource, null);
-				returnCachedProperties = true;
-			} else if (resource instanceof Properties) {
-				overlay(properties, (Properties)resource);
-			} else if (resource instanceof Element) {
-				root = (Element)resource;
-			}
-
-			if (root == null) {
-				if (doc == null) {
-					if (quiet) {
-						return null;
-					}
-					throw new RuntimeException(resource + " not found");
-				}
-				root = doc.getDocumentElement();
-			}
-			Properties toAddTo = properties;
-			if(returnCachedProperties) {
-				toAddTo = new Properties();
-			}
-			if (!"configuration".equals(root.getTagName()))
-				LOG.fatal("bad conf file: top-level element not <configuration>");
-			NodeList props = root.getChildNodes();
-			DeprecationContext deprecations = deprecationContext.get();
-			for (int i = 0; i < props.getLength(); i++) {
-				Node propNode = props.item(i);
-				if (!(propNode instanceof Element))
-					continue;
-				Element prop = (Element)propNode;
-				if ("configuration".equals(prop.getTagName())) {
-					loadResource(toAddTo, new Resource(prop, name), quiet);
-					continue;
-				}
-				if (!"property".equals(prop.getTagName()))
-					LOG.warn("bad conf file: element not <property>");
-				NodeList fields = prop.getChildNodes();
-				String attr = null;
-				String value = null;
-				boolean finalParameter = false;
-				LinkedList<String> source = new LinkedList<String>();
-				for (int j = 0; j < fields.getLength(); j++) {
-					Node fieldNode = fields.item(j);
-					if (!(fieldNode instanceof Element))
-						continue;
-					Element field = (Element)fieldNode;
-					if ("name".equals(field.getTagName()) && field.hasChildNodes())
-						attr = StringInterner.weakIntern(
-							((Text)field.getFirstChild()).getData().trim());
-					if ("value".equals(field.getTagName()) && field.hasChildNodes())
-						value = StringInterner.weakIntern(
-							((Text)field.getFirstChild()).getData());
-					if ("final".equals(field.getTagName()) && field.hasChildNodes())
-						finalParameter = "true".equals(((Text)field.getFirstChild()).getData());
-					if ("source".equals(field.getTagName()) && field.hasChildNodes())
-						source.add(StringInterner.weakIntern(
-							((Text)field.getFirstChild()).getData()));
-				}
-				source.add(name);
-
-				// Ignore this parameter if it has already been marked as 'final'
-				if (attr != null) {
-					if (deprecations.getDeprecatedKeyMap().containsKey(attr)) {
-						DeprecatedKeyInfo keyInfo =
-							deprecations.getDeprecatedKeyMap().get(attr);
-						keyInfo.clearAccessed();
-						for (String key:keyInfo.newKeys) {
-							// update new keys with deprecated key's value
-							loadProperty(toAddTo, name, key, value, finalParameter,
-								source.toArray(new String[source.size()]));
-						}
-					}
-					else {
-						loadProperty(toAddTo, name, attr, value, finalParameter,
-							source.toArray(new String[source.size()]));
-					}
-				}
-			}
-
-			if (returnCachedProperties) {
-				overlay(properties, toAddTo);
-				return new Resource(toAddTo, name);
-			}
-			return null;
-		} catch (IOException e) {
-			LOG.fatal("error parsing conf " + name, e);
-			throw new RuntimeException(e);
-		} catch (DOMException e) {
-			LOG.fatal("error parsing conf " + name, e);
-			throw new RuntimeException(e);
-		} catch (SAXException e) {
-			LOG.fatal("error parsing conf " + name, e);
-			throw new RuntimeException(e);
-		} catch (ParserConfigurationException e) {
-			LOG.fatal("error parsing conf " + name , e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	private void overlay(Properties to, Properties from) {
-		for (Entry<Object, Object> entry: from.entrySet()) {
-			to.put(entry.getKey(), entry.getValue());
-		}
-	}
-
-	private void loadProperty(Properties properties, String name, String attr,
-		String value, boolean finalParameter, String[] source) {
-		if (value != null || allowNullValueProperties) {
-			if (!finalParameters.contains(attr)) {
-				if (value==null && allowNullValueProperties) {
-					value = DEFAULT_STRING_CHECK;
-				}
-				properties.setProperty(attr, value);
-				if(source != null) {
-					updatingResource.put(attr, source);
-				}
-			} else if (!value.equals(properties.getProperty(attr))) {
-				LOG.warn(name+":an attempt to override final parameter: "+attr
-					+";  Ignoring.");
-			}
-		}
-		if (finalParameter && attr != null) {
-			finalParameters.add(attr);
-		}
-	}
-
-	/**
-	 * Write out the non-default properties in this configuration to the given
-	 * {@link OutputStream} using UTF-8 encoding.
-	 *
-	 * @param out the output stream to write to.
-	 */
-	public void writeXml(OutputStream out) throws IOException {
-		writeXml(new OutputStreamWriter(out, "UTF-8"));
-	}
-
-	/**
-	 * Write out the non-default properties in this configuration to the given
-	 * {@link Writer}.
-	 *
-	 * @param out the writer to write to.
-	 */
-	public void writeXml(Writer out) throws IOException {
-		Document doc = asXmlDocument();
-
-		try {
-			DOMSource source = new DOMSource(doc);
-			StreamResult result = new StreamResult(out);
-			TransformerFactory transFactory = TransformerFactory.newInstance();
-			Transformer transformer = transFactory.newTransformer();
-
-			// Important to not hold Configuration log while writing result, since
-			// 'out' may be an HDFS stream which needs to lock this configuration
-			// from another thread.
-			transformer.transform(source, result);
-		} catch (TransformerException te) {
-			throw new IOException(te);
-		}
-	}
-
-	/**
-	 * Return the XML DOM corresponding to this Configuration.
-	 */
-	private synchronized Document asXmlDocument() throws IOException {
-		Document doc;
-		try {
-			doc =
-				DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
-		} catch (ParserConfigurationException pe) {
-			throw new IOException(pe);
-		}
-		Element conf = doc.createElement("configuration");
-		doc.appendChild(conf);
-		conf.appendChild(doc.createTextNode("\n"));
-		handleDeprecation(); //ensure properties is set and deprecation is handled
-		for (Enumeration<Object> e = properties.keys(); e.hasMoreElements();) {
-			String name = (String)e.nextElement();
-			Object object = properties.get(name);
-			String value = null;
-			if (object instanceof String) {
-				value = (String) object;
-			}else {
-				continue;
-			}
-			Element propNode = doc.createElement("property");
-			conf.appendChild(propNode);
-
-			Element nameNode = doc.createElement("name");
-			nameNode.appendChild(doc.createTextNode(name));
-			propNode.appendChild(nameNode);
-
-			Element valueNode = doc.createElement("value");
-			valueNode.appendChild(doc.createTextNode(value));
-			propNode.appendChild(valueNode);
-
-			if (updatingResource != null) {
-				String[] sources = updatingResource.get(name);
-				if(sources != null) {
-					for(String s : sources) {
-						Element sourceNode = doc.createElement("source");
-						sourceNode.appendChild(doc.createTextNode(s));
-						propNode.appendChild(sourceNode);
-					}
-				}
-			}
-
-			conf.appendChild(doc.createTextNode("\n"));
-		}
-		return doc;
-	}
-
-	/**
-	 *  Writes out all the parameters and their properties (final and resource) to
-	 *  the given {@link Writer}
-	 *  The format of the output would be
-	 *  { "properties" : [ {key1,value1,key1.isFinal,key1.resource}, {key2,value2,
-	 *  key2.isFinal,key2.resource}... ] }
-	 *  It does not output the parameters of the configuration object which is
-	 *  loaded from an input stream.
-	 * @param out the Writer to write to
-	 * @throws IOException
-	 */
-	public static void dumpConfiguration(Configuration config,
-		Writer out) throws IOException {
-		JsonFactory dumpFactory = new JsonFactory();
-		JsonGenerator dumpGenerator = dumpFactory.createJsonGenerator(out);
-		dumpGenerator.writeStartObject();
-		dumpGenerator.writeFieldName("properties");
-		dumpGenerator.writeStartArray();
-		dumpGenerator.flush();
-		synchronized (config) {
-			for (Map.Entry<Object,Object> item: config.getProps().entrySet()) {
-				dumpGenerator.writeStartObject();
-				dumpGenerator.writeStringField("key", (String) item.getKey());
-				dumpGenerator.writeStringField("value",
-					config.get((String) item.getKey()));
-				dumpGenerator.writeBooleanField("isFinal",
-					config.finalParameters.contains(item.getKey()));
-				String[] resources = config.updatingResource.get(item.getKey());
-				String resource = UNKNOWN_RESOURCE;
-				if(resources != null && resources.length > 0) {
-					resource = resources[0];
-				}
-				dumpGenerator.writeStringField("resource", resource);
-				dumpGenerator.writeEndObject();
-			}
-		}
-		dumpGenerator.writeEndArray();
-		dumpGenerator.writeEndObject();
-		dumpGenerator.flush();
-	}
-
-	/**
-	 * Get the {@link ClassLoader} for this job.
-	 *
-	 * @return the correct class loader.
-	 */
-	public ClassLoader getClassLoader() {
-		return classLoader;
-	}
-
-	/**
-	 * Set the class loader that will be used to load the various objects.
-	 *
-	 * @param classLoader the new class loader.
-	 */
-	public void setClassLoader(ClassLoader classLoader) {
-		this.classLoader = classLoader;
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append("Configuration: ");
-		if(loadDefaults) {
-			toString(defaultResources, sb);
-			if(resources.size()>0) {
-				sb.append(", ");
-			}
-		}
-		toString(resources, sb);
-		return sb.toString();
-	}
-
-	private <T> void toString(List<T> resources, StringBuilder sb) {
-		ListIterator<T> i = resources.listIterator();
-		while (i.hasNext()) {
-			if (i.nextIndex() != 0) {
-				sb.append(", ");
-			}
-			sb.append(i.next());
-		}
-	}
-
-	/**
-	 * Set the quietness-mode.
-	 *
-	 * In the quiet-mode, error and informational messages might not be logged.
-	 *
-	 * @param quietmode <code>true</code> to set quiet-mode on, <code>false</code>
-	 *              to turn it off.
-	 */
-	public synchronized void setQuietMode(boolean quietmode) {
-		this.quietmode = quietmode;
-	}
-
-	synchronized boolean getQuietMode() {
-		return this.quietmode;
-	}
-
-	/** For debugging.  List non-default properties to the terminal and exit. */
-	public static void main(String[] args) throws Exception {
-		new Configuration().writeXml(System.out);
-	}
-
-	@Override
-	public void readFields(DataInput in) throws IOException {
-		clear();
-		int size = WritableUtils.readVInt(in);
-		for(int i=0; i < size; ++i) {
-			String key = org.apache.hadoop.io.Text.readString(in);
-			String value = org.apache.hadoop.io.Text.readString(in);
-			set(key, value);
-			String sources[] = WritableUtils.readCompressedStringArray(in);
-			if(sources != null) {
-				updatingResource.put(key, sources);
-			}
-		}
-	}
-
-	//@Override
-	@Override
-	public void write(DataOutput out) throws IOException {
-		Properties props = getProps();
-		WritableUtils.writeVInt(out, props.size());
-		for(Map.Entry<Object, Object> item: props.entrySet()) {
-			org.apache.hadoop.io.Text.writeString(out, (String) item.getKey());
-			org.apache.hadoop.io.Text.writeString(out, (String) item.getValue());
-			WritableUtils.writeCompressedStringArray(out,
-				updatingResource.get(item.getKey()));
-		}
-	}
-
-	/**
-	 * get keys matching the the regex
-	 * @param regex
-	 * @return Map<String,String> with matching keys
-	 */
-	public Map<String,String> getValByRegex(String regex) {
-		Pattern p = Pattern.compile(regex);
-
-		Map<String,String> result = new HashMap<String,String>();
-		Matcher m;
-
-		for(Map.Entry<Object,Object> item: getProps().entrySet()) {
-			if (item.getKey() instanceof String &&
-				item.getValue() instanceof String) {
-				m = p.matcher((String)item.getKey());
-				if(m.find()) { // match
-					result.put((String) item.getKey(),
-						substituteVars(getProps().getProperty((String) item.getKey())));
-				}
-			}
-		}
-		return result;
-	}
-
-	/**
-	 * A unique class which is used as a sentinel value in the caching
-	 * for getClassByName. {@see Configuration#getClassByNameOrNull(String)}
-	 */
-	private static abstract class NegativeCacheSentinel {}
-
-	public static void dumpDeprecatedKeys() {
-		DeprecationContext deprecations = deprecationContext.get();
-		for (Map.Entry<String, DeprecatedKeyInfo> entry :
-			deprecations.getDeprecatedKeyMap().entrySet()) {
-			StringBuilder newKeys = new StringBuilder();
-			for (String newKey : entry.getValue().newKeys) {
-				newKeys.append(newKey).append("\t");
-			}
-			System.out.println(entry.getKey() + "\t" + newKeys.toString());
-		}
-	}
-
-	/**
-	 * Returns whether or not a deprecated name has been warned. If the name is not
-	 * deprecated then always return false
-	 */
-	public static boolean hasWarnedDeprecation(String name) {
-		DeprecationContext deprecations = deprecationContext.get();
-		if(deprecations.getDeprecatedKeyMap().containsKey(name)) {
-			if(deprecations.getDeprecatedKeyMap().get(name).accessed.get()) {
-				return true;
-			}
-		}
-		return false;
-	}
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
deleted file mode 100644
index cbcb756..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/hadoop/util/NativeCodeLoader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.util;
-
-import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.Log;
-import com.facebook.presto.hadoop.$internal.org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
-/**
- * A helper to load the native hadoop code i.e. libhadoop.so.
- * This handles the fallback to either the bundled libhadoop-Linux-i386-32.so
- * or the default java implementations where appropriate.
- *  
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class NativeCodeLoader {
-
-  private static final Log LOG =
-    LogFactory.getLog(NativeCodeLoader.class);
-  
-  private static boolean nativeCodeLoaded = false;
-  
-  static {
-    LOG.info("Skipping native-hadoop library for flink-s3-fs-presto's relocated Hadoop... " +
-             "using builtin-java classes where applicable");
-  }
-
-  /**
-   * Check if native-hadoop code is loaded for this platform.
-   * 
-   * @return <code>true</code> if native-hadoop is loaded, 
-   *         else <code>false</code>
-   */
-  public static boolean isNativeCodeLoaded() {
-    return nativeCodeLoaded;
-  }
-
-  /**
-   * Returns true only if this build was compiled with support for snappy.
-   */
-  public static native boolean buildSupportsSnappy();
-  
-  /**
-   * Returns true only if this build was compiled with support for openssl.
-   */
-  public static native boolean buildSupportsOpenssl();
-
-  public static native String getLibraryName();
-
-  /**
-   * Return if native hadoop libraries, if present, can be used for this job.
-   * @param conf configuration
-   * 
-   * @return <code>true</code> if native hadoop libraries, if present, can be 
-   *         used for this job; <code>false</code> otherwise.
-   */
-  public boolean getLoadNativeLibraries(Configuration conf) {
-    return conf.getBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY, 
-                           CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_DEFAULT);
-  }
-  
-  /**
-   * Set if native hadoop libraries, if present, can be used for this job.
-   * 
-   * @param conf configuration
-   * @param loadNativeLibraries can native hadoop libraries be loaded
-   */
-  public void setLoadNativeLibraries(Configuration conf, 
-                                     boolean loadNativeLibraries) {
-    conf.setBoolean(CommonConfigurationKeys.IO_NATIVE_LIB_AVAILABLE_KEY,
-                    loadNativeLibraries);
-  }
-
-}
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE b/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index 45b446b..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,82 +0,0 @@
-This project includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
--------------------------------------------------------------
-
-This project bundles the following dependencies under
-the Apache Software License 2.0
-
-  - com.facebook.presto : presto-hive version 0.185
-  - com.facebook.presto.hadoop : hadoop-apache2 version 2.7.3-1
-  - com.amazonaws : aws-java-sdk-core version 1.11.165
-  - com.amazonaws : aws-java-sdk-s3 version 1.11.165
-  - com.amazonaws : aws-java-sdk-kms version 1.11.165
-  - com.amazonaws : jmespath-java version 1.11.165
-  - software.amazon.ion : ion-java version 1.0.2
-  - io.airlift : stats version 0.148
-  - io.airlift : log version 0.148
-  - io.airlift : configuration version 0.148
-  - io.airlift : slice version 0.31
-  - io.airlift : units version 1.0
-  - com.google.guava : guava version 21.0
-  - com.google.code.findbugs : annotations version 2.0.3
-  - org.weakref : jmxutils version 1.19
-  - joda-time : joda-time version 2.5
-  - commons-logging : commons-logging version 1.1.3
-  - org.apache.httpcomponents : httpclient version 4.5.3
-  - org.apache.httpcomponents : httpcore version 4.4.6
-  - commons-codec : commons-codec version 1.10
-  - com.fasterxml.jackson.core : jackson-core version 2.7.4
-  - com.fasterxml.jackson.core : jackson-databind version 2.7.4
-  - com.fasterxml.jackson.core : jackson-annotations version 2.7.4
-  - com.fasterxml.jackson.dataformat : jackson-dataformat-cbor version 2.6.7  
-
-===================================
-       Notice for HdrHistogram
-===================================
-
-This project bundles HdrHistogram (v. 2.1.9) under the BSD 2-Clause License 
-
-Original source repository: https://github.com/HdrHistogram/HdrHistogram
-
-The code in this repository code was Written by Gil Tene, Michael Barker,
-and Matt Warren, and released to the public domain, as explained at
-http://creativecommons.org/publicdomain/zero/1.0/
-
-For users of this code who wish to consume it under the "BSD" license
-rather than under the public domain or CC0 contribution text mentioned
-above, the code found under this directory is *also* provided under the
-following license (commonly referred to as the BSD 2-Clause License). This
-license does not detract from the above stated release of the code into
-the public domain, and simply represents an additional license granted by
-the Author.
-
------------------------------------------------------------------------------
-** Beginning of "BSD 2-Clause License" text. **
-
- Copyright (c) 2012, 2013, 2014, 2015, 2016 Gil Tene
- Copyright (c) 2014 Michael Barker
- Copyright (c) 2014 Matt Warren
- All rights reserved.
-
- Redistribution and use in source and binary forms, with or without
- modification, are permitted provided that the following conditions are met:
-
- 1. Redistributions of source code must retain the above copyright notice,
-    this list of conditions and the following disclaimer.
-
- 2. Redistributions in binary form must reproduce the above copyright notice,
-    this list of conditions and the following disclaimer in the documentation
-    and/or other materials provided with the distribution.
-
- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
- LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
- CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
- SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
- CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
- ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
- THE POSSIBILITY OF SUCH DAMAGE.
\ No newline at end of file
diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/resources/core-default-shaded.xml b/flink-filesystems/flink-s3-fs-presto/src/main/resources/core-default-shaded.xml
deleted file mode 100644
index cfb2ab2..0000000
--- a/flink-filesystems/flink-s3-fs-presto/src/main/resources/core-default-shaded.xml
+++ /dev/null
@@ -1,1978 +0,0 @@
-<?xml version="1.0"?>
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<!-- Do not modify this file directly.  Instead, copy entries that you -->
-<!-- wish to modify from this file into core-site.xml and change them -->
-<!-- there.  If core-site.xml does not already exist, create it.      -->
-
-<configuration>
-
-<!--- global properties -->
-
-<property>
-  <name>hadoop.common.configuration.version</name>
-  <value>0.23.0</value>
-  <description>version of this configuration file</description>
-</property>
-
-<property>
-  <name>hadoop.tmp.dir</name>
-  <value>/tmp/hadoop-${user.name}</value>
-  <description>A base for other temporary directories.</description>
-</property>
-
-<property>
-  <name>io.native.lib.available</name>
-  <value>true</value>
-  <description>Controls whether to use native libraries for bz2 and zlib
-    compression codecs or not. The property does not control any other native
-    libraries.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.filter.initializers</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.http.lib.StaticUserWebFilter</value>
-  <description>A comma separated list of class names. Each class in the list 
-  must extend org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.http.FilterInitializer. The corresponding 
-  Filter will be initialized. Then, the Filter will be applied to all user 
-  facing jsp and servlet web pages.  The ordering of the list defines the 
-  ordering of the filters.</description>
-</property>
-
-<!--- security properties -->
-
-<property>
-  <name>hadoop.security.authorization</name>
-  <value>false</value>
-  <description>Is service-level authorization enabled?</description>
-</property>
-
-<property>
-  <name>hadoop.security.instrumentation.requires.admin</name>
-  <value>false</value>
-  <description>
-    Indicates if administrator ACLs are required to access
-    instrumentation servlets (JMX, METRICS, CONF, STACKS).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.authentication</name>
-  <value>simple</value>
-  <description>Possible values are simple (no authentication), and kerberos
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback</value>
-  <description>
-    Class for user to group mapping (get groups for a given user) for ACL. 
-    The default implementation,
-    org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback, 
-    will determine if the Java Native Interface (JNI) is available. If JNI is 
-    available the implementation will use the API within hadoop to resolve a 
-    list of groups for a user. If JNI is not available then the shell 
-    implementation, ShellBasedUnixGroupsMapping, is used.  This implementation 
-    shells out to the Linux/Unix environment with the 
-    <code>bash -c groups</code> command to resolve a list of groups for a user.
-  </description>
-</property>
-
-<!-- 
-=== Multiple group mapping providers configuration sample === 
-  This sample illustrates a typical use case for CompositeGroupsMapping where
-Hadoop authentication uses MIT Kerberos which trusts an AD realm. In this case, service
-principals such as hdfs, mapred, hbase, hive, oozie and etc can be placed in In MIT Kerberos,
-but end users are just from the trusted AD. For the service principals, ShellBasedUnixGroupsMapping
-provider can be used to query their groups for efficiency, and for end users, LdapGroupsMapping 
-provider can be used. This avoids to add group entries in AD for service principals when only using 
-LdapGroupsMapping provider.
-  In case multiple ADs are involved and trusted by the MIT Kerberos in this use case, LdapGroupsMapping
-provider can be used more times with different AD specific configurations. This sample also shows how
-to do that. Here are the necessary configurations.
-
-<property>
-  <name>hadoop.security.group.mapping</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.CompositeGroupsMapping</value>
-  <description>
-    Class for user to group mapping (get groups for a given user) for ACL, which 
-    makes use of other multiple providers to provide the service.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers</name>
-  <value>shell4services,ad4usersX,ad4usersY</value>
-  <description>
-    Comma separated of names of other providers to provide user to group mapping. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.providers.combined</name>
-  <value>true</value>
-  <description>
-    true or false to indicate whether groups from the providers are combined or not. The default value is true
-    If true, then all the providers will be tried to get groups and all the groups are combined to return as
-    the final results. Otherwise, providers are tried one by one in the configured list order, and if any
-    groups are retrieved from any provider, then the groups will be returned without trying the left ones.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.provider.shell4services</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
-  <description>
-    Class for group mapping provider named by 'shell4services'. The name can then be referenced 
-    by hadoop.security.group.mapping.providers property.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.provider.ad4usersX</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.LdapGroupsMapping</value>
-  <description>
-    Class for group mapping provider named by 'ad4usersX'. The name can then be referenced 
-    by hadoop.security.group.mapping.providers property.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.provider.ad4usersY</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.LdapGroupsMapping</value>
-  <description>
-    Class for group mapping provider named by 'ad4usersY'. The name can then be referenced 
-    by hadoop.security.group.mapping.providers property.
-  </description>
-</property>
-
-<property>
-<name>hadoop.security.group.mapping.provider.ad4usersX.ldap.url</name>
-<value>ldap://ad-host-for-users-X:389</value>
-  <description>
-    ldap url for the provider named by 'ad4usersX'. Note this property comes from 
-    'hadoop.security.group.mapping.ldap.url'.
-  </description>
-</property>
-
-<property>
-<name>hadoop.security.group.mapping.provider.ad4usersY.ldap.url</name>
-<value>ldap://ad-host-for-users-Y:389</value>
-  <description>
-    ldap url for the provider named by 'ad4usersY'. Note this property comes from 
-    'hadoop.security.group.mapping.ldap.url'.
-  </description>
-</property>
-
-You also need to configure other properties like
-  hadoop.security.group.mapping.ldap.bind.password.file and etc.
-for ldap providers in the same way as above does.
-
--->
- 
-<property>
-  <name>hadoop.security.groups.cache.secs</name>
-  <value>300</value>
-  <description>
-    This is the config controlling the validity of the entries in the cache
-    containing the user->group mapping. When this duration has expired,
-    then the implementation of the group mapping provider is invoked to get
-    the groups of the user and then cached back.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.negative-cache.secs</name>
-  <value>30</value>
-  <description>
-    Expiration time for entries in the the negative user-to-group mapping
-    caching, in seconds. This is useful when invalid users are retrying
-    frequently. It is suggested to set a small value for this expiration, since
-    a transient error in group lookup could temporarily lock out a legitimate
-    user.
-
-    Set this to zero or negative value to disable negative user-to-group caching.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.groups.cache.warn.after.ms</name>
-  <value>5000</value>
-  <description>
-    If looking up a single user to group takes longer than this amount of
-    milliseconds, we will log a warning message.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.url</name>
-  <value></value>
-  <description>
-    The URL of the LDAP server to use for resolving user groups when using
-    the LdapGroupsMapping user to group mapping.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl</name>
-  <value>false</value>
-  <description>
-    Whether or not to use SSL when connecting to the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore</name>
-  <value></value>
-  <description>
-    File path to the SSL keystore that contains the SSL certificate required
-    by the LDAP server.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.ssl.keystore.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the LDAP SSL keystore.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.user</name>
-  <value></value>
-  <description>
-    The distinguished name of the user to bind as when connecting to the LDAP
-    server. This may be left blank if the LDAP server supports anonymous binds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.bind.password.file</name>
-  <value></value>
-  <description>
-    The path to a file containing the password of the bind user.
-
-    IMPORTANT: This file should be readable only by the Unix user running
-    the daemons.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.base</name>
-  <value></value>
-  <description>
-    The search base for the LDAP connection. This is a distinguished name,
-    and will typically be the root of the LDAP directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.user</name>
-  <value>(&amp;(objectClass=user)(sAMAccountName={0}))</value>
-  <description>
-    An additional filter to use when searching for LDAP users. The default will
-    usually be appropriate for Active Directory installations. If connecting to
-    an LDAP server with a non-AD schema, this should be replaced with
-    (&amp;(objectClass=inetOrgPerson)(uid={0})). {0} is a special string used to
-    denote where the username fits into the filter.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.filter.group</name>
-  <value>(objectClass=group)</value>
-  <description>
-    An additional filter to use when searching for LDAP groups. This should be
-    changed when resolving groups against a non-Active Directory installation.
-    posixGroups are currently not a supported group class.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.member</name>
-  <value>member</value>
-  <description>
-    The attribute of the group object that identifies the users that are
-    members of the group. The default will usually be appropriate for
-    any LDAP installation.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.search.attr.group.name</name>
-  <value>cn</value>
-  <description>
-    The attribute of the group object that identifies the group name. The
-    default will usually be appropriate for all LDAP systems.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.group.mapping.ldap.directory.search.timeout</name>
-  <value>10000</value>
-  <description>
-    The attribute applied to the LDAP SearchControl properties to set a
-    maximum time limit when searching and awaiting a result.
-    Set to 0 if infinite wait period is desired.
-    Default is 10 seconds. Units in milliseconds.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.service.user.name.key</name>
-  <value></value>
-  <description>
-    For those cases where the same RPC protocol is implemented by multiple
-    servers, this configuration is required for specifying the principal
-    name to use for the service when the client wishes to make an RPC call.
-  </description>
-</property>
-
-
-<property>
-    <name>hadoop.security.uid.cache.secs</name>
-    <value>14400</value>
-    <description>
-        This is the config controlling the validity of the entries in the cache
-        containing the userId to userName and groupId to groupName used by
-        NativeIO getFstat().
-    </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.protection</name>
-  <value>authentication</value>
-  <description>A comma-separated list of protection values for secured sasl 
-      connections. Possible values are authentication, integrity and privacy.
-      authentication means authentication only and no integrity or privacy; 
-      integrity implies authentication and integrity are enabled; and privacy 
-      implies all of authentication, integrity and privacy are enabled.
-      hadoop.security.saslproperties.resolver.class can be used to override
-      the hadoop.rpc.protection for a connection at the server side.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.saslproperties.resolver.class</name>
-  <value></value>
-  <description>SaslPropertiesResolver used to resolve the QOP used for a 
-      connection. If not specified, the full set of values specified in 
-      hadoop.rpc.protection is used while determining the QOP used for the 
-      connection. If a class is specified, then the QOP values returned by 
-      the class will be used while determining the QOP used for the connection.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.work.around.non.threadsafe.getpwuid</name>
-  <value>false</value>
-  <description>Some operating systems or authentication modules are known to
-  have broken implementations of getpwuid_r and getpwgid_r, such that these
-  calls are not thread-safe. Symptoms of this problem include JVM crashes
-  with a stack trace inside these functions. If your system exhibits this
-  issue, enable this configuration parameter to include a lock around the
-  calls as a workaround.
-
-  An incomplete list of some systems known to have this issue is available
-  at http://wiki.apache.org/hadoop/KnownBrokenPwuidImplementations
-  </description>
-</property>
-
-<property>
-  <name>hadoop.kerberos.kinit.command</name>
-  <value>kinit</value>
-  <description>Used to periodically renew Kerberos credentials when provided
-  to Hadoop. The default setting assumes that kinit is in the PATH of users
-  running the Hadoop client. Change this to the absolute path to kinit if this
-  is not the case.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.auth_to_local</name>
-  <value></value>
-  <description>Maps kerberos principals to local user names</description>
-</property>
-
-<!-- i/o properties -->
-<property>
-  <name>io.file.buffer.size</name>
-  <value>4096</value>
-  <description>The size of buffer for use in sequence files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-  
-<property>
-  <name>io.bytes.per.checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  io.file.buffer.size.</description>
-</property>
-
-<property>
-  <name>io.skip.checksum.errors</name>
-  <value>false</value>
-  <description>If true, when a checksum error is encountered while
-  reading a sequence file, entries are skipped, instead of throwing an
-  exception.</description>
-</property>
-
-<property>
-  <name>io.compression.codecs</name>
-  <value></value>
-  <description>A comma-separated list of the compression codec classes that can
-  be used for compression/decompression. In addition to any classes specified
-  with this property (which take precedence), codec classes on the classpath
-  are discovered using a Java ServiceLoader.</description>
-</property>
-
-<property>
-  <name>io.compression.codec.bzip2.library</name>
-  <value>system-native</value>
-  <description>The native-code library to be used for compression and
-  decompression by the bzip2 codec.  This library could be specified
-  either by name or by the full pathname.  In the former case, the
-  library is located by the dynamic linker, usually searching the
-  directories specified in the environment variable LD_LIBRARY_PATH.
-  
-  The value of "system-native" indicates that the default system
-  library should be used.  To indicate that the algorithm should
-  operate entirely in Java, specify "java-builtin".</description>
-</property>
-
-<property>
-  <name>io.serializations</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.serializer.WritableSerialization,org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.serializer.avro.AvroSpecificSerialization,org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.io.serializer.avro.AvroReflectSerialization</value>
-  <description>A list of serialization classes that can be used for
-  obtaining serializers and deserializers.</description>
-</property>
-
-<property>
-  <name>io.seqfile.local.dir</name>
-  <value>${hadoop.tmp.dir}/io/local</value>
-  <description>The local directory where sequence file stores intermediate
-  data files during merge.  May be a comma-separated list of
-  directories on different devices in order to spread disk i/o.
-  Directories that do not exist are ignored.
-  </description>
-</property>
-
-<property>
-  <name>io.map.index.skip</name>
-  <value>0</value>
-  <description>Number of index entries to skip between each entry.
-  Zero by default. Setting this to values larger than zero can
-  facilitate opening large MapFiles using less memory.</description>
-</property>
-
-<property>
-  <name>io.map.index.interval</name>
-  <value>128</value>
-  <description>
-    MapFile consist of two files - data file (tuples) and index file
-    (keys). For every io.map.index.interval records written in the
-    data file, an entry (record-key, data-file-position) is written
-    in the index file. This is to allow for doing binary search later
-    within the index file to look up records by their keys and get their
-    closest positions in the data file.
-  </description>
-</property>
-
-<!-- file system properties -->
-
-<property>
-  <name>fs.defaultFS</name>
-  <value>file:///</value>
-  <description>The name of the default file system.  A URI whose
-  scheme and authority determine the FileSystem implementation.  The
-  uri's scheme determines the config property (fs.SCHEME.impl) naming
-  the FileSystem implementation class.  The uri's authority is used to
-  determine the host, port, etc. for a filesystem.</description>
-</property>
-
-<property>
-  <name>fs.default.name</name>
-  <value>file:///</value>
-  <description>Deprecated. Use (fs.defaultFS) property
-  instead</description>
-</property>
-
-<property>
-  <name>fs.trash.interval</name>
-  <value>0</value>
-  <description>Number of minutes after which the checkpoint
-  gets deleted.  If zero, the trash feature is disabled.
-  This option may be configured both on the server and the
-  client. If trash is disabled server side then the client
-  side configuration is checked. If trash is enabled on the
-  server side then the value configured on the server is
-  used and the client configuration value is ignored.
-  </description>
-</property>
-
-<property>
-  <name>fs.trash.checkpoint.interval</name>
-  <value>0</value>
-  <description>Number of minutes between trash checkpoints.
-  Should be smaller or equal to fs.trash.interval. If zero,
-  the value is set to the value of fs.trash.interval.
-  Every time the checkpointer runs it creates a new checkpoint 
-  out of current and removes checkpoints created more than 
-  fs.trash.interval minutes ago.
-  </description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.file.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.local.LocalFs</value>
-  <description>The AbstractFileSystem for file: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.har.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.HarFs</value>
-  <description>The AbstractFileSystem for har: uris.</description>
-</property> 
-
-<property>
-  <name>fs.AbstractFileSystem.hdfs.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.Hdfs</value>
-  <description>The AbstractFileSystem for hdfs: uris.</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.viewfs.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.viewfs.ViewFs</value>
-  <description>The AbstractFileSystem for view file system for viewfs: uris
-  (ie client side mount table:).</description>
-</property>
-
-<property>
-  <name>fs.AbstractFileSystem.ftp.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.ftp.FtpFs</value>
-  <description>The AbstractFileSystem for ftp: uris.</description>
-</property>
-
-<property>
-  <name>fs.ftp.host</name>
-  <value>0.0.0.0</value>
-  <description>FTP filesystem connects to this server</description>
-</property>
-
-<property>
-  <name>fs.ftp.host.port</name>
-  <value>21</value>
-  <description>
-    FTP filesystem connects to fs.ftp.host on this port
-  </description>
-</property>
-
-<property>
-  <name>fs.df.interval</name>
-  <value>60000</value>
-  <description>Disk usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.du.interval</name>
-  <value>600000</value>
-  <description>File space usage statistics refresh interval in msec.</description>
-</property>
-
-<property>
-  <name>fs.s3.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when writing files to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3</value>
-  <description>Determines where on the local filesystem the S3 filesystem
-  should store files before sending them to S3
-  (or after retrieving them from S3).
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.maxRetries</name>
-  <value>4</value>
-  <description>The maximum number of retries for reading or writing files to S3, 
-  before we signal failure to the application.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3.sleepTimeSeconds</name>
-  <value>10</value>
-  <description>The number of seconds to sleep between each S3 retry.
-  </description>
-</property>
-
-<property>
-  <name>fs.swift.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.swift.snative.SwiftNativeFileSystem</value>
-  <description>The implementation class of the OpenStack Swift Filesystem</description>
-</property>
-
-<property>
-  <name>fs.automatic.close</name>
-  <value>true</value>
-  <description>By default, FileSystem instances are automatically closed at program
-  exit using a JVM shutdown hook. Setting this property to false disables this
-  behavior. This is an advanced option that should only be used by server applications
-  requiring a more carefully orchestrated shutdown sequence.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.block.size</name>
-  <value>67108864</value>
-  <description>Block size to use when reading files using the native S3
-  filesystem (s3n: URIs).</description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.enabled</name>
-  <value>false</value>
-  <description>Setting this property to true enables multipart uploads to
-  native S3 filesystem. When uploading a file, it is split into blocks
-  if the size is larger than fs.s3n.multipart.uploads.block.size.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.uploads.block.size</name>
-  <value>67108864</value>
-  <description>The block size for multipart uploads to native S3 filesystem.
-  Default size is 64MB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.multipart.copy.block.size</name>
-  <value>5368709120</value>
-  <description>The block size for multipart copy in native S3 filesystem.
-  Default size is 5GB.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3n.server-side-encryption-algorithm</name>
-  <value></value>
-  <description>Specify a server-side encryption algorithm for S3.
-  The default is NULL, and the only other currently allowable value is AES256.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.awsAccessKeyId</name>
-  <description>AWS access key ID. Omit for Role-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.awsSecretAccessKey</name>
-  <description>AWS secret key. Omit for Role-based authentication.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.maximum</name>
-  <value>15</value>
-  <description>Controls the maximum number of simultaneous connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.ssl.enabled</name>
-  <value>true</value>
-  <description>Enables or disables SSL connections to S3.</description>
-</property>
-
-<property>
-  <name>fs.s3a.endpoint</name>
-  <description>AWS S3 endpoint to connect to. An up-to-date list is
-    provided in the AWS Documentation: regions and endpoints. Without this
-    property, the standard region (s3.amazonaws.com) is assumed.
-  </description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.host</name>
-  <description>Hostname of the (optional) proxy server for S3 connections.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.port</name>
-  <description>Proxy server port. If this property is not set
-    but fs.s3a.proxy.host is, port 80 or 443 is assumed (consistent with
-    the value of fs.s3a.connection.ssl.enabled).</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.username</name>
-  <description>Username for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.password</name>
-  <description>Password for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.domain</name>
-  <description>Domain for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.proxy.workstation</name>
-  <description>Workstation for authenticating with proxy server.</description>
-</property>
-
-<property>
-  <name>fs.s3a.attempts.maximum</name>
-  <value>10</value>
-  <description>How many times we should retry commands on transient errors.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.establish.timeout</name>
-  <value>5000</value>
-  <description>Socket connection setup timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.connection.timeout</name>
-  <value>50000</value>
-  <description>Socket connection timeout in milliseconds.</description>
-</property>
-
-<property>
-  <name>fs.s3a.paging.maximum</name>
-  <value>5000</value>
-  <description>How many keys to request from S3 when doing 
-     directory listings at a time.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.max</name>
-  <value>256</value>
-  <description> Maximum number of concurrent active (part)uploads,
-    which each use a thread from the threadpool.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.core</name>
-  <value>15</value>
-  <description>Number of core threads in the threadpool.</description>
-</property>
-
-<property>
-  <name>fs.s3a.threads.keepalivetime</name>
-  <value>60</value>
-  <description>Number of seconds a thread can be idle before being
-    terminated.</description>
-</property>
-
-<property>
-  <name>fs.s3a.max.total.tasks</name>
-  <value>1000</value>
-  <description>Number of (part)uploads allowed to the queue before
-    blocking additional uploads.</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.size</name>
-  <value>104857600</value>
-  <description>How big (in bytes) to split upload or copy operations up into.</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.threshold</name>
-  <value>2147483647</value>
-  <description>Threshold before uploads or copies use parallel multipart operations.</description>
-</property>
-
-<property>
-  <name>fs.s3a.acl.default</name>
-  <description>Set a canned ACL for newly created and copied objects. Value may be private, 
-     public-read, public-read-write, authenticated-read, log-delivery-write, 
-     bucket-owner-read, or bucket-owner-full-control.</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge</name>
-  <value>false</value>
-  <description>True if you want to purge existing multipart uploads that may not have been
-     completed/aborted correctly</description>
-</property>
-
-<property>
-  <name>fs.s3a.multipart.purge.age</name>
-  <value>86400</value>
-  <description>Minimum age in seconds of multipart uploads to purge</description>
-</property>
-
-<property>
-  <name>fs.s3a.buffer.dir</name>
-  <value>${hadoop.tmp.dir}/s3a</value>
-  <description>Comma separated list of directories that will be used to buffer file 
-    uploads to.</description>
-</property>
-
-<property>
-  <name>fs.s3a.fast.upload</name>
-  <value>false</value>
-  <description>Upload directly from memory instead of buffering to
-    disk first. Memory usage and parallelism can be controlled as up to
-    fs.s3a.multipart.size memory is consumed for each (part)upload actively
-    uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
-</property>
-
-  <property>
-  <name>fs.s3a.fast.buffer.size</name>
-  <value>1048576</value>
-  <description>Size of initial memory buffer in bytes allocated for an
-    upload. No effect if fs.s3a.fast.upload is false.</description>
-</property>
-
-<property>
-  <name>fs.s3a.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.fs.s3a.S3AFileSystem</value>
-  <description>The implementation class of the S3A Filesystem</description>
-</property>
-
-<property>
-  <name>io.seqfile.compress.blocksize</name>
-  <value>1000000</value>
-  <description>The minimum block size for compression in block compressed 
-          SequenceFiles.
-  </description>
-</property>
-
-<property>
-  <name>io.seqfile.lazydecompress</name>
-  <value>true</value>
-  <description>Should values of block-compressed SequenceFiles be decompressed
-          only when necessary.
-  </description>
-</property>
-
-<property>
-  <name>io.seqfile.sorter.recordlimit</name>
-  <value>1000000</value>
-  <description>The limit on number of records to be kept in memory in a spill 
-          in SequenceFiles.Sorter
-  </description>
-</property>
-
- <property>
-  <name>io.mapfile.bloom.size</name>
-  <value>1048576</value>
-  <description>The size of BloomFilter-s used in BloomMapFile. Each time this many
-  keys are appended, the next BloomFilter will be created (inside a DynamicBloomFilter).
-  Larger values minimize the number of filters, which slightly increases the performance,
-  but may waste too much space if the total number of keys is usually much smaller
-  than this number.
-  </description>
-</property>
-
-<property>
-  <name>io.mapfile.bloom.error.rate</name>
-  <value>0.005</value>
-  <description>The rate of false positives in BloomFilter-s used in BloomMapFile.
-  As this value decreases, the size of BloomFilter-s increases exponentially. This
-  value is the probability of encountering false positives (default is 0.5%).
-  </description>
-</property>
-
-<property>
-  <name>hadoop.util.hash.type</name>
-  <value>murmur</value>
-  <description>The default implementation of Hash. Currently this can take one of the
-  two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash.
-  </description>
-</property>
-
-
-<!-- ipc properties -->
-
-<property>
-  <name>ipc.client.idlethreshold</name>
-  <value>4000</value>
-  <description>Defines the threshold number of connections after which
-               connections will be inspected for idleness.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.kill.max</name>
-  <value>10</value>
-  <description>Defines the maximum number of clients to disconnect in one go.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connection.maxidletime</name>
-  <value>10000</value>
-  <description>The maximum time in msec after which a client will bring down the
-               connection to the server.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries</name>
-  <value>10</value>
-  <description>Indicates the number of retries a client will make to establish
-               a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.retry.interval</name>
-  <value>1000</value>
-  <description>Indicates the number of milliseconds a client will wait for
-    before retrying to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.timeout</name>
-  <value>20000</value>
-  <description>Indicates the number of milliseconds a client will wait for the 
-               socket to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.connect.max.retries.on.timeouts</name>
-  <value>45</value>
-  <description>Indicates the number of retries a client will make on socket timeout
-               to establish a server connection.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.ping</name>
-  <value>true</value>
-  <description>Send a ping to the server when timeout on reading the response,
-  if set to true. If no failure is detected, the client retries until at least
-  a byte is read.
-  </description>
-</property>
-
-<property>
-  <name>ipc.ping.interval</name>
-  <value>60000</value>
-  <description>Timeout on waiting for a response from the server, in milliseconds.
-  The client will send ping when the interval is passed without receiving bytes,
-  if ipc.client.ping is set to true.
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.rpc-timeout.ms</name>
-  <value>0</value>
-  <description>Timeout on waiting for a response from the server, in milliseconds.
-  Currently this timeout works only when ipc.client.ping is set to true
-  because it uses the same facilities with IPC ping.
-  The timeout overrides the ipc.ping.interval and client will throw exception
-  instead of sending ping when the interval is passed.
-  </description>
-</property>
-
-<property>
-  <name>ipc.server.listen.queue.size</name>
-  <value>128</value>
-  <description>Indicates the length of the listen queue for servers accepting
-               client connections.
-  </description>
-</property>
-
-<property>
-  <name>ipc.maximum.data.length</name>
-  <value>67108864</value>
-  <description>This indicates the maximum IPC message length (bytes) that can be
-    accepted by the server. Messages larger than this value are rejected by
-    server immediately. This setting should rarely need to be changed. It merits
-    investigating whether the cause of long RPC messages can be fixed instead,
-    e.g. by splitting into smaller messages.
-  </description>
-</property>
-
-<!-- Proxy Configuration -->
-
-<property>
-  <name>hadoop.security.impersonation.provider.class</name>
-  <value></value>
-  <description>A class which implements ImpersonationProvider interface, used to 
-       authorize whether one user can impersonate a specific user. 
-       If not specified, the DefaultImpersonationProvider will be used. 
-       If a class is specified, then that class will be used to determine 
-       the impersonation capability.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.default</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.net.StandardSocketFactory</value>
-  <description> Default SocketFactory to use. This parameter is expected to be
-    formatted as "package.FactoryClassName".
-  </description>
-</property>
-
-<property>
-  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
-  <value></value>
-  <description> SocketFactory to use to connect to a DFS. If null or empty, use
-    hadoop.rpc.socket.factory.class.default. This socket factory is also used by
-    DFSClient to create sockets to DataNodes.
-  </description>
-</property>
-
-
-
-<property>
-  <name>hadoop.socks.server</name>
-  <value></value>
-  <description> Address (host:port) of the SOCKS server to be used by the
-    SocksSocketFactory.
-  </description>
-</property>
-
-<!-- Topology Configuration -->
-<property>
-  <name>net.topology.node.switch.mapping.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.net.ScriptBasedMapping</value>
-  <description> The default implementation of the DNSToSwitchMapping. It
-    invokes a script specified in net.topology.script.file.name to resolve
-    node names. If the value for net.topology.script.file.name is not set, the
-    default value of DEFAULT_RACK is returned for all node names.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.impl</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.net.NetworkTopology</value>
-  <description> The default implementation of NetworkTopology, which is the classic three-layer one.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.file.name</name>
-  <value></value>
-  <description> The script name that should be invoked to resolve DNS names to
-    NetworkTopology names. Example: the script would take host.foo.bar as an
-    argument, and return /rack1 as the output.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.script.number.args</name>
-  <value>100</value>
-  <description> The max number of args that the script configured with 
-    net.topology.script.file.name should be run with. Each arg is an
-    IP address.
-  </description>
-</property>
-
-<property>
-  <name>net.topology.table.file.name</name>
-  <value></value>
-  <description> The file name for a topology file, which is used when the
-    net.topology.node.switch.mapping.impl property is set to
-    org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.net.TableMapping. The file format is a two column text
-    file, with columns separated by whitespace. The first column is a DNS or
-    IP address and the second column specifies the rack where the address maps.
-    If no entry corresponding to a host in the cluster is found, then 
-    /default-rack is assumed.
-  </description>
-</property>
-
-<!-- Local file system -->
-<property>
-  <name>file.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>file.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  file.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>file.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>file.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>file.replication</name>
-  <value>1</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3 File System -->
-
-<property>
-  <name>s3.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- s3native File System -->
-
-<property>
-  <name>s3native.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>s3native.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  s3native.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>s3native.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>s3native.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>s3native.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- FTP file system -->
-<property>
-  <name>ftp.stream-buffer-size</name>
-  <value>4096</value>
-  <description>The size of buffer to stream files.
-  The size of this buffer should probably be a multiple of hardware
-  page size (4096 on Intel x86), and it determines how much data is
-  buffered during read and write operations.</description>
-</property>
-
-<property>
-  <name>ftp.bytes-per-checksum</name>
-  <value>512</value>
-  <description>The number of bytes per checksum.  Must not be larger than
-  ftp.stream-buffer-size</description>
-</property>
-
-<property>
-  <name>ftp.client-write-packet-size</name>
-  <value>65536</value>
-  <description>Packet size for clients to write</description>
-</property>
-
-<property>
-  <name>ftp.blocksize</name>
-  <value>67108864</value>
-  <description>Block size</description>
-</property>
-
-<property>
-  <name>ftp.replication</name>
-  <value>3</value>
-  <description>Replication factor</description>
-</property>
-
-<!-- Tfile -->
-
-<property>
-  <name>tfile.io.chunk.size</name>
-  <value>1048576</value>
-  <description>
-    Value chunk size in bytes. Defaults to
-    1MB. Values with a length less than the chunk size are
-    guaranteed to have a known value length at read time (see also
-    TFile.Reader.Scanner.Entry.isValueLengthKnown()).
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.output.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataOutputStream in bytes.
-  </description>
-</property>
-
-<property>
-  <name>tfile.fs.input.buffer.size</name>
-  <value>262144</value>
-  <description>
-    Buffer size used for FSDataInputStream in bytes.
-  </description>
-</property>
-
-<!-- HTTP web-consoles Authentication -->
-
-<property>
-  <name>hadoop.http.authentication.type</name>
-  <value>simple</value>
-  <description>
-    Defines authentication used for the Hadoop HTTP web-consoles.
-    Supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.token.validity</name>
-  <value>36000</value>
-  <description>
-    Indicates how long (in seconds) an authentication token is valid before it has
-    to be renewed.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.signature.secret.file</name>
-  <value>${user.home}/hadoop-http-auth-signature-secret</value>
-  <description>
-    The signature secret for signing the authentication tokens.
-    The same secret should be used for JT/NN/DN/TT configurations.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.cookie.domain</name>
-  <value></value>
-  <description>
-    The domain to use for the HTTP cookie that stores the authentication token.
-    In order for authentication to work correctly across all Hadoop nodes' web-consoles,
-    the domain must be correctly set.
-    IMPORTANT: when using IP addresses, browsers ignore cookies with domain settings.
-    For this setting to work properly all nodes in the cluster must be configured
-    to generate URLs with hostname.domain names on it.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.simple.anonymous.allowed</name>
-  <value>true</value>
-  <description>
-    Indicates if anonymous requests are allowed when using 'simple' authentication.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.principal</name>
-  <value>HTTP/_HOST@LOCALHOST</value>
-  <description>
-    Indicates the Kerberos principal to be used for HTTP endpoint.
-    The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.http.authentication.kerberos.keytab</name>
-  <value>${user.home}/hadoop.keytab</value>
-  <description>
-    Location of the keytab file with the credentials for the principal.
-    Referring to the same keytab file Oozie uses for its Kerberos credentials for Hadoop.
-  </description>
-</property>
-
-<!-- HTTP CORS support -->
-<property>
-  <description>Enable/disable the cross-origin (CORS) filter.</description>
-  <name>hadoop.http.cross-origin.enabled</name>
-  <value>false</value>
-</property>
-
-<property>
-  <description>Comma separated list of origins that are allowed for web
-    services needing cross-origin (CORS) support. Wildcards (*) and patterns
-    allowed</description>
-  <name>hadoop.http.cross-origin.allowed-origins</name>
-  <value>*</value>
-</property>
-
-<property>
-  <description>Comma separated list of methods that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-methods</name>
-  <value>GET,POST,HEAD</value>
-</property>
-
-<property>
-  <description>Comma separated list of headers that are allowed for web
-    services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.allowed-headers</name>
-  <value>X-Requested-With,Content-Type,Accept,Origin</value>
-</property>
-
-<property>
-  <description>The number of seconds a pre-flighted request can be cached
-    for web services needing cross-origin (CORS) support.</description>
-  <name>hadoop.http.cross-origin.max-age</name>
-  <value>1800</value>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.methods</name>
-  <value></value>
-  <description>
-    List of fencing methods to use for service fencing. May contain
-    builtin methods (eg shell and sshfence) or user-defined method.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.connect-timeout</name>
-  <value>30000</value>
-  <description>
-    SSH connection timeout, in milliseconds, to use with the builtin
-    sshfence fencer.
-  </description>
-</property>
-
-<property>
-  <name>dfs.ha.fencing.ssh.private-key-files</name>
-  <value></value>
-  <description>
-    The SSH private key files to use with the builtin sshfence fencer.
-  </description>
-</property>
-
-
-<!-- Static Web User Filter properties. -->
-<property>
-  <description>
-    The user name to filter as, on static web filters
-    while rendering content. An example use is the HDFS
-    web UI (user to be used for browsing files).
-  </description>
-  <name>hadoop.http.staticuser.user</name>
-  <value>dr.who</value>
-</property>
-
-<property>
-  <name>ha.zookeeper.quorum</name>
-  <description>
-    A list of ZooKeeper server addresses, separated by commas, that are
-    to be used by the ZKFailoverController in automatic failover.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.session-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    The session timeout to use when the ZKFC connects to ZooKeeper.
-    Setting this value to a lower value implies that server crashes
-    will be detected more quickly, but risks triggering failover too
-    aggressively in the case of a transient error or network blip.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.parent-znode</name>
-  <value>/hadoop-ha</value>
-  <description>
-    The ZooKeeper znode under which the ZK failover controller stores
-    its information. Note that the nameservice ID is automatically
-    appended to this znode, so it is not normally necessary to
-    configure this, even in a federated environment.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.acl</name>
-  <value>world:anyone:rwcda</value>
-  <description>
-    A comma-separated list of ZooKeeper ACLs to apply to the znodes
-    used by automatic failover. These ACLs are specified in the same
-    format as used by the ZooKeeper CLI.
-
-    If the ACL itself contains secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<property>
-  <name>ha.zookeeper.auth</name>
-  <value></value>
-  <description>
-    A comma-separated list of ZooKeeper authentications to add when
-    connecting to ZooKeeper. These are specified in the same format
-    as used by the &quot;addauth&quot; command in the ZK CLI. It is
-    important that the authentications specified here are sufficient
-    to access znodes with the ACL specified in ha.zookeeper.acl.
-
-    If the auths contain secrets, you may instead specify a
-    path to a file, prefixed with the '@' symbol, and the value of
-    this configuration will be loaded from within.
-  </description>
-</property>
-
-<!-- SSLFactory configuration -->
-
-<property>
-  <name>hadoop.ssl.keystores.factory.class</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
-  <description>
-    The keystores factory to use for retrieving certificates.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.require.client.cert</name>
-  <value>false</value>
-  <description>Whether client certificates are required</description>
-</property>
-
-<property>
-  <name>hadoop.ssl.hostname.verifier</name>
-  <value>DEFAULT</value>
-  <description>
-    The hostname verifier to provide for HttpsURLConnections.
-    Valid values are: DEFAULT, STRICT, STRICT_I6, DEFAULT_AND_LOCALHOST and
-    ALLOW_ALL
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.server.conf</name>
-  <value>ssl-server.xml</value>
-  <description>
-    Resource file from which ssl server keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.client.conf</name>
-  <value>ssl-client.xml</value>
-  <description>
-    Resource file from which ssl client keystore information will be extracted.
-    This file is looked up in the classpath, typically it should be in Hadoop
-    conf/ directory.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled</name>
-  <value>false</value>
-  <description>
-    Deprecated. Use dfs.http.policy and yarn.http.policy instead.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.ssl.enabled.protocols</name>
-  <value>TLSv1</value>
-  <description>
-    Protocols supported by the ssl.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.jetty.logs.serve.aliases</name>
-  <value>true</value>
-  <description>
-    Enable/Disable aliases serving from jetty
-  </description>
-</property>
-
-<property>
-  <name>fs.permissions.umask-mode</name>
-  <value>022</value>
-  <description>
-    The umask used when creating files and directories.
-    Can be in octal or in symbolic. Examples are:
-    "022" (octal for u=rwx,g=r-x,o=r-x in symbolic),
-    or "u=rwx,g=rwx,o=" (symbolic for 007 in octal).
-  </description>
-</property>
-
-<!-- ha properties -->
-
-<property>
-  <name>ha.health-monitor.connect-retry-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to retry connecting to the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.check-interval.ms</name>
-  <value>1000</value>
-  <description>
-    How often to check the service.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.sleep-after-disconnect.ms</name>
-  <value>1000</value>
-  <description>
-    How long to sleep after an unexpected RPC error.
-  </description>
-</property>
-
-<property>
-  <name>ha.health-monitor.rpc-timeout.ms</name>
-  <value>45000</value>
-  <description>
-    Timeout for the actual monitorHealth() calls.
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.new-active.rpc-timeout.ms</name>
-  <value>60000</value>
-  <description>
-    Timeout that the FC waits for the new active to become active
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.rpc-timeout.ms</name>
-  <value>5000</value>
-  <description>
-    Timeout that the FC waits for the old active to go to standby
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.graceful-fence.connection.retries</name>
-  <value>1</value>
-  <description>
-    FC connection retries for graceful fencing
-  </description>
-</property>
-
-<property>
-  <name>ha.failover-controller.cli-check.rpc-timeout.ms</name>
-  <value>20000</value>
-  <description>
-    Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState
-  </description>
-</property>
-
-<property>
-  <name>ipc.client.fallback-to-simple-auth-allowed</name>
-  <value>false</value>
-  <description>
-    When a client is configured to attempt a secure connection, but attempts to
-    connect to an insecure server, that server may instruct the client to
-    switch to SASL SIMPLE (unsecure) authentication. This setting controls
-    whether or not the client will accept this instruction from the server.
-    When false (the default), the client will not allow the fallback to SIMPLE
-    authentication, and will abort the connection.
-  </description>
-</property>
-
-<property>
-  <name>fs.client.resolve.remote.symlinks</name>
-  <value>true</value>
-  <description>
-      Whether to resolve symlinks when accessing a remote Hadoop filesystem.
-      Setting this to false causes an exception to be thrown upon encountering
-      a symlink. This setting does not apply to local filesystems, which
-      automatically resolve local symlinks.
-  </description>
-</property>
-
-<property>
-  <name>nfs.exports.allowed.hosts</name>
-  <value>* rw</value>
-  <description>
-    By default, the export can be mounted by any client. The value string 
-    contains machine name and access privilege, separated by whitespace 
-    characters. The machine name format can be a single host, a Java regular 
-    expression, or an IPv4 address. The access privilege uses rw or ro to 
-    specify read/write or read-only access of the machines to exports. If the 
-    access privilege is not provided, the default is read-only. Entries are separated by ";".
-    For example: "192.168.0.0/22 rw ; host.*\.example\.com ; host1.test.org ro;".
-    Only the NFS gateway needs to restart after this property is updated. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.user.group.static.mapping.overrides</name>
-  <value>dr.who=;</value>
-  <description>
-    Static mapping of user to groups. This will override the groups if
-    available in the system for the specified user. In other words, groups
-    look-up will not happen for these users, instead groups mapped in this
-    configuration will be used.
-    Mapping should be in this format.
-    user1=group1,group2;user2=;user3=group2;
-    Default, "dr.who=;" will consider "dr.who" as user without groups.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.quantile.enable</name>
-  <value>false</value>
-  <description>
-    When this property is set to true and rpc.metrics.percentiles.intervals
-    is set to a comma-separated list of granularities in seconds, the
-    50/75/90/95/99th percentile latencies for rpc queue/processing time in
-    milliseconds are added to rpc metrics.
-  </description>
-</property>
-
-<property>
-  <name>rpc.metrics.percentiles.intervals</name>
-  <value></value>
-  <description>
-    A comma-separated list of the granularity in seconds for the metrics which
-    describe the 50/75/90/95/99th percentile latency for rpc queue/processing
-    time. The metrics are outputted if rpc.metrics.quantile.enable is set to
-    true.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.EXAMPLECIPHERSUITE</name>
-  <value></value>
-  <description>
-    The prefix for a given crypto codec, contains a comma-separated
-    list of implementation classes for a given crypto codec (eg EXAMPLECIPHERSUITE).
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.codec.classes.aes.ctr.nopadding</name>
-  <value>org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec,org.apache.flink.fs.s3presto.shaded.org.apache.hadoop.crypto.JceAesCtrCryptoCodec</value>
-  <description>
-    Comma-separated list of crypto codec implementations for AES/CTR/NoPadding. 
-    The first implementation will be used if available, others are fallbacks.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.cipher.suite</name>
-  <value>AES/CTR/NoPadding</value>
-  <description>
-    Cipher suite for crypto codec.
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.jce.provider</name>
-  <value></value>
-  <description>
-    The JCE provider name used in CryptoCodec. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.crypto.buffer.size</name>
-  <value>8192</value>
-  <description>
-    The buffer size used by CryptoInputStream and CryptoOutputStream. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.java.secure.random.algorithm</name>
-  <value>SHA1PRNG</value>
-  <description>
-    The java secure random algorithm. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.secure.random.impl</name>
-  <value></value>
-  <description>
-    Implementation of secure random. 
-  </description>
-</property>
-
-<property>
-  <name>hadoop.security.random.device.file.path</name>
-  <value>/dev/urandom</value>
-  <description>
-    OS security random device file path.
-  </description>
-</property>
-
-<property>
-  <name>fs.har.impl.disable.cache</name>
-  <value>true</value>
-  <description>Don't cache 'har' filesystem instances.</description>
-</property>
-
-<!--- KMSClientProvider configurations -->
-<property>
-  <name>hadoop.security.kms.client.authentication.retry-count</name>
-  <value>1</value>
-  <description>
-    Number of times to retry connecting to KMS on authentication failure
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.size</name>
-  <value>500</value>
-  <description>
-    Size of the EncryptedKeyVersion cache Queue for each key
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.low-watermark</name>
-  <value>0.3f</value>
-  <description>
-    If size of the EncryptedKeyVersion cache Queue falls below the
-    low watermark, this cache queue will be scheduled for a refill
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.num.refill.threads</name>
-  <value>2</value>
-  <description>
-    Number of threads to use for refilling depleted EncryptedKeyVersion
-    cache Queues
-  </description>
-</property>
-<property>
-  <name>hadoop.security.kms.client.encrypted.key.cache.expiry</name>
-  <value>43200000</value>
-  <description>
-    Cache expiry time for a Key, after which the cache Queue for this
-    key will be dropped. Default = 12hrs
-  </description>
-</property>
-
-<property>
-  <name>hadoop.htrace.spanreceiver.classes</name>
-  <value></value>
-  <description>
-    A comma separated list of the fully-qualified class name of classes 
-    implementing SpanReceiver. The tracing system works by collecting 
-    information in structs called 'Spans'. It is up to you to choose 
-    how you want to receive this information by implementing the 
-    SpanReceiver interface.
-  </description>
-</property>
-
- <property>
-  <name>ipc.server.max.connections</name>
-  <value>0</value>
-  <description>The maximum number of concurrent connections a server is allowed
-    to accept. If this limit is exceeded, incoming connections will first fill
-    the listen queue and then may go to an OS-specific listen overflow queue. 
-    The client may fail or timeout, but the server can avoid running out of file
-    descriptors using this feature. 0 means no limit.
-  </description>
-</property>
-
-
-  <!-- YARN registry -->
-
-  <property>
-    <description>
-      Is the registry enabled in the YARN Resource Manager?
-
-      If true, the YARN RM will, as needed,
-      create the user and system paths, and purge
-      service records when containers, application attempts
-      and applications complete.
-
-      If false, the paths must be created by other means,
-      and no automatic cleanup of service records will take place.
-    </description>
-    <name>hadoop.registry.rm.enabled</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      The root zookeeper node for the registry
-    </description>
-    <name>hadoop.registry.zk.root</name>
-    <value>/registry</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper session timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.session.timeout.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection timeout in milliseconds
-    </description>
-    <name>hadoop.registry.zk.connection.timeout.ms</name>
-    <value>15000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper connection retry count before failing
-    </description>
-    <name>hadoop.registry.zk.retry.times</name>
-    <value>5</value>
-  </property>
-
-  <property>
-    <description>
-    </description>
-    <name>hadoop.registry.zk.retry.interval.ms</name>
-    <value>1000</value>
-  </property>
-
-  <property>
-    <description>
-      Zookeeper retry limit in milliseconds, during
-      exponential backoff.
-
-      This places a limit even
-      if the retry times and interval limit, combined
-      with the backoff policy, result in a long retry
-      period
-    </description>
-    <name>hadoop.registry.zk.retry.ceiling.ms</name>
-    <value>60000</value>
-  </property>
-
-  <property>
-    <description>
-      List of hostname:port pairs defining the
-      zookeeper quorum binding for the registry
-    </description>
-    <name>hadoop.registry.zk.quorum</name>
-    <value>localhost:2181</value>
-  </property>
-
-  <property>
-    <description>
-      Key to set if the registry is secure. Turning it on
-      changes the permissions policy from "open access"
-      to restrictions on kerberos with the option of
-      a user adding one or more auth key pairs down their
-      own tree.
-    </description>
-    <name>hadoop.registry.secure</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <description>
-      A comma separated list of Zookeeper ACL identifiers with
-      system access to the registry in a secure cluster.
-
-      These are given full access to all entries.
-
-      If there is an "@" at the end of a SASL entry it
-      instructs the registry client to append the default kerberos domain.
-    </description>
-    <name>hadoop.registry.system.acls</name>
-    <value>sasl:yarn@, sasl:mapred@, sasl:hdfs@</value>
-  </property>
-
-  <property>
-    <description>
-      The kerberos realm: used to set the realm of
-      system principals which do not declare their realm,
-      and any other accounts that need the value.
-
-      If empty, the default realm of the running process
-      is used.
-
-      If neither are known and the realm is needed, then the registry
-      service/client will fail.
-    </description>
-    <name>hadoop.registry.kerberos.realm</name>
-    <value></value>
-  </property>
-
-  <property>
-    <description>
-      Key to define the JAAS context. Used in secure
-      mode
-    </description>
-    <name>hadoop.registry.jaas.context</name>
-    <value>Client</value>
-  </property>
-
-</configuration>