You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sqoop.apache.org by ab...@apache.org on 2015/01/06 10:12:02 UTC
sqoop git commit: SQOOP-1751: Sqoop2: Rearrange LinkConfig and
ToJobConfig of Kite Connector
Repository: sqoop
Updated Branches:
refs/heads/sqoop2 4ffa806ba -> ae26b9668
SQOOP-1751: Sqoop2: Rearrange LinkConfig and ToJobConfig of Kite Connector
(Qian Xu via Abraham Elmahrek)
Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/ae26b966
Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/ae26b966
Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/ae26b966
Branch: refs/heads/sqoop2
Commit: ae26b9668e67a758b014b7f67cb2d13aebae9596
Parents: 4ffa806
Author: Abraham Elmahrek <ab...@apache.org>
Authored: Tue Jan 6 00:27:27 2015 -0800
Committer: Abraham Elmahrek <ab...@apache.org>
Committed: Tue Jan 6 00:27:27 2015 -0800
----------------------------------------------------------------------
.../validators/DatasetURIValidator.java | 45 ++++++++++++
.../validators/HostAndPortValidator.java | 51 ++++++++++++++
.../validators/TestHostAndPortValidator.java | 73 ++++++++++++++++++++
.../connector/kite/KiteConnectorUpgrader.java | 3 -
.../apache/sqoop/connector/kite/KiteLoader.java | 9 ++-
.../sqoop/connector/kite/KiteToDestroyer.java | 23 +++---
.../sqoop/connector/kite/KiteToInitializer.java | 11 +--
.../kite/configuration/ConfigUtil.java | 46 ++++++++++++
.../kite/configuration/LinkConfig.java | 27 ++++++--
.../kite/configuration/ToJobConfig.java | 24 ++-----
.../connector/kite/util/InputValidation.java | 40 -----------
.../resources/kite-connector-config.properties | 14 ++--
.../sqoop/connector/kite/TestKiteLoader.java | 7 +-
.../connector/kite/TestKiteToDestroyer.java | 19 +++--
.../connector/kite/TestKiteToInitializer.java | 24 ++++---
.../kite/configuration/TestConfigUtil.java | 43 ++++++++++++
16 files changed, 345 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java b/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
new file mode 100644
index 0000000..2a69d5c
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/validation/validators/DatasetURIValidator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import com.google.common.base.Strings;
+import org.apache.sqoop.validation.Status;
+
+import java.util.regex.Pattern;
+
+/**
+ * Ensure that given string represents a Kite dataset uri.
+ */
+public class DatasetURIValidator extends AbstractValidator<String> {
+
+ private static final Pattern DATASET_URI_PATTERN = Pattern
+ .compile("^dataset:(hive|hdfs|file):.*$");
+
+ @Override
+ public void validate(String uri) {
+ if (Strings.isNullOrEmpty(uri)) {
+ addMessage(Status.ERROR, "Cannot be null nor empty");
+ return;
+ }
+
+ if (!DATASET_URI_PATTERN.matcher(uri).matches()) {
+ addMessage(Status.ERROR, "Invalid dataset URI: " + uri);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java b/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
new file mode 100644
index 0000000..613aee4
--- /dev/null
+++ b/common/src/main/java/org/apache/sqoop/validation/validators/HostAndPortValidator.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import com.google.common.base.Strings;
+import org.apache.sqoop.validation.Status;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Ensure that given string represents a hostname or hostname:port.
+ */
+public class HostAndPortValidator extends AbstractValidator<String> {
+
+ @Override
+ public void validate(String hostPortString) {
+ if (Strings.isNullOrEmpty(hostPortString)) {
+ addMessage(Status.ERROR, "Cannot be null nor empty");
+ return;
+ }
+
+ boolean valid = false;
+ try {
+ URI uri = new URI("hdfs://" + hostPortString);
+ valid = uri.getHost() != null &&
+ (!hostPortString.contains(":") || uri.getPort() > -1);
+ } catch (URISyntaxException ignored) {
+ }
+ if (!valid) {
+ addMessage(Status.ERROR, "Invalid host and port string: " +
+ hostPortString);
+ }
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java b/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
new file mode 100644
index 0000000..4ca3048
--- /dev/null
+++ b/common/src/test/java/org/apache/sqoop/validation/validators/TestHostAndPortValidator.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.validation.validators;
+
+import org.apache.sqoop.validation.Status;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestHostAndPortValidator {
+
+ AbstractValidator<String> validator = new HostAndPortValidator();
+
+ @Before
+ public void setUp() {
+ validator.reset();
+ assertEquals(0, validator.getMessages().size());
+ }
+
+ @Test
+ public void testValidHostAndPort() {
+ expectValid("host1:8020");
+ }
+
+ @Test
+ public void testValidHost() {
+ expectValid("host1");
+ }
+
+ private void expectValid(String input) {
+ validator.validate(input);
+ assertEquals(Status.OK, validator.getStatus());
+ assertEquals(0, validator.getMessages().size());
+ }
+
+ @Test
+ public void testInvalidPort() {
+ expectInvalid("host1:invalid_port");
+ }
+
+ @Test
+ public void testNegativePort() {
+ expectInvalid("host1:-1");
+ }
+
+ @Test
+ public void testHostNameWithInvalidChars() {
+ expectInvalid("hostname has space:8020");
+ }
+
+ private void expectInvalid(String input) {
+ validator.validate(input);
+ assertEquals(Status.ERROR, validator.getStatus());
+ assertEquals(1, validator.getMessages().size());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
index d3b9f95..745460f 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteConnectorUpgrader.java
@@ -18,15 +18,12 @@
*/
package org.apache.sqoop.connector.kite;
-import org.apache.log4j.Logger;
import org.apache.sqoop.configurable.ConfigurableUpgradeUtil;
import org.apache.sqoop.connector.spi.ConnectorConfigurableUpgrader;
-import org.apache.sqoop.model.MConfig;
import org.apache.sqoop.model.MFromConfig;
import org.apache.sqoop.model.MLinkConfig;
import org.apache.sqoop.model.MToConfig;
-//NOTE: All config types have the similar upgrade path at this point
public class KiteConnectorUpgrader extends ConnectorConfigurableUpgrader {
@Override
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
index 0a46f4a..b115242 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteLoader.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
import com.google.common.annotations.VisibleForTesting;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.etl.io.DataReader;
@@ -50,9 +51,11 @@ public class KiteLoader extends Loader<LinkConfiguration, ToJobConfiguration> {
@Override
public void load(LoaderContext context, LinkConfiguration linkConfig,
- ToJobConfiguration jobConfig) throws Exception {
- KiteDatasetExecutor executor = getExecutor(jobConfig.toJobConfig.uri,
- context.getSchema(), linkConfig.linkConfig.fileFormat);
+ ToJobConfiguration toJobConfig) throws Exception {
+ String uri = ConfigUtil.buildDatasetUri(
+ linkConfig.linkConfig, toJobConfig.toJobConfig);
+ KiteDatasetExecutor executor = getExecutor(
+ uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
LOG.info("Temporary dataset created.");
DataReader reader = context.getDataReader();
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
index 25912b4..3b36f1d 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToDestroyer.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
import com.google.common.annotations.VisibleForTesting;
import org.apache.log4j.Logger;
import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Destroyer;
@@ -39,23 +40,23 @@ public class KiteToDestroyer extends Destroyer<LinkConfiguration,
@Override
public void destroy(DestroyerContext context,
- LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+ LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
LOG.info("Running Kite connector destroyer");
- String[] uris = KiteDatasetExecutor.listTemporaryDatasetUris(
- jobConfig.toJobConfig.uri);
+ String uri = ConfigUtil.buildDatasetUri(
+ linkConfig.linkConfig, toJobConfig.toJobConfig);
+ String[] tempUris = KiteDatasetExecutor.listTemporaryDatasetUris(uri);
if (context.isSuccess()) {
KiteDatasetExecutor executor = getExecutor(
- jobConfig.toJobConfig.uri, context.getSchema(),
- linkConfig.linkConfig.fileFormat);
- for (String uri : uris) {
- executor.mergeDataset(uri);
- LOG.info(String.format("Temporary dataset %s has been merged", uri));
+ uri, context.getSchema(), toJobConfig.toJobConfig.fileFormat);
+ for (String tempUri : tempUris) {
+ executor.mergeDataset(tempUri);
+ LOG.info(String.format("Temporary dataset %s has been merged", tempUri));
}
} else {
- for (String uri : uris) {
- KiteDatasetExecutor.deleteDataset(uri);
+ for (String tempUri : tempUris) {
+ KiteDatasetExecutor.deleteDataset(tempUri);
LOG.warn(String.format("Failed to import. " +
- "Temporary dataset %s has been deleted", uri));
+ "Temporary dataset %s has been deleted", tempUri));
}
}
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
index 11233a8..ad5898f 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/KiteToInitializer.java
@@ -20,6 +20,7 @@ package org.apache.sqoop.connector.kite;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.FileFormat;
+import org.apache.sqoop.connector.kite.configuration.ConfigUtil;
import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
@@ -42,8 +43,10 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
@Override
public void initialize(InitializerContext context,
- LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
- if (KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri)) {
+ LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
+ String uri = ConfigUtil.buildDatasetUri(
+ linkConfig.linkConfig, toJobConfig.toJobConfig);
+ if (KiteDatasetExecutor.datasetExists(uri)) {
LOG.error("Overwrite an existing dataset is not expected in new create mode.");
throw new SqoopException(KiteConnectorError.GENERIC_KITE_CONNECTOR_0001);
}
@@ -56,7 +59,7 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
jars.add(ClassUtils.jarForClass("org.kitesdk.data.Formats"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.databind.JsonNode"));
jars.add(ClassUtils.jarForClass("com.fasterxml.jackson.core.TreeNode"));
- if (FileFormat.CSV.equals(linkConfig.linkConfig.fileFormat)) {
+ if (FileFormat.CSV.equals(toJobConfig.toJobConfig.fileFormat)) {
jars.add(ClassUtils.jarForClass("au.com.bytecode.opencsv.CSVWriter"));
}
return jars;
@@ -64,7 +67,7 @@ public class KiteToInitializer extends Initializer<LinkConfiguration,
@Override
public Schema getSchema(InitializerContext context,
- LinkConfiguration linkConfig, ToJobConfiguration jobConfig) {
+ LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) {
return NullSchema.getInstance();
}
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
new file mode 100644
index 0000000..efc3966
--- /dev/null
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ConfigUtil.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import com.google.common.base.Strings;
+
+public class ConfigUtil {
+
+ /**
+ * Returns a dataset uri, including the filesystem location part, if it is
+ * provided separated,
+ */
+ public static String buildDatasetUri(String fsLocation, String uri) {
+ if (!Strings.isNullOrEmpty(fsLocation) && !uri.contains("://")) {
+ // Add fsLocation after the second colon
+ int p = uri.indexOf(":", uri.indexOf(":") + 1);
+ return uri.substring(0, p + 1) + "//" + fsLocation + uri.substring(p + 1);
+ }
+ return uri;
+ }
+
+ /**
+ * Returns a dataset uri, including the filesystem location part, if it is
+ * provided separated,
+ */
+ public static String buildDatasetUri(LinkConfig linkConfig,
+ ToJobConfig toJobConfig) {
+ return buildDatasetUri(linkConfig.hdfsHostAndPort, toJobConfig.uri);
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
index 89bd9b3..c40092d 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/LinkConfig.java
@@ -17,14 +17,33 @@
*/
package org.apache.sqoop.connector.kite.configuration;
-import org.apache.sqoop.connector.common.FileFormat;
+import com.google.common.base.Strings;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
+import org.apache.sqoop.model.Validator;
+import org.apache.sqoop.validation.Status;
+import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.HostAndPortValidator;
-@ConfigClass
+@ConfigClass(validators = {@Validator(LinkConfig.ConfigValidator.class)})
public class LinkConfig {
- @Input
- public FileFormat fileFormat = FileFormat.AVRO;
+ @Input(size = 255)
+ public String hdfsHostAndPort;
+
+ public static class ConfigValidator extends AbstractValidator<LinkConfig> {
+
+ @Override
+ public void validate(LinkConfig config) {
+ // TODO: There is no way to declare it as optional (SQOOP-1643), we cannot validate it directly using HostAndPortValidator.
+ if (!Strings.isNullOrEmpty(config.hdfsHostAndPort)) {
+ HostAndPortValidator validator = new HostAndPortValidator();
+ validator.validate(config.hdfsHostAndPort);
+ if (!validator.getStatus().equals(Status.OK)) {
+ addMessage(validator.getStatus(), getMessages().toString());
+ }
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
index 70b7dc3..07ee8cf 100644
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
+++ b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/configuration/ToJobConfig.java
@@ -17,30 +17,20 @@
*/
package org.apache.sqoop.connector.kite.configuration;
-import org.apache.sqoop.connector.kite.util.InputValidation;
+import org.apache.sqoop.connector.common.FileFormat;
import org.apache.sqoop.model.ConfigClass;
import org.apache.sqoop.model.Input;
import org.apache.sqoop.model.Validator;
-import org.apache.sqoop.validation.Status;
-import org.apache.sqoop.validation.validators.AbstractValidator;
+import org.apache.sqoop.validation.validators.DatasetURIValidator;
+import org.apache.sqoop.validation.validators.NotNull;
-@ConfigClass(validators = {@Validator(ToJobConfig.ConfigValidator.class)})
+@ConfigClass
public class ToJobConfig {
- @Input(size = 255)
+ @Input(size = 255, validators = {@Validator(DatasetURIValidator.class)})
public String uri;
- public static class ConfigValidator extends AbstractValidator<ToJobConfig> {
-
- @Override
- public void validate(ToJobConfig config) {
- try {
- InputValidation.validateDatasetUriScheme(config.uri);
- } catch (IllegalArgumentException ex) {
- addMessage(Status.ERROR, ex.toString());
- }
- }
-
- }
+ @Input(validators = {@Validator(NotNull.class)})
+ public FileFormat fileFormat;
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java b/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
deleted file mode 100644
index 53fab02..0000000
--- a/connector/connector-kite/src/main/java/org/apache/sqoop/connector/kite/util/InputValidation.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.sqoop.connector.kite.util;
-
-import java.util.regex.Pattern;
-
-/**
- * The helper class arranges to validate user inputs.
- */
-public class InputValidation {
-
- private static Pattern DATASET_URI_PATTERN = Pattern
- .compile("^dataset:(hive|hdfs|file):.*$");
-
- /**
- * Validates the correctness of user input dataset uri.
- */
- public static void validateDatasetUriScheme(String uri)
- throws IllegalArgumentException {
- if (!DATASET_URI_PATTERN.matcher(uri).matches()) {
- throw new IllegalArgumentException("Invalid dataset URI: " + uri);
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/main/resources/kite-connector-config.properties
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/main/resources/kite-connector-config.properties b/connector/connector-kite/src/main/resources/kite-connector-config.properties
index 27c77b4..65541c5 100644
--- a/connector/connector-kite/src/main/resources/kite-connector-config.properties
+++ b/connector/connector-kite/src/main/resources/kite-connector-config.properties
@@ -22,11 +22,8 @@ linkConfig.label = Link Configuration
linkConfig.help = You must supply the information requested in order to create a \
connection object.
-linkConfig.fileFormat.label = File format
-linkConfig.fileFormat.help = Format in which data should be serialized
-
-linkConfig.compression.label = Compression format
-linkConfig.compression.help = Compression that should be used for the data
+linkConfig.hdfsHostAndPort.label = HDFS host and port
+linkConfig.hdfsHostAndPort.help = Optional to override HDFS file system location.
# To Job Config
#
@@ -36,5 +33,8 @@ toJobConfig.help = You must supply the information requested in order to \
toJobConfig.uri.label = Dataset URI
toJobConfig.uri.help = Location to store dataset (i.e. \
- "dataset:hdfs://host:port/user/me/job", \
- "dataset:hive://host:port/table")
+ "dataset:hdfs://<host>[:port]/<path>/<namespace>/<dataset>", \
+ "dataset:hive://<namespace>/<dataset>")
+
+toJobConfig.fileFormat.label = File format
+toJobConfig.fileFormat.help = Specify storage format to create a dataset and cannot be changed.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
index a1016a0..d302aef 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteLoader.java
@@ -31,14 +31,13 @@ import org.junit.Test;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
public class TestKiteLoader {
private KiteLoader loader;
- @Mock
+ @org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -84,10 +83,10 @@ public class TestKiteLoader {
};
LoaderContext context = new LoaderContext(null, reader, schema);
LinkConfiguration linkConfig = new LinkConfiguration();
- ToJobConfiguration jobConfig = new ToJobConfiguration();
+ ToJobConfiguration toJobConfig = new ToJobConfiguration();
// exercise
- loader.load(context, linkConfig, jobConfig);
+ loader.load(context, linkConfig, toJobConfig);
// verify
verify(executorMock, times(NUMBER_OF_ROWS)).writeRecord(
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
index 4051fda..87ed906 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToDestroyer.java
@@ -32,7 +32,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.verifyStatic;
@@ -45,11 +44,11 @@ public class TestKiteToDestroyer {
private LinkConfiguration linkConfig;
- private ToJobConfiguration jobConfig;
+ private ToJobConfiguration toJobConfig;
private final String[] expectedUris = new String[]{"a", "b"};
- @Mock
+ @org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -66,20 +65,20 @@ public class TestKiteToDestroyer {
};
linkConfig = new LinkConfiguration();
- linkConfig.linkConfig.fileFormat = FileFormat.AVRO;
- jobConfig = new ToJobConfiguration();
- jobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
+ toJobConfig = new ToJobConfiguration();
+ toJobConfig.toJobConfig.uri = "dataset:file:/foo/bar";
+ toJobConfig.toJobConfig.fileFormat = FileFormat.AVRO;
}
@Test
public void testDestroyForSuccessfulJob() {
// setup
DestroyerContext context = new DestroyerContext(null, true, null);
- when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
+ when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
// exercise
- destroyer.destroy(context, linkConfig, jobConfig);
+ destroyer.destroy(context, linkConfig, toJobConfig);
// verify
for (String uri : expectedUris) {
@@ -91,14 +90,14 @@ public class TestKiteToDestroyer {
public void testDestroyForFailedJob() {
// setup
DestroyerContext context = new DestroyerContext(null, false, null);
- when(KiteDatasetExecutor.listTemporaryDatasetUris(jobConfig.toJobConfig.uri))
+ when(KiteDatasetExecutor.listTemporaryDatasetUris(toJobConfig.toJobConfig.uri))
.thenReturn(expectedUris);
for (String uri : expectedUris) {
when(KiteDatasetExecutor.deleteDataset(uri)).thenReturn(true);
}
// exercise
- destroyer.destroy(context, linkConfig, jobConfig);
+ destroyer.destroy(context, linkConfig, toJobConfig);
// verify
for (String uri : expectedUris) {
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
index 5f0525d..fab31f9 100644
--- a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/TestKiteToInitializer.java
@@ -19,6 +19,7 @@
package org.apache.sqoop.connector.kite;
import org.apache.sqoop.common.SqoopException;
+import org.apache.sqoop.connector.kite.configuration.LinkConfiguration;
import org.apache.sqoop.connector.kite.configuration.ToJobConfiguration;
import org.apache.sqoop.schema.Schema;
import org.junit.Before;
@@ -30,7 +31,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.Mock;
import static org.mockito.MockitoAnnotations.initMocks;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
@@ -40,7 +40,7 @@ public class TestKiteToInitializer {
private KiteToInitializer initializer;
- @Mock
+ @org.mockito.Mock
private KiteDatasetExecutor executorMock;
@Before
@@ -54,25 +54,27 @@ public class TestKiteToInitializer {
@Test
public void testInitializePassed() {
// setup
- ToJobConfiguration jobConfig = new ToJobConfiguration();
- jobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
- when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ ToJobConfiguration toJobConfig = new ToJobConfiguration();
+ toJobConfig.toJobConfig.uri = "dataset:file:/ds/not/exist";
+ when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
.thenReturn(false);
// exercise
- initializer.initialize(null, null, jobConfig);
+ initializer.initialize(null, linkConfig, toJobConfig);
}
- @Test(expected=SqoopException.class)
+ @Test(expected = SqoopException.class)
public void testInitializeFailed() {
// setup
- ToJobConfiguration jobConfig = new ToJobConfiguration();
- jobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
- when(KiteDatasetExecutor.datasetExists(jobConfig.toJobConfig.uri))
+ LinkConfiguration linkConfig = new LinkConfiguration();
+ ToJobConfiguration toJobConfig = new ToJobConfiguration();
+ toJobConfig.toJobConfig.uri = "dataset:file:/ds/exist";
+ when(KiteDatasetExecutor.datasetExists(toJobConfig.toJobConfig.uri))
.thenReturn(true);
// exercise
- initializer.initialize(null, null, jobConfig);
+ initializer.initialize(null, linkConfig, toJobConfig);
}
@Test
http://git-wip-us.apache.org/repos/asf/sqoop/blob/ae26b966/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
----------------------------------------------------------------------
diff --git a/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
new file mode 100644
index 0000000..c56ef02
--- /dev/null
+++ b/connector/connector-kite/src/test/java/org/apache/sqoop/connector/kite/configuration/TestConfigUtil.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sqoop.connector.kite.configuration;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test configuration objects.
+ */
+public class TestConfigUtil {
+
+ @Test
+ public void testBuildDatasetUri() {
+ String actual = ConfigUtil.buildDatasetUri("namenode:8020",
+ "dataset:hdfs:/path/to/ds");
+ assertEquals("dataset:hdfs://namenode:8020/path/to/ds", actual);
+ }
+
+ @Test
+ public void testBuildDatasetUriHdfsHostPortIgnored() {
+ String expected = "dataset:hdfs://namenode2:8020/path/to/ds";
+ String actual = ConfigUtil.buildDatasetUri("namenode:8020", expected);
+ assertEquals(expected, actual);
+ }
+
+}
\ No newline at end of file