You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2020/11/25 03:11:54 UTC
[incubator-dolphinscheduler] branch dev updated: [FIX-4034][server]
fix sqoop import fail (#4036)
This is an automated email from the ASF dual-hosted git repository.
wenhemin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 145314b [FIX-4034][server] fix sqoop import fail (#4036)
145314b is described below
commit 145314b782c765802463031ee3e075de857a823d
Author: Yelli <ye...@gmail.com>
AuthorDate: Wed Nov 25 11:11:44 2020 +0800
[FIX-4034][server] fix sqoop import fail (#4036)
* fix #4043, sqoop import query fail
* fix #4043, sqoop task hard code & code style
* add license for SqoopConstants
* add private constructor for SqoopConstants
* fix sqoop failure when the mysql password contains special characters
* fix checkstyle
* fix sqoop task log
* remove unused constants
---
.../apache/dolphinscheduler/common/Constants.java | 14 +++
.../enums/{QueryType.java => SqoopQueryType.java} | 30 ++++--
.../server/worker/task/sqoop/SqoopConstants.java | 74 +++++++++++++
.../server/worker/task/sqoop/SqoopTask.java | 49 +++++----
.../task/sqoop/generator/CommonGenerator.java | 63 +++++------
.../task/sqoop/generator/ISourceGenerator.java | 6 +-
.../task/sqoop/generator/ITargetGenerator.java | 6 +-
.../task/sqoop/generator/SqoopJobGenerator.java | 44 ++++----
.../generator/sources/HdfsSourceGenerator.java | 38 ++++---
.../generator/sources/HiveSourceGenerator.java | 49 +++++----
.../generator/sources/MysqlSourceGenerator.java | 114 +++++++++++---------
.../generator/targets/HdfsTargetGenerator.java | 58 +++++-----
.../generator/targets/HiveTargetGenerator.java | 70 ++++++------
.../generator/targets/MysqlTargetGenerator.java | 90 +++++++++-------
.../server/worker/task/sqoop/SqoopTaskTest.java | 119 +++++++++++++--------
15 files changed, 516 insertions(+), 308 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 6da58f7..4a696d2 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -222,6 +222,11 @@ public final class Constants {
public static final String COLON = ":";
/**
+ * SPACE " "
+ */
+ public static final String SPACE = " ";
+
+ /**
* SINGLE_SLASH /
*/
public static final String SINGLE_SLASH = "/";
@@ -232,6 +237,15 @@ public final class Constants {
public static final String DOUBLE_SLASH = "//";
/**
+ * SINGLE_QUOTES "'"
+ */
+ public static final String SINGLE_QUOTES = "'";
+ /**
+ * DOUBLE_QUOTES "\""
+ */
+ public static final String DOUBLE_QUOTES = "\"";
+
+ /**
* SEMICOLON ;
*/
public static final String SEMICOLON = ";";
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
similarity index 69%
rename from dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
rename to dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
index 13820b4..4d279f5 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/QueryType.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/SqoopQueryType.java
@@ -14,20 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.common.enums;
-public enum QueryType {
+public enum SqoopQueryType {
+
+ FORM(0, "SQOOP_QUERY_FORM"),
+ SQL(1, "SQOOP_QUERY_SQL");
+
+ private final Integer code;
+
+ private final String desc;
- FORM,
- SQL;
+ SqoopQueryType(Integer code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ public Integer getCode() {
+ return code;
+ }
- public static QueryType getEnum(int value){
- for (QueryType e:QueryType.values()) {
- if(e.ordinal() == value) {
- return e;
- }
- }
- //For values out of enum scope
- return null;
+ public String getDesc() {
+ return desc;
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java
new file mode 100644
index 0000000..c19a1e0
--- /dev/null
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopConstants.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.task.sqoop;
+
+public final class SqoopConstants {
+
+ private SqoopConstants() {
+ }
+
+ //sqoop general param
+ public static final String SQOOP = "sqoop";
+ public static final String SQOOP_MR_JOB_NAME = "mapred.job.name";
+ public static final String SQOOP_PARALLELISM = "-m";
+ public static final String FIELDS_TERMINATED_BY = "--fields-terminated-by";
+ public static final String LINES_TERMINATED_BY = "--lines-terminated-by";
+ public static final String FIELD_NULL_PLACEHOLDER = "--null-non-string 'NULL' --null-string 'NULL'";
+
+ //sqoop db
+ public static final String DB_CONNECT = "--connect";
+ public static final String DB_USERNAME = "--username";
+ public static final String DB_PWD = "--password";
+ public static final String TABLE = "--table";
+ public static final String COLUMNS = "--columns";
+ public static final String QUERY_WHERE = "where";
+ public static final String QUERY = "--query";
+ public static final String QUERY_CONDITION = "AND \\$CONDITIONS";
+ public static final String QUERY_WITHOUT_CONDITION = "WHERE \\$CONDITIONS";
+ public static final String MAP_COLUMN_HIVE = "--map-column-hive";
+ public static final String MAP_COLUMN_JAVA = "--map-column-java";
+
+
+ //sqoop hive source
+ public static final String HCATALOG_DATABASE = "--hcatalog-database";
+ public static final String HCATALOG_TABLE = "--hcatalog-table";
+ public static final String HCATALOG_PARTITION_KEYS = "--hcatalog-partition-keys";
+ public static final String HCATALOG_PARTITION_VALUES = "--hcatalog-partition-values";
+
+ //sqoop hdfs
+ public static final String HDFS_EXPORT_DIR = "--export-dir";
+ public static final String TARGET_DIR = "--target-dir";
+ public static final String COMPRESSION_CODEC = "--compression-codec";
+
+ //sqoop hive
+ public static final String HIVE_IMPORT = "--hive-import";
+ public static final String HIVE_TABLE = "--hive-table";
+ public static final String CREATE_HIVE_TABLE = "--create-hive-table";
+ public static final String HIVE_DROP_IMPORT_DELIMS = "--hive-drop-import-delims";
+ public static final String HIVE_OVERWRITE = "--hive-overwrite";
+ public static final String DELETE_TARGET_DIR = "--delete-target-dir";
+ public static final String HIVE_DELIMS_REPLACEMENT = "--hive-delims-replacement";
+ public static final String HIVE_PARTITION_KEY = "--hive-partition-key";
+ public static final String HIVE_PARTITION_VALUE = "--hive-partition-value";
+
+ //sqoop update model
+ public static final String UPDATE_KEY = "--update-key";
+ public static final String UPDATE_MODE = "--update-mode";
+
+
+}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
index c66fe95..00d94f0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTask.java
@@ -14,64 +14,73 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractYarnTask;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
-import org.slf4j.Logger;
import java.util.Map;
-import org.apache.dolphinscheduler.common.utils.*;
+
+import org.slf4j.Logger;
/**
* sqoop task extends the shell task
*/
public class SqoopTask extends AbstractYarnTask {
+ /**
+ * sqoop task params
+ */
private SqoopParameters sqoopParameters;
/**
* taskExecutionContext
*/
- private TaskExecutionContext taskExecutionContext;
+ private final TaskExecutionContext sqoopTaskExecutionContext;
- public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger){
- super(taskExecutionContext,logger);
- this.taskExecutionContext = taskExecutionContext;
+ public SqoopTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+ super(taskExecutionContext, logger);
+ this.sqoopTaskExecutionContext = taskExecutionContext;
}
@Override
- public void init() throws Exception {
- logger.info("sqoop task params {}", taskExecutionContext.getTaskParams());
+ public void init() {
+ logger.info("sqoop task params {}", sqoopTaskExecutionContext.getTaskParams());
sqoopParameters =
- JSONUtils.parseObject(taskExecutionContext.getTaskParams(),SqoopParameters.class);
- if (!sqoopParameters.checkParameters()) {
- throw new RuntimeException("sqoop task params is not valid");
+ JSONUtils.parseObject(sqoopTaskExecutionContext.getTaskParams(), SqoopParameters.class);
+ //check sqoop task params
+ if (null == sqoopParameters) {
+ throw new IllegalArgumentException("Sqoop Task params is null");
}
+ if (!sqoopParameters.checkParameters()) {
+ throw new IllegalArgumentException("Sqoop Task params check fail");
+ }
}
@Override
- protected String buildCommand() throws Exception {
+ protected String buildCommand() {
//get sqoop scripts
SqoopJobGenerator generator = new SqoopJobGenerator();
- String script = generator.generateSqoopJob(sqoopParameters,taskExecutionContext);
+ String script = generator.generateSqoopJob(sqoopParameters, sqoopTaskExecutionContext);
- Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- sqoopParameters.getLocalParametersMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(sqoopTaskExecutionContext.getDefinedParams()),
+ sqoopTaskExecutionContext.getDefinedParams(),
+ sqoopParameters.getLocalParametersMap(),
+ CommandType.of(sqoopTaskExecutionContext.getCmdTypeIfComplement()),
+ sqoopTaskExecutionContext.getScheduleTime());
- if(paramsMap != null){
- String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
+ if (paramsMap != null) {
+ String resultScripts = ParameterUtils.convertParameterPlaceholders(script, ParamUtils.convert(paramsMap));
logger.info("sqoop script: {}", resultScripts);
return resultScripts;
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
index ffca735..e3e7c9a 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/CommonGenerator.java
@@ -14,71 +14,72 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* common script generator
*/
public class CommonGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(CommonGenerator.class);
public String generate(SqoopParameters sqoopParameters) {
- StringBuilder result = new StringBuilder();
- try{
- result.append("sqoop ")
- .append(sqoopParameters.getModelType());
- //set sqoop job name
- result.append(" -D mapred.job.name")
- .append(Constants.EQUAL_SIGN)
- .append(sqoopParameters.getJobName());
+ StringBuilder commonSb = new StringBuilder();
+
+ try {
+ //sqoop task model
+ commonSb.append(SqoopConstants.SQOOP)
+ .append(Constants.SPACE)
+ .append(sqoopParameters.getModelType());
- //set hadoop custom param
+ //sqoop map-reduce job name
+ commonSb.append(Constants.SPACE).append(Constants.D).append(Constants.SPACE)
+ .append(String.format("%s%s%s", SqoopConstants.SQOOP_MR_JOB_NAME,
+ Constants.EQUAL_SIGN, sqoopParameters.getJobName()));
+
+ //hadoop custom param
List<Property> hadoopCustomParams = sqoopParameters.getHadoopCustomParams();
if (CollectionUtils.isNotEmpty(hadoopCustomParams)) {
for (Property hadoopCustomParam : hadoopCustomParams) {
- String hadoopCustomParamStr = " -D " + hadoopCustomParam.getProp()
- + Constants.EQUAL_SIGN + hadoopCustomParam.getValue();
+ String hadoopCustomParamStr = String.format("%s%s%s", hadoopCustomParam.getProp(),
+ Constants.EQUAL_SIGN, hadoopCustomParam.getValue());
- if (StringUtils.isNotEmpty(hadoopCustomParamStr)) {
- result.append(hadoopCustomParamStr);
- }
+ commonSb.append(Constants.SPACE).append(Constants.D)
+ .append(Constants.SPACE).append(hadoopCustomParamStr);
}
}
- //set sqoop advanced custom param
+ //sqoop custom params
List<Property> sqoopAdvancedParams = sqoopParameters.getSqoopAdvancedParams();
if (CollectionUtils.isNotEmpty(sqoopAdvancedParams)) {
-
for (Property sqoopAdvancedParam : sqoopAdvancedParams) {
- String sqoopAdvancedParamStr = " " + sqoopAdvancedParam.getProp()
- + " " + sqoopAdvancedParam.getValue();
- if (StringUtils.isNotEmpty(sqoopAdvancedParamStr)) {
- result.append(sqoopAdvancedParamStr);
- }
+ commonSb.append(Constants.SPACE).append(sqoopAdvancedParam.getProp())
+ .append(Constants.SPACE).append(sqoopAdvancedParam.getValue());
}
}
- if(sqoopParameters.getConcurrency() >0){
- result.append(" -m ")
- .append(sqoopParameters.getConcurrency());
+ //sqoop parallelism
+ if (sqoopParameters.getConcurrency() > 0) {
+ commonSb.append(Constants.SPACE).append(SqoopConstants.SQOOP_PARALLELISM)
+ .append(Constants.SPACE).append(sqoopParameters.getConcurrency());
}
- }catch (Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop task general param build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return commonSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
index 841654b..e6a9576 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ISourceGenerator.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -26,9 +27,10 @@ public interface ISourceGenerator {
/**
* generate the source script
- * @param sqoopParameters sqoopParameters
+ *
+ * @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return source script
*/
- String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
+ String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
index 7bdaf49..eb355a7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/ITargetGenerator.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -26,9 +27,10 @@ public interface ITargetGenerator {
/**
* generate the target script
- * @param sqoopParameters sqoopParameters
+ *
+ * @param sqoopParameters sqoopParameters
* @param taskExecutionContext taskExecutionContext
* @return target script
*/
- String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext);
+ String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext);
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
index 6bc94d0..9feaffa 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/SqoopJobGenerator.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator;
import org.apache.dolphinscheduler.common.enums.SqoopJobType;
@@ -46,37 +47,38 @@ public class SqoopJobGenerator {
/**
* common script generator
*/
- private CommonGenerator commonGenerator;
+ private final CommonGenerator commonGenerator;
- public SqoopJobGenerator(){
+ public SqoopJobGenerator() {
commonGenerator = new CommonGenerator();
}
- private void createSqoopJobGenerator(String sourceType,String targetType){
+ private void createSqoopJobGenerator(String sourceType, String targetType) {
sourceGenerator = createSourceGenerator(sourceType);
targetGenerator = createTargetGenerator(targetType);
}
/**
* get the final sqoop scripts
- * @param sqoopParameters
- * @return
+ *
+ * @param sqoopParameters sqoop params
+ * @return sqoop scripts
*/
- public String generateSqoopJob(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext){
+ public String generateSqoopJob(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
String sqoopScripts = "";
if (SqoopJobType.TEMPLATE.getDescp().equals(sqoopParameters.getJobType())) {
- createSqoopJobGenerator(sqoopParameters.getSourceType(),sqoopParameters.getTargetType());
- if(sourceGenerator == null || targetGenerator == null){
+ createSqoopJobGenerator(sqoopParameters.getSourceType(), sqoopParameters.getTargetType());
+ if (sourceGenerator == null || targetGenerator == null) {
throw new RuntimeException("sqoop task source type or target type is null");
}
- sqoopScripts = commonGenerator.generate(sqoopParameters)
- + sourceGenerator.generate(sqoopParameters,taskExecutionContext)
- + targetGenerator.generate(sqoopParameters,taskExecutionContext);
+ sqoopScripts = String.format("%s%s%s", commonGenerator.generate(sqoopParameters),
+ sourceGenerator.generate(sqoopParameters, taskExecutionContext),
+ targetGenerator.generate(sqoopParameters, taskExecutionContext));
} else if (SqoopJobType.CUSTOM.getDescp().equals(sqoopParameters.getJobType())) {
- sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n");
+ sqoopScripts = sqoopParameters.getCustomShell().replaceAll("\\r\\n", "\n");
}
return sqoopScripts;
@@ -84,11 +86,12 @@ public class SqoopJobGenerator {
/**
* get the source generator
- * @param sourceType
- * @return
+ *
+ * @param sourceType sqoop source type
+ * @return sqoop source generator
*/
- private ISourceGenerator createSourceGenerator(String sourceType){
- switch (sourceType){
+ private ISourceGenerator createSourceGenerator(String sourceType) {
+ switch (sourceType) {
case MYSQL:
return new MysqlSourceGenerator();
case HIVE:
@@ -102,11 +105,12 @@ public class SqoopJobGenerator {
/**
* get the target generator
- * @param targetType
- * @return
+ *
+ * @param targetType sqoop target type
+ * @return sqoop target generator
*/
- private ITargetGenerator createTargetGenerator(String targetType){
- switch (targetType){
+ private ITargetGenerator createTargetGenerator(String targetType) {
+ switch (targetType) {
case MYSQL:
return new MysqlTargetGenerator();
case HIVE:
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
index ded0d6d..549d5db 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HdfsSourceGenerator.java
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHdfsParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,28 +34,30 @@ import org.slf4j.LoggerFactory;
*/
public class HdfsSourceGenerator implements ISourceGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(HdfsSourceGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
- StringBuilder result = new StringBuilder();
- try{
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+ StringBuilder hdfsSourceSb = new StringBuilder();
+
+ try {
SourceHdfsParameter sourceHdfsParameter
- = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHdfsParameter.class);
-
- if(sourceHdfsParameter != null){
- if(StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())){
- result.append(" --export-dir ")
- .append(sourceHdfsParameter.getExportDir());
- }else{
- throw new Exception("--export-dir is null");
+ = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHdfsParameter.class);
+
+ if (null != sourceHdfsParameter) {
+ if (StringUtils.isNotEmpty(sourceHdfsParameter.getExportDir())) {
+ hdfsSourceSb.append(Constants.SPACE).append(SqoopConstants.HDFS_EXPORT_DIR)
+ .append(Constants.SPACE).append(sourceHdfsParameter.getExportDir());
+ } else {
+ throw new IllegalArgumentException("Sqoop hdfs export dir is null");
}
}
- }catch (Exception e){
- logger.error("get hdfs source failed",e);
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop hdfs source parmas build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return hdfsSourceSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
index 131b616..3229dca 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/HiveSourceGenerator.java
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceHiveParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,33 +34,40 @@ import org.slf4j.LoggerFactory;
*/
public class HiveSourceGenerator implements ISourceGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(HiveSourceGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
- StringBuilder sb = new StringBuilder();
- try{
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+ StringBuilder hiveSourceSb = new StringBuilder();
+
+ try {
SourceHiveParameter sourceHiveParameter
- = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceHiveParameter.class);
- if(sourceHiveParameter != null){
- if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())){
- sb.append(" --hcatalog-database ").append(sourceHiveParameter.getHiveDatabase());
+ = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceHiveParameter.class);
+
+ if (null != sourceHiveParameter) {
+ if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveDatabase())) {
+ hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_DATABASE)
+ .append(Constants.SPACE).append(sourceHiveParameter.getHiveDatabase());
}
- if(StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())){
- sb.append(" --hcatalog-table ").append(sourceHiveParameter.getHiveTable());
+ if (StringUtils.isNotEmpty(sourceHiveParameter.getHiveTable())) {
+ hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_TABLE)
+ .append(Constants.SPACE).append(sourceHiveParameter.getHiveTable());
}
- if(StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())&&
- StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())){
- sb.append(" --hcatalog-partition-keys ").append(sourceHiveParameter.getHivePartitionKey())
- .append(" --hcatalog-partition-values ").append(sourceHiveParameter.getHivePartitionValue());
+ if (StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionKey())
+ && StringUtils.isNotEmpty(sourceHiveParameter.getHivePartitionValue())) {
+ hiveSourceSb.append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_KEYS)
+ .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionKey())
+ .append(Constants.SPACE).append(SqoopConstants.HCATALOG_PARTITION_VALUES)
+ .append(Constants.SPACE).append(sourceHiveParameter.getHivePartitionValue());
}
}
- }catch (Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop hive source params build failed: [%s]", e.getMessage()));
}
- return sb.toString();
+ return hiveSourceSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
index 47430d1..63738c8 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/sources/MysqlSourceGenerator.java
@@ -14,106 +14,118 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.sources;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
-import org.apache.dolphinscheduler.common.enums.QueryType;
+import org.apache.dolphinscheduler.common.enums.SqoopQueryType;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ISourceGenerator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* mysql source generator
*/
public class MysqlSourceGenerator implements ISourceGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(MysqlSourceGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
- StringBuilder result = new StringBuilder();
- try {
- SourceMysqlParameter sourceMysqlParameter
- = JSONUtils.parseObject(sqoopParameters.getSourceParams(),SourceMysqlParameter.class);
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+ StringBuilder mysqlSourceSb = new StringBuilder();
+ try {
+ SourceMysqlParameter sourceMysqlParameter = JSONUtils.parseObject(sqoopParameters.getSourceParams(), SourceMysqlParameter.class);
SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
- if(sourceMysqlParameter != null){
+ if (null != sourceMysqlParameter) {
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getSourcetype()),
- sqoopTaskExecutionContext.getSourceConnectionParams());
- if(baseDataSource != null){
- result.append(" --connect ")
- .append(baseDataSource.getJdbcUrl())
- .append(" --username ")
- .append(baseDataSource.getUser())
- .append(" --password ")
- .append(baseDataSource.getPassword());
-
- if(sourceMysqlParameter.getSrcQueryType() == QueryType.FORM.ordinal()){
- if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())){
- result.append(" --table ").append(sourceMysqlParameter.getSrcTable());
+ sqoopTaskExecutionContext.getSourceConnectionParams());
+
+ if (null != baseDataSource) {
+
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
+ .append(Constants.SPACE).append(baseDataSource.getJdbcUrl())
+ .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
+ .append(Constants.SPACE).append(baseDataSource.getUser())
+ .append(Constants.SPACE).append(SqoopConstants.DB_PWD)
+ .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES);
+
+ //sqoop table & sql query
+ if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.FORM.getCode()) {
+ if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcTable())) {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.TABLE)
+ .append(Constants.SPACE).append(sourceMysqlParameter.getSrcTable());
}
- if(StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())){
- result.append(" --columns ").append(sourceMysqlParameter.getSrcColumns());
+ if (StringUtils.isNotEmpty(sourceMysqlParameter.getSrcColumns())) {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
+ .append(Constants.SPACE).append(sourceMysqlParameter.getSrcColumns());
}
+ } else if (sourceMysqlParameter.getSrcQueryType() == SqoopQueryType.SQL.getCode()
+ && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())) {
- }else if(sourceMysqlParameter.getSrcQueryType() == QueryType.SQL.ordinal()
- && StringUtils.isNotEmpty(sourceMysqlParameter.getSrcQuerySql())){
String srcQuery = sourceMysqlParameter.getSrcQuerySql();
- if(srcQuery.toLowerCase().contains("where")){
- srcQuery += " AND "+"$CONDITIONS";
- }else{
- srcQuery += " WHERE $CONDITIONS";
- }
- result.append(" --query \'").append(srcQuery).append("\'");
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY)
+ .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(srcQuery);
+ if (srcQuery.toLowerCase().contains(SqoopConstants.QUERY_WHERE)) {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_CONDITION).append(Constants.DOUBLE_QUOTES);
+ } else {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.QUERY_WITHOUT_CONDITION).append(Constants.DOUBLE_QUOTES);
+ }
}
- List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
+ //sqoop hive map column
+ List<Property> mapColumnHive = sourceMysqlParameter.getMapColumnHive();
- if(mapColumnHive != null && !mapColumnHive.isEmpty()){
+ if (null != mapColumnHive && !mapColumnHive.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
- for(Property item:mapColumnHive){
- columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+ for (Property item : mapColumnHive) {
+ columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
}
- if(StringUtils.isNotEmpty(columnMap.toString())){
- result.append(" --map-column-hive ")
- .append(columnMap.substring(0,columnMap.length() - 1));
+ if (StringUtils.isNotEmpty(columnMap.toString())) {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_HIVE)
+ .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
}
}
- List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
+ //sqoop map column java
+ List<Property> mapColumnJava = sourceMysqlParameter.getMapColumnJava();
- if(mapColumnJava != null && !mapColumnJava.isEmpty()){
+ if (null != mapColumnJava && !mapColumnJava.isEmpty()) {
StringBuilder columnMap = new StringBuilder();
- for(Property item:mapColumnJava){
- columnMap.append(item.getProp()).append("=").append(item.getValue()).append(",");
+ for (Property item : mapColumnJava) {
+ columnMap.append(item.getProp()).append(Constants.EQUAL_SIGN).append(item.getValue()).append(Constants.COMMA);
}
- if(StringUtils.isNotEmpty(columnMap.toString())){
- result.append(" --map-column-java ")
- .append(columnMap.substring(0,columnMap.length() - 1));
+ if (StringUtils.isNotEmpty(columnMap.toString())) {
+ mysqlSourceSb.append(Constants.SPACE).append(SqoopConstants.MAP_COLUMN_JAVA)
+ .append(Constants.SPACE).append(columnMap.substring(0, columnMap.length() - 1));
}
}
}
}
- }catch (Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop task mysql source params build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return mysqlSourceSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
index f94d10a..3ea3254 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HdfsTargetGenerator.java
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHdfsParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,47 +34,53 @@ import org.slf4j.LoggerFactory;
*/
public class HdfsTargetGenerator implements ITargetGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(HdfsTargetGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
- StringBuilder result = new StringBuilder();
- try{
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
+
+ StringBuilder hdfsTargetSb = new StringBuilder();
+
+ try {
TargetHdfsParameter targetHdfsParameter =
- JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHdfsParameter.class);
+ JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHdfsParameter.class);
- if(targetHdfsParameter != null){
+ if (null != targetHdfsParameter) {
- if(StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())){
- result.append(" --target-dir ").append(targetHdfsParameter.getTargetPath());
+ if (StringUtils.isNotEmpty(targetHdfsParameter.getTargetPath())) {
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.TARGET_DIR)
+ .append(Constants.SPACE).append(targetHdfsParameter.getTargetPath());
}
- if(StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())){
- result.append(" --compression-codec ").append(targetHdfsParameter.getCompressionCodec());
+ if (StringUtils.isNotEmpty(targetHdfsParameter.getCompressionCodec())) {
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.COMPRESSION_CODEC)
+ .append(Constants.SPACE).append(targetHdfsParameter.getCompressionCodec());
}
- if(StringUtils.isNotEmpty(targetHdfsParameter.getFileType())){
- result.append(" ").append(targetHdfsParameter.getFileType());
+ if (StringUtils.isNotEmpty(targetHdfsParameter.getFileType())) {
+ hdfsTargetSb.append(Constants.SPACE).append(targetHdfsParameter.getFileType());
}
- if(targetHdfsParameter.isDeleteTargetDir()){
- result.append(" --delete-target-dir");
+ if (targetHdfsParameter.isDeleteTargetDir()) {
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
}
- if(StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())){
- result.append(" --fields-terminated-by '").append(targetHdfsParameter.getFieldsTerminated()).append("'");
+ if (StringUtils.isNotEmpty(targetHdfsParameter.getFieldsTerminated())) {
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY)
+ .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
}
- if(StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())){
- result.append(" --lines-terminated-by '").append(targetHdfsParameter.getLinesTerminated()).append("'");
+ if (StringUtils.isNotEmpty(targetHdfsParameter.getLinesTerminated())) {
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY)
+ .append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetHdfsParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
}
- result.append(" --null-non-string 'NULL' --null-string 'NULL'");
+ hdfsTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELD_NULL_PLACEHOLDER);
}
- }catch(Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop hdfs target params build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return hdfsTargetSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
index 83c4123..69a95d0 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/HiveTargetGenerator.java
@@ -14,14 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetHiveParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,57 +34,57 @@ import org.slf4j.LoggerFactory;
*/
public class HiveTargetGenerator implements ITargetGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(HiveTargetGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
- StringBuilder result = new StringBuilder();
+ StringBuilder hiveTargetSb = new StringBuilder();
- try{
+ try {
TargetHiveParameter targetHiveParameter =
- JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetHiveParameter.class);
- if(targetHiveParameter != null){
-
- result.append(" --hive-import ");
+ JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetHiveParameter.class);
+ if (null != targetHiveParameter) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_IMPORT);
- if(StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())&&
- StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())){
- result.append(" --hive-table ")
- .append(targetHiveParameter.getHiveDatabase())
- .append(".")
- .append(targetHiveParameter.getHiveTable());
+ if (StringUtils.isNotEmpty(targetHiveParameter.getHiveDatabase())
+ && StringUtils.isNotEmpty(targetHiveParameter.getHiveTable())) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_TABLE)
+ .append(Constants.SPACE).append(String.format("%s.%s", targetHiveParameter.getHiveDatabase(),
+ targetHiveParameter.getHiveTable()));
}
- if(targetHiveParameter.isCreateHiveTable()){
- result.append(" --create-hive-table");
+ if (targetHiveParameter.isCreateHiveTable()) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.CREATE_HIVE_TABLE);
}
- if(targetHiveParameter.isDropDelimiter()){
- result.append(" --hive-drop-import-delims");
+ if (targetHiveParameter.isDropDelimiter()) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DROP_IMPORT_DELIMS);
}
- if(targetHiveParameter.isHiveOverWrite()){
- result.append(" --hive-overwrite -delete-target-dir");
+ if (targetHiveParameter.isHiveOverWrite()) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_OVERWRITE)
+ .append(Constants.SPACE).append(SqoopConstants.DELETE_TARGET_DIR);
}
- if(StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())){
- result.append(" --hive-delims-replacement ").append(targetHiveParameter.getReplaceDelimiter());
+ if (StringUtils.isNotEmpty(targetHiveParameter.getReplaceDelimiter())) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_DELIMS_REPLACEMENT)
+ .append(Constants.SPACE).append(targetHiveParameter.getReplaceDelimiter());
}
- if(StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())&&
- StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())){
- result.append(" --hive-partition-key ")
- .append(targetHiveParameter.getHivePartitionKey())
- .append(" --hive-partition-value ")
- .append(targetHiveParameter.getHivePartitionValue());
+ if (StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionKey())
+ && StringUtils.isNotEmpty(targetHiveParameter.getHivePartitionValue())) {
+ hiveTargetSb.append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_KEY)
+ .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionKey())
+ .append(Constants.SPACE).append(SqoopConstants.HIVE_PARTITION_VALUE)
+ .append(Constants.SPACE).append(targetHiveParameter.getHivePartitionValue());
}
}
- }catch(Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop hive target params build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return hiveTargetSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
index 406c6f6..b8a32da 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/sqoop/generator/targets/MysqlTargetGenerator.java
@@ -14,21 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop.generator.targets;
-import org.apache.commons.lang.StringUtils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
-import org.apache.dolphinscheduler.common.utils.*;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
-import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.server.entity.SqoopTaskExecutionContext;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+import org.apache.dolphinscheduler.server.worker.task.sqoop.SqoopConstants;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.ITargetGenerator;
-import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
-import org.apache.dolphinscheduler.service.process.ProcessService;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,59 +38,74 @@ import org.slf4j.LoggerFactory;
*/
public class MysqlTargetGenerator implements ITargetGenerator {
- private Logger logger = LoggerFactory.getLogger(getClass());
+ private static final Logger logger = LoggerFactory.getLogger(MysqlTargetGenerator.class);
@Override
- public String generate(SqoopParameters sqoopParameters,TaskExecutionContext taskExecutionContext) {
+ public String generate(SqoopParameters sqoopParameters, TaskExecutionContext taskExecutionContext) {
- StringBuilder result = new StringBuilder();
- try{
+ StringBuilder mysqlTargetSb = new StringBuilder();
+ try {
TargetMysqlParameter targetMysqlParameter =
- JSONUtils.parseObject(sqoopParameters.getTargetParams(),TargetMysqlParameter.class);
+ JSONUtils.parseObject(sqoopParameters.getTargetParams(), TargetMysqlParameter.class);
SqoopTaskExecutionContext sqoopTaskExecutionContext = taskExecutionContext.getSqoopTaskExecutionContext();
- if(targetMysqlParameter != null && targetMysqlParameter.getTargetDatasource() != 0){
+ if (null != targetMysqlParameter && targetMysqlParameter.getTargetDatasource() != 0) {
// get datasource
BaseDataSource baseDataSource = DataSourceFactory.getDatasource(DbType.of(sqoopTaskExecutionContext.getTargetType()),
- sqoopTaskExecutionContext.getTargetConnectionParams());
-
- if(baseDataSource != null){
- result.append(" --connect ")
- .append(baseDataSource.getJdbcUrl())
- .append(" --username ")
- .append(baseDataSource.getUser())
- .append(" --password ")
- .append(baseDataSource.getPassword())
- .append(" --table ")
- .append(targetMysqlParameter.getTargetTable());
-
- if(StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())){
- result.append(" --columns ").append(targetMysqlParameter.getTargetColumns());
+ sqoopTaskExecutionContext.getTargetConnectionParams());
+
+ if (null != baseDataSource) {
+
+ mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.DB_CONNECT)
+ .append(Constants.SPACE).append(baseDataSource.getJdbcUrl())
+ .append(Constants.SPACE).append(SqoopConstants.DB_USERNAME)
+ .append(Constants.SPACE).append(baseDataSource.getUser())
+ .append(Constants.SPACE).append(SqoopConstants.DB_PWD)
+ .append(Constants.SPACE).append(Constants.DOUBLE_QUOTES).append(baseDataSource.getPassword()).append(Constants.DOUBLE_QUOTES)
+ .append(Constants.SPACE).append(SqoopConstants.TABLE)
+ .append(Constants.SPACE).append(targetMysqlParameter.getTargetTable());
+
+ if (StringUtils.isNotEmpty(targetMysqlParameter.getTargetColumns())) {
+ mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.COLUMNS)
+ .append(Constants.SPACE).append(targetMysqlParameter.getTargetColumns());
}
- if(StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())){
- result.append(" --fields-terminated-by '").append(targetMysqlParameter.getFieldsTerminated()).append("'");
+ if (StringUtils.isNotEmpty(targetMysqlParameter.getFieldsTerminated())) {
+ mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.FIELDS_TERMINATED_BY);
+ if (targetMysqlParameter.getFieldsTerminated().contains("'")) {
+ mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getFieldsTerminated());
+
+ } else {
+ mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getFieldsTerminated()).append(Constants.SINGLE_QUOTES);
+ }
}
- if(StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())){
- result.append(" --lines-terminated-by '").append(targetMysqlParameter.getLinesTerminated()).append("'");
+ if (StringUtils.isNotEmpty(targetMysqlParameter.getLinesTerminated())) {
+ mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.LINES_TERMINATED_BY);
+ if (targetMysqlParameter.getLinesTerminated().contains(Constants.SINGLE_QUOTES)) {
+ mysqlTargetSb.append(Constants.SPACE).append(targetMysqlParameter.getLinesTerminated());
+ } else {
+ mysqlTargetSb.append(Constants.SPACE).append(Constants.SINGLE_QUOTES).append(targetMysqlParameter.getLinesTerminated()).append(Constants.SINGLE_QUOTES);
+ }
}
- if(targetMysqlParameter.getIsUpdate()
- && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
- && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())){
- result.append(" --update-key ").append(targetMysqlParameter.getTargetUpdateKey())
- .append(" --update-mode ").append(targetMysqlParameter.getTargetUpdateMode());
+ if (targetMysqlParameter.getIsUpdate()
+ && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateKey())
+ && StringUtils.isNotEmpty(targetMysqlParameter.getTargetUpdateMode())) {
+ mysqlTargetSb.append(Constants.SPACE).append(SqoopConstants.UPDATE_KEY)
+ .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateKey())
+ .append(Constants.SPACE).append(SqoopConstants.UPDATE_MODE)
+ .append(Constants.SPACE).append(targetMysqlParameter.getTargetUpdateMode());
}
}
}
- }catch (Exception e){
- logger.error(e.getMessage());
+ } catch (Exception e) {
+ logger.error(String.format("Sqoop mysql target params build failed: [%s]", e.getMessage()));
}
- return result.toString();
+ return mysqlTargetSb.toString();
}
}
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index 2d0e39a..f187617 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task.sqoop;
import org.apache.dolphinscheduler.common.task.sqoop.SqoopParameters;
@@ -23,6 +24,9 @@ import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGenerator;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import java.util.Date;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -33,7 +37,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
-import java.util.Date;
/**
* sqoop task test
@@ -43,14 +46,12 @@ public class SqoopTaskTest {
private static final Logger logger = LoggerFactory.getLogger(SqoopTaskTest.class);
- private ProcessService processService;
- private ApplicationContext applicationContext;
private SqoopTask sqoopTask;
@Before
- public void before() throws Exception{
- processService = Mockito.mock(ProcessService.class);
- applicationContext = Mockito.mock(ApplicationContext.class);
+ public void before() {
+ ProcessService processService = Mockito.mock(ProcessService.class);
+ ApplicationContext applicationContext = Mockito.mock(ApplicationContext.class);
SpringApplicationContext springApplicationContext = new SpringApplicationContext();
springApplicationContext.setApplicationContext(applicationContext);
Mockito.when(applicationContext.getBean(ProcessService.class)).thenReturn(processService);
@@ -61,17 +62,17 @@ public class SqoopTaskTest {
taskExecutionContext.setEnvFile(".dolphinscheduler_env.sh");
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setTaskTimeout(0);
- taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1," +
- "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\"," +
- "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\"," +
- "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[]," +
- "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\"" +
- ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true," +
- "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\"," +
- "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
-
- sqoopTask = new SqoopTask(taskExecutionContext,logger);
- //test sqoop tash init method
+ taskExecutionContext.setTaskParams("{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+ + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\","
+ + "\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\","
+ + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],"
+ + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\""
+ + ",\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,"
+ + "\\\"dropDelimiter\\\":false,\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\","
+ + "\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}");
+
+ sqoopTask = new SqoopTask(taskExecutionContext, logger);
+ //test sqoop task init method
sqoopTask.init();
}
@@ -79,40 +80,72 @@ public class SqoopTaskTest {
* test SqoopJobGenerator
*/
@Test
- public void testGenerator(){
+ public void testGenerator() {
TaskExecutionContext mysqlTaskExecutionContext = getMysqlTaskExecutionContext();
//sqoop TEMPLATE job
- //import mysql to HDFS with hadoo
- String mysqlToHdfs = "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}]," +
- "\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\",\\\"delete [...]
- SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs,SqoopParameters.class);
+ //import mysql to HDFS with hadoop
+ String mysqlToHdfs =
+ "{\"jobName\":\"sqoop_import\",\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"4096\"}],"
+ + "\"sqoopAdvancedParams\":[{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],\"jobType\":\"TEMPLATE\",\"concurrency\":1,"
+ + "\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HDFS\","
+ + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"0\\\",\\\"srcQuerySql\\\":\\\"\\\",\\\"srcColumnType\\\":\\\"0\\\","
+ + "\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[]}\",\"targetParams\":\"{\\\"targetPath\\\":\\\"/ods/tmp/test/person7\\\","
+ + "\\\"deleteTargetDir\\\":true,\\\"fileType\\\":\\\"--as-textfile\\\",\\\"compressionCodec\\\":\\\"\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+ + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+ SqoopParameters mysqlToHdfsParams = JSONUtils.parseObject(mysqlToHdfs, SqoopParameters.class);
SqoopJobGenerator generator = new SqoopJobGenerator();
- String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams,mysqlTaskExecutionContext);
- String mysqlToHdfsExpected = "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile --delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
+ String mysqlToHdfsScript = generator.generateSqoopJob(mysqlToHdfsParams, mysqlTaskExecutionContext);
+ String mysqlToHdfsExpected =
+ "sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=4096 --direct -m 1 --connect jdbc:mysql://192.168.0.111:3306/test "
+ + "--username kylo --password \"123456\" --table person_2 --target-dir /ods/tmp/test/person7 --as-textfile "
+ + "--delete-target-dir --fields-terminated-by '@' --lines-terminated-by '\\n' --null-non-string 'NULL' --null-string 'NULL'";
Assert.assertEquals(mysqlToHdfsExpected, mysqlToHdfsScript);
//export hdfs to mysql using update mode
- String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\"," +
- "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\"," +
- "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\",\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
- SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql,SqoopParameters.class);
- String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams,mysqlTaskExecutionContext);
- String hdfsToMysqlScriptExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' --lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
+ String hdfsToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HDFS\","
+ + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"exportDir\\\":\\\"/ods/tmp/test/person7\\\"}\","
+ + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"id,name,age,sex,create_time\\\","
+ + "\\\"preQuery\\\":\\\"\\\",\\\"isUpdate\\\":true,\\\"targetUpdateKey\\\":\\\"id\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\","
+ + "\\\"fieldsTerminated\\\":\\\"@\\\",\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+ SqoopParameters hdfsToMysqlParams = JSONUtils.parseObject(hdfsToMysql, SqoopParameters.class);
+ String hdfsToMysqlScript = generator.generateSqoopJob(hdfsToMysqlParams, mysqlTaskExecutionContext);
+ String hdfsToMysqlScriptExpected =
+ "sqoop export -D mapred.job.name=sqoop_import -m 1 --export-dir /ods/tmp/test/person7 --connect jdbc:mysql://192.168.0.111:3306/test "
+ + "--username kylo --password \"123456\" --table person_3 --columns id,name,age,sex,create_time --fields-terminated-by '@' "
+ + "--lines-terminated-by '\\n' --update-key id --update-mode allowinsert";
Assert.assertEquals(hdfsToMysqlScriptExpected, hdfsToMysqlScript);
//export hive to mysql
- String hiveToMysql = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\",\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\",\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\ [...]
- SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql,SqoopParameters.class);
- String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams,mysqlTaskExecutionContext);
- String hiveToMysqlExpected = "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date --hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --table person_3 --fields-terminated-by '@' --lines-terminated-by '\\n'";
+ String hiveToMysql =
+ "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"export\",\"sourceType\":\"HIVE\","
+ + "\"targetType\":\"MYSQL\",\"sourceParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal\\\","
+ + "\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-17\\\"}\","
+ + "\"targetParams\":\"{\\\"targetDatasource\\\":2,\\\"targetTable\\\":\\\"person_3\\\",\\\"targetColumns\\\":\\\"\\\",\\\"preQuery\\\":\\\"\\\","
+ + "\\\"isUpdate\\\":false,\\\"targetUpdateKey\\\":\\\"\\\",\\\"targetUpdateMode\\\":\\\"allowinsert\\\",\\\"fieldsTerminated\\\":\\\"@\\\","
+ + "\\\"linesTerminated\\\":\\\"\\\\\\\\n\\\"}\",\"localParams\":[]}";
+ SqoopParameters hiveToMysqlParams = JSONUtils.parseObject(hiveToMysql, SqoopParameters.class);
+ String hiveToMysqlScript = generator.generateSqoopJob(hiveToMysqlParams, mysqlTaskExecutionContext);
+ String hiveToMysqlExpected =
+ "sqoop export -D mapred.job.name=sqoop_import -m 1 --hcatalog-database stg --hcatalog-table person_internal --hcatalog-partition-keys date "
+ + "--hcatalog-partition-values 2020-02-17 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" --table person_3 "
+ + "--fields-terminated-by '@' --lines-terminated-by '\\n'";
Assert.assertEquals(hiveToMysqlExpected, hiveToMysqlScript);
//import mysql to hive
- String mysqlToHive = "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\",\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\",\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"i [...]
- SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive,SqoopParameters.class);
- String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams,mysqlTaskExecutionContext);
- String mysqlToHiveExpected = "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password 123456 --query 'SELECT * FROM person_2 WHERE $CONDITIONS' --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 --create-hive-table --hive-overwrite -delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
+ String mysqlToHive =
+ "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
+ + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
+ + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
+ + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
+ + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
+ + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+ SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
+ String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
+ String mysqlToHiveExpected =
+ "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect jdbc:mysql://192.168.0.111:3306/test --username kylo --password \"123456\" "
+ + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-table stg.person_internal_2 "
+ + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
//sqoop CUSTOM job
@@ -124,16 +157,18 @@ public class SqoopTaskTest {
}
-
/**
* get taskExecutionContext include mysql
+ *
* @return TaskExecutionContext
*/
private TaskExecutionContext getMysqlTaskExecutionContext() {
TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
SqoopTaskExecutionContext sqoopTaskExecutionContext = new SqoopTaskExecutionContext();
- String mysqlSourceConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
- String mysqlTargetConnectionParams = "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+ String mysqlSourceConnectionParams =
+ "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
+ String mysqlTargetConnectionParams =
+ "{\"address\":\"jdbc:mysql://192.168.0.111:3306\",\"database\":\"test\",\"jdbcUrl\":\"jdbc:mysql://192.168.0.111:3306/test\",\"user\":\"kylo\",\"password\":\"123456\"}";
sqoopTaskExecutionContext.setDataSourceId(2);
sqoopTaskExecutionContext.setDataTargetId(2);
sqoopTaskExecutionContext.setSourcetype(0);
@@ -153,7 +188,7 @@ public class SqoopTaskTest {
* Method: init
*/
@Test
- public void testInit(){
+ public void testInit() {
try {
sqoopTask.init();
} catch (Exception e) {