Posted to commits@sqoop.apache.org by ja...@apache.org on 2014/10/27 21:32:42 UTC

git commit: SQOOP-1620: Sqoop2: FileSystem should be configurable in HDFS connector

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 24b8107ab -> 520fc33ca


SQOOP-1620: Sqoop2: FileSystem should be configurable in HDFS connector

(Abraham Elmahrek via Jarek Jarcec Cecho)
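
For context: this change replaces the HDFS connector's EmptyConfiguration
link config with a new LinkConfiguration that carries an optional filesystem
URI and threads it through the extractor, loader, partitioner, initializers
and destroyers. A minimal sketch of the resulting flow (illustrative only,
not part of the commit; the hostname is made up):

    // import org.apache.hadoop.conf.Configuration;
    // import org.apache.sqoop.connector.hdfs.HdfsUtils;
    // import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;

    LinkConfiguration link = new LinkConfiguration();
    link.linkConfig.uri = "hdfs://namenode.example.com:8020";
    Configuration conf = HdfsUtils.configureURI(new Configuration(), link);
    // fs.defaultFS (and the deprecated fs.default.name) now point at the
    // configured URI; a null uri leaves the Configuration untouched.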


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/520fc33c
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/520fc33c
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/520fc33c

Branch: refs/heads/sqoop2
Commit: 520fc33ca33a5faa446c360efa48c6caa08f043d
Parents: 24b8107
Author: Jarek Jarcec Cecho <ja...@apache.org>
Authored: Mon Oct 27 13:31:51 2014 -0700
Committer: Jarek Jarcec Cecho <ja...@apache.org>
Committed: Mon Oct 27 13:32:22 2014 -0700

----------------------------------------------------------------------
 .../sqoop/connector/hdfs/HdfsConnector.java     |  4 +-
 .../sqoop/connector/hdfs/HdfsExtractor.java     |  8 +--
 .../sqoop/connector/hdfs/HdfsFromDestroyer.java |  6 +-
 .../connector/hdfs/HdfsFromInitializer.java     |  8 +--
 .../apache/sqoop/connector/hdfs/HdfsLoader.java | 15 ++---
 .../sqoop/connector/hdfs/HdfsPartitioner.java   |  9 +--
 .../sqoop/connector/hdfs/HdfsToDestroyer.java   |  6 +-
 .../sqoop/connector/hdfs/HdfsToInitializer.java |  9 ++-
 .../apache/sqoop/connector/hdfs/HdfsUtils.java  | 43 ++++++++++++
 .../hdfs/configuration/LinkConfig.java          | 48 +++++++++++++
 .../hdfs/configuration/LinkConfiguration.java   | 31 +++++++++
 .../resources/hdfs-connector-config.properties  |  7 ++
 .../sqoop/connector/hdfs/TestExtractor.java     |  6 +-
 .../sqoop/connector/hdfs/TestHdfsUtils.java     | 44 ++++++++++++
 .../sqoop/connector/hdfs/TestLinkConfig.java    | 71 ++++++++++++++++++++
 .../apache/sqoop/connector/hdfs/TestLoader.java |  4 +-
 .../sqoop/connector/hdfs/TestPartitioner.java   |  4 +-
 17 files changed, 283 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
index b5f1f77..1640f80 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsConnector.java
@@ -24,8 +24,8 @@ import java.util.ResourceBundle;
 import org.apache.sqoop.common.Direction;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.common.VersionInfo;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
 import org.apache.sqoop.connector.spi.SqoopConnector;
@@ -75,7 +75,7 @@ public class HdfsConnector extends SqoopConnector {
   @SuppressWarnings("rawtypes")
   @Override
   public Class getLinkConfigurationClass() {
-    return EmptyConfiguration.class;
+    return LinkConfiguration.class;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
index 31b0a99..2586f94 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsExtractor.java
@@ -32,8 +32,8 @@ import org.apache.hadoop.util.LineReader;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
 import org.apache.sqoop.job.etl.ExtractorContext;
@@ -42,7 +42,7 @@ import org.apache.sqoop.job.etl.ExtractorContext;
  * Extract from HDFS.
  * Default field delimiter of a record is comma.
  */
-public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> {
+public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {
 
   public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
 
@@ -51,10 +51,10 @@ public class HdfsExtractor extends Extractor<EmptyConfiguration, FromJobConfigur
   private long rowRead = 0;
 
   @Override
-  public void extract(ExtractorContext context, EmptyConfiguration linkConfig,
+  public void extract(ExtractorContext context, LinkConfiguration linkConfiguration,
       FromJobConfiguration jobConfig, HdfsPartition partition) {
 
-    conf = ((PrefixContext) context.getContext()).getConfiguration();
+    conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     dataWriter = context.getDataWriter();
 
     try {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
index c7d35f7..6d79db7 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromDestroyer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
-public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromDestroyer extends Destroyer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsFromDestroyer extends Destroyer<EmptyConfiguration, FromJobConf
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     // do nothing at this point
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
index 0752510..4c6f566 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsFromInitializer.java
@@ -17,14 +17,14 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;
 
 
-public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsFromInitializer extends Initializer<LinkConfiguration, FromJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +35,13 @@ public class HdfsFromInitializer extends Initializer<EmptyConfiguration, FromJob
    * @param jobConfig FROM job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+  public void initialize(InitializerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     // do nothing at this point
   }
 
   @Override
-  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+  public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig,
       FromJobConfiguration jobConfig) {
     return new Schema("HDFS file");
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
index 682349c..6c57cf2 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsLoader.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.connector.hdfs.hdfsWriter.GenericHdfsWriter;
@@ -37,22 +37,21 @@ import org.apache.sqoop.job.etl.Loader;
 import org.apache.sqoop.job.etl.LoaderContext;
 import org.apache.sqoop.utils.ClassUtils;
 
-public class HdfsLoader extends Loader<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
   /**
    * Load data to target.
    *
    * @param context Loader context object
-   * @param linkConfig       Link configuration
-   * @param toJobConfig      Job configuration
+   * @param linkConfiguration Link configuration
+   * @param toJobConfig Job configuration
    * @throws Exception
    */
   @Override
-  public void load(LoaderContext context, EmptyConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception {
+  public void load(LoaderContext context, LinkConfiguration linkConfiguration,
+                   ToJobConfiguration toJobConfig) throws Exception {
 
     DataReader reader = context.getDataReader();
-
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
-
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
     String directoryName = toJobConfig.toJobConfig.outputDirectory;
     String codecname = getCompressionCodecName(toJobConfig);
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
index daa7fe2..181528c 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsPartitioner.java
@@ -40,8 +40,8 @@ import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.sqoop.common.PrefixContext;
 import org.apache.sqoop.common.SqoopException;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
 import org.apache.sqoop.job.etl.PartitionerContext;
@@ -50,7 +50,7 @@ import org.apache.sqoop.job.etl.PartitionerContext;
  * This class derives mostly from CombineFileInputFormat of Hadoop, i.e.
  * org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.
  */
-public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConfiguration> {
+public class HdfsPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
 
   public static final String SPLIT_MINSIZE_PERNODE =
       "mapreduce.input.fileinputformat.split.minsize.per.node";
@@ -68,9 +68,10 @@ public class HdfsPartitioner extends Partitioner<EmptyConfiguration, FromJobConf
 
   @Override
   public List<Partition> getPartitions(PartitionerContext context,
-      EmptyConfiguration emptyConfig, FromJobConfiguration fromJobConfig) {
+                                       LinkConfiguration linkConfiguration,
+                                       FromJobConfiguration fromJobConfig) {
 
-    Configuration conf = ((PrefixContext)context.getContext()).getConfiguration();
+    Configuration conf = HdfsUtils.configureURI(((PrefixContext) context.getContext()).getConfiguration(), linkConfiguration);
 
     try {
       long numInputBytes = getInputSize(conf, fromJobConfig.fromJobConfig.inputDirectory);

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
index 8bfd727..3c85be8 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToDestroyer.java
@@ -17,12 +17,12 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
 import org.apache.sqoop.job.etl.DestroyerContext;
 
-public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToDestroyer extends Destroyer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Callback to clean up after job execution.
    *
@@ -31,7 +31,7 @@ public class HdfsToDestroyer extends Destroyer<EmptyConfiguration, ToJobConfigur
    * @param jobConfig TO job configuration object
    */
   @Override
-  public void destroy(DestroyerContext context, EmptyConfiguration linkConfig,
+  public void destroy(DestroyerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     // do nothing at this point
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
index e3d54b8..bce72b5 100644
--- a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsToInitializer.java
@@ -17,14 +17,13 @@
  */
 package org.apache.sqoop.connector.hdfs;
 
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
 import org.apache.sqoop.job.etl.InitializerContext;
 import org.apache.sqoop.schema.Schema;
 
-
-public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConfiguration> {
+public class HdfsToInitializer extends Initializer<LinkConfiguration, ToJobConfiguration> {
   /**
    * Initialize new submission based on given configuration properties. Any
    * needed temporary values might be saved to context object and they will be
@@ -35,13 +34,13 @@ public class HdfsToInitializer extends Initializer<EmptyConfiguration, ToJobConf
    * @param jobConfig TO job configuration object
    */
   @Override
-  public void initialize(InitializerContext context, EmptyConfiguration linkConfig,
+  public void initialize(InitializerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     // do nothing at this point
   }
 
   @Override
-  public Schema getSchema(InitializerContext context, EmptyConfiguration linkConfig,
+  public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig,
       ToJobConfiguration jobConfig) {
     return new Schema("HDFS file");
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
new file mode 100644
index 0000000..352ee17
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/HdfsUtils.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+
+/**
+ * Utilities for HDFS.
+ */
+public class HdfsUtils {
+
+  /**
+   * Configures the URI to connect to.
+   * @param conf Configuration object to be configured.
+   * @param linkConfiguration LinkConfiguration object that
+   *                          provides configuration.
+   * @return Configuration object.
+   */
+  public static Configuration configureURI(Configuration conf, LinkConfiguration linkConfiguration) {
+    if (linkConfiguration.linkConfig.uri != null) {
+      conf.set("fs.default.name", linkConfiguration.linkConfig.uri);
+      conf.set("fs.defaultFS", linkConfiguration.linkConfig.uri);
+    }
+
+    return conf;
+  }
+}
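
The helper above sets both fs.defaultFS and the older fs.default.name key,
so the override takes effect under both Hadoop 1-style and Hadoop 2-style
configuration. A short sketch of how the filesystem is then resolved
(assuming conf has already been passed through configureURI):

    // FileSystem.get() resolves the default filesystem from fs.defaultFS;
    // it throws IOException if the filesystem cannot be instantiated.
    FileSystem fs = FileSystem.get(conf);
    // With linkConfig.uri == null, configureURI is a no-op and the
    // cluster-provided default filesystem is used instead.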

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
new file mode 100644
index 0000000..c63f8a8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfig.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.ConfigClass;
+import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
+public class LinkConfig {
+  @Input(size = 255) public String uri;
+
+  public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+    private static final Pattern URI_PATTERN = Pattern.compile("((?<=\\()[A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+(?=\\)))|([A-Za-z][A-Za-z0-9\\+\\.\\-]*:([A-Za-z0-9\\.\\-_~:/\\?#\\[\\]@!\\$&'\\(\\)\\*\\+,;=]|%[A-Fa-f0-9]{2})+)");
+
+    @Override
+    public void validate(LinkConfig config) {
+      if (config.uri != null) {
+        Matcher matcher = URI_PATTERN.matcher(config.uri);
+        if (!matcher.matches()) {
+          addMessage(Status.UNACCEPTABLE,
+              "Invalid URI " + config.uri + ". URI must either be null or a valid URI. Here are a few valid example URIs:"
+              + " hdfs://example.com:8020/, hdfs://example.com/, file:///, file:///tmp, file://localhost/tmp");
+        }
+      }
+    }
+  }
+}
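
The validator above accepts URIs with a scheme prefix (hdfs://, file://,
and so on) and rejects inputs whose scheme is missing or malformed. A short
sketch of exercising it directly, essentially what TestLinkConfig below
automates:

    LinkConfig config = new LinkConfig();
    config.uri = "hdfs://localhost:8020/";  // well-formed, so accepted
    LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
    validator.validate(config);
    // validator.getStatus().canProceed() is true here; inputs such as
    // "hdfs:" or "://localhost:8020" would yield Status.UNACCEPTABLE.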

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
new file mode 100644
index 0000000..29063a8
--- /dev/null
+++ b/connector/connector-hdfs/src/main/java/org/apache/sqoop/connector/hdfs/configuration/LinkConfiguration.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs.configuration;
+
+import org.apache.sqoop.model.Config;
+import org.apache.sqoop.model.ConfigurationClass;
+
+@ConfigurationClass
+public class LinkConfiguration {
+  @Config
+  public LinkConfig linkConfig;
+
+  public LinkConfiguration() {
+    linkConfig = new LinkConfig();
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
index 90bc8bc..3d088d0 100644
--- a/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
+++ b/connector/connector-hdfs/src/main/resources/hdfs-connector-config.properties
@@ -17,6 +17,13 @@
 
 ############################
 
+# Link Config
+linkConfig.label = Link configuration
+linkConfig.help = Here you supply information necessary to connect to HDFS
+
+linkConfig.uri.label = HDFS URI
+linkConfig.uri.help = HDFS URI used to connect to HDFS
+
 # To Job Config
 #
 toJobConfig.label = ToJob configuration

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
index 124c3df..0a6369f 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestExtractor.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.etl.io.DataWriter;
 import org.apache.sqoop.job.etl.Extractor;
@@ -52,7 +52,7 @@ public class TestExtractor extends TestHdfsBase {
   private ToFormat outputFileType;
   private Class<? extends CompressionCodec> compressionClass;
   private final String inputDirectory;
-  private Extractor<EmptyConfiguration, FromJobConfiguration, HdfsPartition> extractor;
+  private Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> extractor;
 
   public TestExtractor(ToFormat outputFileType,
                        Class<? extends CompressionCodec> compressionClass)
@@ -131,7 +131,7 @@ public class TestExtractor extends TestHdfsBase {
       }
     });
 
-    EmptyConfiguration emptyLinkConfig = new EmptyConfiguration();
+    LinkConfiguration emptyLinkConfig = new LinkConfiguration();
     FromJobConfiguration emptyJobConfig = new FromJobConfiguration();
     HdfsPartition partition = createPartition(FileUtils.listDir(inputDirectory));
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
new file mode 100644
index 0000000..63e14ae
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestHdfsUtils.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+public class TestHdfsUtils {
+
+  @Test
+  public void testConfigureURI() throws Exception {
+    final String TEST_URI = "hdfs://argggg:1111";
+    LinkConfiguration linkConfiguration = new LinkConfiguration();
+    Configuration conf = new Configuration();
+
+    assertNotEquals(TEST_URI, conf.get("fs.default.name"));
+    assertNotEquals(TEST_URI, conf.get("fs.defaultFS"));
+
+    linkConfiguration.linkConfig.uri = TEST_URI;
+
+    assertEquals(conf, HdfsUtils.configureURI(conf, linkConfiguration));
+    assertEquals(TEST_URI, conf.get("fs.default.name"));
+    assertEquals(TEST_URI, conf.get("fs.defaultFS"));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java
new file mode 100644
index 0000000..176d0df
--- /dev/null
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLinkConfig.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.hdfs;
+
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfig;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestLinkConfig {
+  @Test
+  public void testValidURI() {
+    String[] URIs = {
+        "hdfs://localhost:8020",
+        "hdfs://localhost:8020/",
+        "hdfs://localhost:8020/test",
+        "hdfs://localhost:8020/test/test",
+        "hdfs://localhost:8020/test/",
+        "hdfs://localhost/",
+        "hdfs://localhost",
+        "hdfs://a:8020",
+        "file:///",
+        "file://localhost/",
+        "file://localhost/tmp",
+        "file://localhost/tmp/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertTrue(uri, validator.getStatus().canProceed());
+    }
+  }
+
+  @Test
+  public void testInvalidURI() {
+    String[] URIs = {
+        "://localhost:8020",
+        ":///",
+        "://",
+        "hdfs:",
+        "hdfs//",
+        "file//localhost/",
+        "-://localhost/"
+    };
+    for (String uri : URIs) {
+      LinkConfig config = new LinkConfig();
+      LinkConfig.ConfigValidator validator = new LinkConfig.ConfigValidator();
+      config.uri = uri;
+      validator.validate(config);
+      assertFalse(uri, validator.getStatus().canProceed());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
index 8429e15..a30d410 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestLoader.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToCompression;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.connector.hdfs.configuration.ToJobConfiguration;
@@ -121,7 +121,7 @@ public class TestLoader extends TestHdfsBase {
         return null;
       }
     }, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     ToJobConfiguration jobConf = new ToJobConfiguration();
     jobConf.toJobConfig.outputDirectory = outputDirectory;
     jobConf.toJobConfig.compression = compression;

http://git-wip-us.apache.org/repos/asf/sqoop/blob/520fc33c/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
----------------------------------------------------------------------
diff --git a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
index bef1984..04e09cd 100644
--- a/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
+++ b/connector/connector-hdfs/src/test/java/org/apache/sqoop/connector/hdfs/TestPartitioner.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.sqoop.common.PrefixContext;
-import org.apache.sqoop.connector.common.EmptyConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
+import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.hdfs.configuration.ToFormat;
 import org.apache.sqoop.job.etl.Partition;
 import org.apache.sqoop.job.etl.Partitioner;
@@ -98,7 +98,7 @@ public class TestPartitioner extends TestHdfsBase {
     Configuration conf = new Configuration();
     PrefixContext prefixContext = new PrefixContext(conf, "org.apache.sqoop.job.connector.from.context.");
     PartitionerContext context = new PartitionerContext(prefixContext, 5, null);
-    EmptyConfiguration linkConf = new EmptyConfiguration();
+    LinkConfiguration linkConf = new LinkConfiguration();
     FromJobConfiguration jobConf = new FromJobConfiguration();
 
     jobConf.fromJobConfig.inputDirectory = inputDirectory;