You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/06 10:12:02 UTC

sqoop git commit: SQOOP-1751: Sqoop2: Rearrange LinkConfig and ToJobConfig of Kite Connector

Repository: sqoop
Updated Branches:
  refs/heads/sqoop2 4ffa806ba -> ae26b9668


SQOOP-1751: Sqoop2: Rearrange LinkConfig and ToJobConfig of Kite Connector

(Qian Xu via Abraham Elmahrek)


Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ae26b966
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ae26b966
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ae26b966

Branch: refs/heads/sqoop2
Commit: ae26b9668e67a758b014b7f67cb2d13aebae9596
Parents: 4ffa806
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Tue Jan 6 00:27:27 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Tue Jan 6 00:27:27 2015 -0800

----------------------------------------------------------------------
 .../validators/DatasetURIValidator.java         | 45 ++++++++++++
 .../validators/HostAndPortValidator.java        | 51 ++++++++++++++
 .../validators/TestHostAndPortValidator.java    | 73 ++++++++++++++++++++
 .../connector/kite/KiteConnectorUpgrader.java   |  3 -
 .../apache/sqoop/connector/kite/KiteLoader.java |  9 ++-
 .../sqoop/connector/kite/KiteToDestroyer.java   | 23 +++---
 .../sqoop/connector/kite/KiteToInitializer.java | 11 +--
 .../kite/configuration/ConfigUtil.java          | 46 ++++++++++++
 .../kite/configuration/LinkConfig.java          | 27 ++++++--
 .../kite/configuration/ToJobConfig.java         | 24 ++-----
 .../connector/kite/util/InputValidation.java    | 40 -----------
 .../resources/kite-connector-config.properties  | 14 ++--
 .../sqoop/connector/kite/TestKiteLoader.java    |  7 +-
 .../connector/kite/TestKiteToDestroyer.java     | 19 +++--
 .../connector/kite/TestKiteToInitializer.java   | 24 ++++---
 .../kite/configuration/TestConfigUtil.java      | 43 ++++++++++++
 16 files changed, 345 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java b/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
new file mode 100644
index 0000000..2a69d5c
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import com.google.common.base.Strings;
+import org.apache.sqoop.validation.Status;
+
+import java.util.regex.Pattern;
+
+/**
+ * Validator that accepts a string only when it looks like a Kite dataset
+ * URI, i.e. it starts with {@code dataset:} followed by one of the
+ * {@code hive}, {@code hdfs} or {@code file} schemes and a colon.
+ */
+public class DatasetURIValidator extends AbstractValidator<String> {
+
+  private static final Pattern DATASET_URI_PATTERN = Pattern
+      .compile("^dataset:(hive|hdfs|file):.*$");
+
+  /**
+   * Records an error message when the given value is null, empty, or does
+   * not match the dataset URI pattern; records nothing otherwise.
+   *
+   * @param uri the user supplied dataset URI
+   */
+  @Override
+  public void validate(String uri) {
+    final boolean missing = Strings.isNullOrEmpty(uri);
+    if (missing) {
+      addMessage(Status.ERROR, "Cannot be null nor empty");
+    } else if (!DATASET_URI_PATTERN.matcher(uri).matches()) {
+      addMessage(Status.ERROR, "Invalid dataset URI: " + uri);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java b/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
new file mode 100644
index 0000000..613aee4
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import com.google.common.base.Strings;
+import org.apache.sqoop.validation.Status;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Ensure that given string represents a hostname or hostname:port.
+ *
+ * The value is parsed as the authority of a synthetic {@code hdfs://} URI.
+ * Anything beyond a bare host and an optional port (user info, path, query
+ * or fragment) is rejected, as are ports above 65535.
+ */
+public class HostAndPortValidator extends AbstractValidator<String> {
+
+  /** Largest port number accepted as valid. */
+  private static final int MAX_PORT = 65535;
+
+  /**
+   * Records a single error message when the value is null, empty, or is not
+   * a well-formed {@code host} or {@code host:port} string.
+   *
+   * @param hostPortString the user supplied host and optional port
+   */
+  @Override
+  public void validate(String hostPortString) {
+    if (Strings.isNullOrEmpty(hostPortString)) {
+      addMessage(Status.ERROR, "Cannot be null nor empty");
+      return;
+    }
+
+    if (!isHostAndPort(hostPortString)) {
+      addMessage(Status.ERROR, "Invalid host and port string: " +
+          hostPortString);
+    }
+  }
+
+  /**
+   * Returns true when the string is exactly a host with an optional port.
+   */
+  private static boolean isHostAndPort(String hostPortString) {
+    final URI uri;
+    try {
+      uri = new URI("hdfs://" + hostPortString);
+    } catch (URISyntaxException ignored) {
+      // Not parseable as a URI authority, so it cannot be a valid host[:port].
+      return false;
+    }
+
+    if (uri.getHost() == null) {
+      return false;
+    }
+
+    // Only host[:port] is allowed; reject extra URI components that the
+    // synthetic "hdfs://" prefix would otherwise let through.
+    String path = uri.getRawPath();
+    boolean hasPath = path != null && !path.isEmpty();
+    if (hasPath || uri.getUserInfo() != null || uri.getRawQuery() != null
+        || uri.getRawFragment() != null) {
+      return false;
+    }
+
+    int port = uri.getPort();
+    if (port == -1) {
+      // No port was parsed: a colon in the input then means a dangling or
+      // malformed port part (e.g. "host1:").
+      return !hostPortString.contains(":");
+    }
+    return port <= MAX_PORT;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java b/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
new file mode 100644
index 0000000..4ca3048
--- /dev/null
+++ b/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import org.apache.sqoop.validation.Status;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link HostAndPortValidator}: accepted inputs first,
+ * rejected inputs second, shared assertion helpers last.
+ */
+public class TestHostAndPortValidator {
+
+  AbstractValidator<String> validator = new HostAndPortValidator();
+
+  /** Starts every test from a validator that holds no messages. */
+  @Before
+  public void setUp() {
+    validator.reset();
+    assertEquals(0, validator.getMessages().size());
+  }
+
+  // ---- inputs that must be accepted ----
+
+  @Test
+  public void testValidHostAndPort() {
+    assertAccepted("host1:8020");
+  }
+
+  @Test
+  public void testValidHost() {
+    assertAccepted("host1");
+  }
+
+  // ---- inputs that must be rejected ----
+
+  @Test
+  public void testInvalidPort() {
+    assertRejected("host1:invalid_port");
+  }
+
+  @Test
+  public void testNegativePort() {
+    assertRejected("host1:-1");
+  }
+
+  @Test
+  public void testHostNameWithInvalidChars() {
+    assertRejected("hostname has space:8020");
+  }
+
+  // ---- helpers ----
+
+  /** Validates the candidate and expects an OK status without messages. */
+  private void assertAccepted(String candidate) {
+    validator.validate(candidate);
+    assertEquals(Status.OK, validator.getStatus());
+    assertEquals(0, validator.getMessages().size());
+  }
+
+  /** Validates the candidate and expects an ERROR status with one message. */
+  private void assertRejected(String candidate) {
+    validator.validate(candidate);
+    assertEquals(Status.ERROR, validator.getStatus());
+    assertEquals(1, validator.getMessages().size());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
index d3b9f95..745460f 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
@@ -18,15 +18,12 @@
  */
 package org.apache.sqoop.connector.kite;
 
-import org.apache.log4j.Logger;
 import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
 import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
-import org.apache.sqoop.model.MConfig;
 import org.apache.sqoop.model.MFromConfig;
 import org.apache.sqoop.model.MLinkConfig;
 import org.apache.sqoop.model.MToConfig;
 
-//NOTE: All config types have the similar upgrade path at this point
 public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
 
   @Override

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index 0a46f4a..b115242 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.etl.io.DataReader;
@@ -50,9 +51,11 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
 
   @Override
   public void load(LoaderContext context, LinkConfiguration linkConfig,
-      ToJobConfiguration jobConfig) throws Exception {
-    KiteDatasetExecutor executor = getExecutor(jobConfig.toJobConfig.uri,
-        context.getSchema(), linkConfig.linkConfig.fileFormat);
+      ToJobConfiguration toJobConfig) throws Exception {
+    String uri = ConfigUtil.buildDatasetUri(
+        linkConfig.linkConfig, toJobConfig.toJobConfig);
+    KiteDatasetExecutor executor = getExecutor(
+        uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
     LOG.info("Temporary dataset created.");
 
     DataReader reader = context.getDataReader();

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index 25912b4..3b36f1d 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Destroyer;
@@ -39,23 +40,23 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
 
   @Override
   public void destroy(DestroyerContext context,
-      LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+      LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     LOG.info("Running Kite connector destroyer");
-    String[] uris = KiteDatasetExecutor.listTemporaryDatasetUris(
-        jobConfig.toJobConfig.uri);
+    String uri = ConfigUtil.buildDatasetUri(
+        linkConfig.linkConfig, toJobConfig.toJobConfig);
+    String[] tempUris = KiteDatasetExecutor.listTemporaryDatasetUris(uri);
     if (context.isSuccess()) {
       KiteDatasetExecutor executor = getExecutor(
-          jobConfig.toJobConfig.uri, context.getSchema(),
-          linkConfig.linkConfig.fileFormat);
-      for (String uri : uris) {
-        executor.mergeDataset(uri);
-        LOG.info(String.format("Temporary dataset %s has been merged", uri));
+          uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
+      for (String tempUri : tempUris) {
+        executor.mergeDataset(tempUri);
+        LOG.info(String.format("Temporary dataset %s has been merged", tempUri));
       }
     } else {
-      for (String uri : uris) {
-        KiteDatasetExecutor.deleteDataset(uri);
+      for (String tempUri : tempUris) {
+        KiteDatasetExecutor.deleteDataset(tempUri);
         LOG.warn(String.format("Failed to import. " +
-            "Temporary dataset %s has been deleted", uri));
+            "Temporary dataset %s has been deleted", tempUri));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index 11233a8..ad5898f 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
 import org.apache.log4j.Logger;
 import org.apache.sqoop.common.SqoopException;
 import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
 import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.job.etl.Initializer;
@@ -42,8 +43,10 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
 
   @Override
   public void initialize(InitializerContext context,
-      LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
-    if (KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) {
+      LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
+    String uri = ConfigUtil.buildDatasetUri(
+        linkConfig.linkConfig, toJobConfig.toJobConfig);
+    if (KiteDatasetExecutor.datasetExists(uri)) {
       LOG.error("Overwrite an existing dataset is not expected in new create mode.");
       throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
     }
@@ -56,7 +59,7 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
     jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
     jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
     jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
-    if (FileFormat.CSV.equals(linkConfig.linkConfig.fileFormat)) {
+    if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
       jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
     }
     return jars;
@@ -64,7 +67,7 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
 
   @Override
   public Schema getSchema(InitializerContext context,
-      LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+      LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
     return NullSchema.getInstance();
   }
 

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
new file mode 100644
index 0000000..efc3966
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import com.google.common.base.Strings;
+
+/**
+ * Static helpers for combining Kite connector link and job configuration.
+ */
+public class ConfigUtil {
+
+  /**
+   * Returns a dataset uri that includes the filesystem location part when
+   * that location is provided separately.
+   *
+   * The location is inserted, prefixed by {@code //}, right after the second
+   * colon of the uri. The uri is returned unchanged when no location is
+   * given, when the uri is null or empty, when it already contains
+   * {@code ://}, or when it does not contain two colons.
+   *
+   * @param fsLocation filesystem location to insert; may be null or empty
+   * @param uri dataset uri; may be null
+   * @return the uri with the location inserted, or the uri as given
+   */
+  public static String buildDatasetUri(String fsLocation, String uri) {
+    if (Strings.isNullOrEmpty(fsLocation) || Strings.isNullOrEmpty(uri)
+        || uri.contains("://")) {
+      return uri;
+    }
+
+    // Add fsLocation after the second colon
+    int first = uri.indexOf(':');
+    int p = first < 0 ? -1 : uri.indexOf(':', first + 1);
+    if (p < 0) {
+      // Fewer than two colons: there is no insertion point, so leave the
+      // uri untouched instead of prepending the location to it.
+      return uri;
+    }
+    // NOTE(review): the remainder is appended as-is; confirm whether a "/"
+    // must be added when it does not already start with one.
+    return uri.substring(0, p + 1) + "//" + fsLocation + uri.substring(p + 1);
+  }
+
+  /**
+   * Returns a dataset uri built from the job's uri and the link's HDFS host
+   * and port, when the latter is provided.
+   *
+   * @param linkConfig link configuration supplying {@code hdfsHostAndPort}
+   * @param toJobConfig job configuration supplying {@code uri}
+   * @return the combined dataset uri
+   */
+  public static String buildDatasetUri(LinkConfig linkConfig,
+      ToJobConfig toJobConfig) {
+    return buildDatasetUri(linkConfig.hdfsHostAndPort, toJobConfig.uri);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
index 89bd9b3..c40092d 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
@@ -17,14 +17,33 @@
  */
 package org.apache.sqoop.connector.kite.configuration;
 
-import org.apache.sqoop.connector.common.FileFormat;
+import com.google.common.base.Strings;
 import org.apache.sqoop.model.ConfigClass;
 import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.HostAndPortValidator;
 
-@ConfigClass
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
 public class LinkConfig {
 
-  @Input
-  public FileFormat fileFormat = FileFormat.AVRO;
+  @Input(size = 255)
+  public String hdfsHostAndPort;
+
+  /**
+   * Class-level validator for {@link LinkConfig}. The HDFS host and port is
+   * checked with {@link HostAndPortValidator} only when a value is present,
+   * so a null or empty value passes validation.
+   */
+  public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+
+    @Override
+    public void validate(LinkConfig config) {
+      // TODO: There is no way to declare it as optional (SQOOP-1643), we cannot validate it directly using HostAndPortValidator.
+      if (!Strings.isNullOrEmpty(config.hdfsHostAndPort)) {
+        HostAndPortValidator validator = new HostAndPortValidator();
+        validator.validate(config.hdfsHostAndPort);
+        if (!validator.getStatus().equals(Status.OK)) {
+          // Report the messages collected by the delegate validator, not
+          // the messages of this validator itself.
+          addMessage(validator.getStatus(), validator.getMessages().toString());
+        }
+      }
+    }
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
index 70b7dc3..07ee8cf 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
@@ -17,30 +17,20 @@
  */
 package org.apache.sqoop.connector.kite.configuration;
 
-import org.apache.sqoop.connector.kite.util.InputValidation;
+import org.apache.sqoop.connector.common.FileFormat;
 import org.apache.sqoop.model.ConfigClass;
 import org.apache.sqoop.model.Input;
 import org.apache.sqoop.model.Validator;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.DatasetURIValidator;
+import org.apache.sqoop.validation.validators.NotNull;
 
-@ConfigClass(validators = {@Validator(ToJobConfig.ConfigValidator.class)})
+@ConfigClass
 public class ToJobConfig {
 
-  @Input(size = 255)
+  @Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
   public String uri;
 
-  public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
-
-    @Override
-    public void validate(ToJobConfig config) {
-      try {
-        InputValidation.validateDatasetUriScheme(config.uri);
-      } catch (IllegalArgumentException ex) {
-        addMessage(Status.ERROR, ex.toString());
-      }
-    }
-
-  }
+  @Input(validators = {@Validator(NotNull.class)})
+  public FileFormat fileFormat;
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
deleted file mode 100644
index 53fab02..0000000
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.kite.util;
-
-import java.util.regex.Pattern;
-
-/**
- * The helper class arranges to validate user inputs.
- */
-public class InputValidation {
-
-  private static Pattern DATASET_URI_PATTERN = Pattern
-      .compile("^dataset:(hive|hdfs|file):.*$");
-
-  /**
-   * Validates the correctness of user input dataset uri.
-   */
-  public static void validateDatasetUriScheme(String uri)
-      throws IllegalArgumentException {
-    if (!DATASET_URI_PATTERN.matcher(uri).matches()) {
-      throw new IllegalArgumentException("Invalid dataset URI: " + uri);
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/resources/kite-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index 27c77b4..65541c5 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -22,11 +22,8 @@ linkConfig.label = Link Configuration
 linkConfig.help = You must supply the information requested in order to create a \
   connection object.
 
-linkConfig.fileFormat.label = File format
-linkConfig.fileFormat.help = Format in which data should be serialized
-
-linkConfig.compression.label = Compression format
-linkConfig.compression.help = Compression that should be used for the data
+linkConfig.hdfsHostAndPort.label = HDFS host and port
+linkConfig.hdfsHostAndPort.help = Optional. Overrides the HDFS file system location.
 
 # To Job Config
 #
@@ -36,5 +33,8 @@ toJobConfig.help = You must supply the information requested in order to \
 
 toJobConfig.uri.label = Dataset URI
 toJobConfig.uri.help = Location to store dataset (i.e. \
-  "dataset:hdfs://host:port/user/me/job", \
-  "dataset:hive://host:port/table")
+  "dataset:hdfs://<host>[:port]/<path>/<namespace>/<dataset>", \
+  "dataset:hive://<namespace>/<dataset>")
+
+toJobConfig.fileFormat.label = File format
+toJobConfig.fileFormat.help = Storage format used to create the dataset; it cannot be changed afterwards.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
index a1016a0..d302aef 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
@@ -31,14 +31,13 @@ import org.junit.Test;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 public class TestKiteLoader {
 
   private KiteLoader loader;
 
-  @Mock
+  @org.mockito.Mock
   private KiteDatasetExecutor executorMock;
 
   @Before
@@ -84,10 +83,10 @@ public class TestKiteLoader {
     };
     LoaderContext context = new LoaderContext(null, reader, schema);
     LinkConfiguration linkConfig = new LinkConfiguration();
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
+    ToJobConfiguration toJobConfig = new ToJobConfiguration();
 
     // exercise
-    loader.load(context, linkConfig, jobConfig);
+    loader.load(context, linkConfig, toJobConfig);
 
     // verify
     verify(executorMock, times(NUMBER_OF_ROWS)).writeRecord(

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 4051fda..87ed906 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -32,7 +32,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -45,11 +44,11 @@ public class TestKiteToDestroyer {
 
   private LinkConfiguration linkConfig;
 
-  private ToJobConfiguration jobConfig;
+  private ToJobConfiguration toJobConfig;
 
   private final String[] expectedUris = new String[]{"a", "b"};
 
-  @Mock
+  @org.mockito.Mock
   private KiteDatasetExecutor executorMock;
 
   @Before
@@ -66,20 +65,20 @@ public class TestKiteToDestroyer {
     };
 
     linkConfig = new LinkConfiguration();
-    linkConfig.linkConfig.fileFormat = FileFormat.AVRO;
-    jobConfig = new ToJobConfiguration();
-    jobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
+    toJobConfig = new ToJobConfiguration();
+    toJobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
+    toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO;
   }
 
   @Test
   public void testDestroyForSuccessfulJob() {
     // setup
     DestroyerContext context = new DestroyerContext(null, true, null);
-    when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
+    when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
 
     // exercise
-    destroyer.destroy(context, linkConfig, jobConfig);
+    destroyer.destroy(context, linkConfig, toJobConfig);
 
     // verify
     for (String uri : expectedUris) {
@@ -91,14 +90,14 @@ public class TestKiteToDestroyer {
   public void testDestroyForFailedJob() {
     // setup
     DestroyerContext context = new DestroyerContext(null, false, null);
-    when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
+    when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
         .thenReturn(expectedUris);
     for (String uri : expectedUris) {
       when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
     }
 
     // exercise
-    destroyer.destroy(context, linkConfig, jobConfig);
+    destroyer.destroy(context, linkConfig, toJobConfig);
 
     // verify
     for (String uri : expectedUris) {

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index 5f0525d..fab31f9 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -19,6 +19,7 @@
 package org.apache.sqoop.connector.kite;
 
 import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
 import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
 import org.apache.sqoop.schema.Schema;
 import org.junit.Before;
@@ -30,7 +31,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 
@@ -40,7 +40,7 @@ public class TestKiteToInitializer {
 
   private KiteToInitializer initializer;
 
-  @Mock
+  @org.mockito.Mock
   private KiteDatasetExecutor executorMock;
 
   @Before
@@ -54,25 +54,27 @@ public class TestKiteToInitializer {
   @Test
   public void testInitializePassed() {
     // setup
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-    jobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
-    when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration toJobConfig = new ToJobConfiguration();
+    toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
+    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
         .thenReturn(false);
 
     // exercise
-    initializer.initialize(null, null, jobConfig);
+    initializer.initialize(null, linkConfig, toJobConfig);
   }
 
-  @Test(expected=SqoopException.class)
+  @Test(expected = SqoopException.class)
   public void testInitializeFailed() {
     // setup
-    ToJobConfiguration jobConfig = new ToJobConfiguration();
-    jobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
-    when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
+    LinkConfiguration linkConfig = new LinkConfiguration();
+    ToJobConfiguration toJobConfig = new ToJobConfiguration();
+    toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
+    when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
         .thenReturn(true);
 
     // exercise
-    initializer.initialize(null, null, jobConfig);
+    initializer.initialize(null, linkConfig, toJobConfig);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
new file mode 100644
index 0000000..c56ef02
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test configuration objects.
+ */
+public class TestConfigUtil {
+
+  @Test
+  public void testBuildDatasetUri() {
+    String actual = ConfigUtil.buildDatasetUri("namenode:8020",
+        "dataset:hdfs:/path/to/ds");
+    assertEquals("dataset:hdfs://namenode:8020/path/to/ds", actual);
+  }
+
+  @Test
+  public void testBuildDatasetUriHdfsHostPortIgnored() {
+    String expected = "dataset:hdfs://namenode2:8020/path/to/ds";
+    String actual = ConfigUtil.buildDatasetUri("namenode:8020", expected);
+    assertEquals(expected, actual);
+  }
+
+}
\ No newline at end of file