You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@kylin.apache.org by li...@apache.org on 2018/05/27 00:24:45 UTC

[kylin] 03/03: KYLIN-3380 JDBC source - add a configuration file for sqoop

This is an automated email from the ASF dual-hosted git repository.

lidong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 36810892442d51a1d073817714e0b0ace9963a56
Author: lidongsjtu <li...@apache.org>
AuthorDate: Tue May 15 20:30:42 2018 +0800

    KYLIN-3380 JDBC source - add a configuration file for sqoop
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  4 ++
 .../apache/kylin/common/util/HiveCmdBuilder.java   |  3 +-
 ...ationUtil.java => SourceConfigurationUtil.java} | 46 ++++++++++++++--------
 ...lTest.java => SourceConfigurationUtilTest.java} | 14 ++++++-
 .../test_case_data/localmeta/kylin_sqoop_conf.xml  | 24 +++++++++++
 .../kylin/source/hive/BeelineHiveClient.java       |  4 +-
 .../apache/kylin/source/jdbc/JdbcHiveMRInput.java  | 25 ++++++++++--
 7 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 3b2883c..689d08f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -852,6 +852,14 @@ abstract public class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.source.jdbc.sqoop-mapper-num", "4"));
     }
 
+    /**
+     * Returns the properties configured under the "kylin.source.jdbc.sqoop-config-override." prefix. They are
+     * passed to the sqoop import command as -D arguments and take precedence over kylin_sqoop_conf.xml.
+     */
+    public Map<String, String> getSqoopConfigOverride() {
+        return getPropertiesByPrefix("kylin.source.jdbc.sqoop-config-override.");
+    }
+
     public String getJdbcSourceFieldDelimiter() {
         return getOptional("kylin.source.jdbc.field-delimiter", "|");
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
index 1823dfc..1c023aa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HiveCmdBuilder.java
@@ -31,7 +31,6 @@ import com.google.common.collect.Lists;
 public class HiveCmdBuilder {
     public static final Logger logger = LoggerFactory.getLogger(HiveCmdBuilder.class);
 
-    public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
     static final String CREATE_HQL_TMP_FILE_TEMPLATE = "cat >%s<<EOL\n%sEOL";
 
     public enum HiveClientMode {
@@ -44,7 +43,7 @@ public class HiveCmdBuilder {
 
     public HiveCmdBuilder() {
         kylinConfig = KylinConfig.getInstanceFromEnv();
-        hiveConfProps = HiveConfigurationUtil.loadHiveConfiguration();
+        hiveConfProps = SourceConfigurationUtil.loadHiveConfiguration();
     }
 
     public String build() {
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
similarity index 70%
rename from core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
rename to core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
index 1c6f985..7e61d62 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HiveConfigurationUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/SourceConfigurationUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import static org.apache.kylin.common.util.HiveCmdBuilder.HIVE_CONF_FILENAME;
-
 import java.io.File;
 import java.util.HashMap;
 import java.util.Map;
@@ -38,11 +36,14 @@ import org.w3c.dom.NodeList;
  * @author ycq
  * @since 2018-03-05
  */
-public class HiveConfigurationUtil {
+public class SourceConfigurationUtil {
 
-    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(HiveConfigurationUtil.class);
+    private static final org.slf4j.Logger logger = LoggerFactory.getLogger(SourceConfigurationUtil.class);
     private static final String HIVE_CONF_PREFIX = "hiveconf:";
 
+    public static final String HIVE_CONF_FILENAME = "kylin_hive_conf";
+    public static final String SQOOP_CONF_FILENAME = "kylin_sqoop_conf";
+
     public static Properties loadHiveJDBCProperties() {
         Map<String, String> hiveConfiguration = loadHiveConfiguration();
         Properties ret = new Properties();
@@ -53,27 +54,38 @@ public class HiveConfigurationUtil {
     }
 
     public static Map<String, String> loadHiveConfiguration() {
-        Map<String, String> hiveConfProps = new HashMap<>();
-        File hiveConfFile;
-        String hiveConfFileName = (HIVE_CONF_FILENAME + ".xml");
+        return loadXmlConfiguration(HIVE_CONF_FILENAME, true);
+    }
+
+    public static Map<String, String> loadSqoopConfiguration() {
+        return loadXmlConfiguration(SQOOP_CONF_FILENAME, false);
+    }
+
+    private static Map<String, String> loadXmlConfiguration(String filename, boolean checkExist) {
+        Map<String, String> confProps = new HashMap<>();
+        File confFile;
+        String xmlFileName = filename + ".xml";
         String path = System.getProperty(KylinConfig.KYLIN_CONF);
 
         if (StringUtils.isNotEmpty(path)) {
-            hiveConfFile = new File(path, hiveConfFileName);
+            confFile = new File(path, xmlFileName);
         } else {
             path = KylinConfig.getKylinHome();
             if (StringUtils.isEmpty(path)) {
-                logger.error("KYLIN_HOME is not set, can not locate hive conf: {}.xml", HIVE_CONF_FILENAME);
-                return hiveConfProps;
+                logger.error("KYLIN_HOME is not set, can not locate conf: {}", xmlFileName);
+                return confProps;
             }
-            hiveConfFile = new File(path + File.separator + "conf", hiveConfFileName);
+            confFile = new File(path + File.separator + "conf", xmlFileName);
         }
 
-        if (!hiveConfFile.exists()) {
-            throw new RuntimeException("Failed to read " + HIVE_CONF_FILENAME + ".xml");
+        if (!confFile.exists()) {
+            if (checkExist) {
+                throw new RuntimeException("Failed to read " + xmlFileName);
+            }
+            return confProps; // optional file (checkExist == false): absent means no entries
         }
 
-        String fileUrl = OptionsHelper.convertToFileURL(hiveConfFile.getAbsolutePath());
+        String fileUrl = OptionsHelper.convertToFileURL(confFile.getAbsolutePath());
 
         try {
             File file = new File(fileUrl);
@@ -82,19 +94,22 @@ public class HiveConfigurationUtil {
                 DocumentBuilder builder = factory.newDocumentBuilder();
                 Document doc = builder.parse(file);
                 NodeList nl = doc.getElementsByTagName("property");
-                hiveConfProps.clear();
+                confProps.clear();
                 for (int i = 0; i < nl.getLength(); i++) {
-                    String key = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
+                    // read name/value from within each <property> so pairs cannot drift apart, and an empty
+                    // <value/> yields "" instead of a NullPointerException from getFirstChild()
+                    org.w3c.dom.Element property = (org.w3c.dom.Element) nl.item(i);
+                    String key = property.getElementsByTagName("name").item(0).getTextContent();
+                    String value = property.getElementsByTagName("value").item(0).getTextContent();
                     if (!key.equals("tmpjars")) {
-                        hiveConfProps.put(key, value);
+                        confProps.put(key, value);
                     }
                 }
             }
         } catch (Exception e) {
-            throw new RuntimeException("Failed to parse hive conf file ", e);
+            throw new RuntimeException("Failed to parse conf file " + xmlFileName, e);
         }
-        return hiveConfProps;
+        return confProps;
     }
 
 }
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
similarity index 73%
rename from core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
rename to core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
index d4019a9..9e1a65f 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/HiveConfigurationUtilTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/SourceConfigurationUtilTest.java
@@ -18,14 +18,17 @@
 
 package org.apache.kylin.common.util;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.junit.Before;
 import org.junit.Test;
 
-public class HiveConfigurationUtilTest {
+public class SourceConfigurationUtilTest {
     @Before
     public void setup() {
         System.setProperty("log4j.configuration", "file:../build/conf/kylin-tools-log4j.properties");
@@ -34,7 +37,14 @@ public class HiveConfigurationUtilTest {
 
     @Test
     public void testHiveConf() {
-        Properties properties = HiveConfigurationUtil.loadHiveJDBCProperties();
+        Properties properties = SourceConfigurationUtil.loadHiveJDBCProperties();
         assertTrue(properties.containsKey("hiveconf:hive.auto.convert.join.noconditionaltask.size"));
     }
+
+    @Test
+    public void testSqoopConf() {
+        Map<String, String> configMap = SourceConfigurationUtil.loadSqoopConfiguration();
+        assertFalse(configMap.isEmpty());
+        assertEquals("1", configMap.get("dfs.replication"));
+    }
 }
diff --git a/examples/test_case_data/localmeta/kylin_sqoop_conf.xml b/examples/test_case_data/localmeta/kylin_sqoop_conf.xml
new file mode 100644
index 0000000..de31211
--- /dev/null
+++ b/examples/test_case_data/localmeta/kylin_sqoop_conf.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+    <property>
+        <name>dfs.replication</name>
+        <value>1</value>
+        <description>Used in UT</description>
+    </property>
+</configuration>
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
index 52b752c..b1021dc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/BeelineHiveClient.java
@@ -30,7 +30,7 @@ import java.util.Properties;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DBUtils;
-import org.apache.kylin.common.util.HiveConfigurationUtil;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -60,7 +60,7 @@ public class BeelineHiveClient implements IHiveClient {
                 password = stripQuotes(splits[i + 1]);
             }
         }
-        Properties jdbcProperties = HiveConfigurationUtil.loadHiveJDBCProperties();
+        Properties jdbcProperties = SourceConfigurationUtil.loadHiveJDBCProperties();
         jdbcProperties.put(HIVE_AUTH_PASSWD, password);
         jdbcProperties.put(HIVE_AUTH_USER, username);
         this.init(url, jdbcProperties);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index bbaaa4a..fcdb516 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -39,6 +40,8 @@ import org.apache.kylin.source.hive.HiveMRInput;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class JdbcHiveMRInput extends HiveMRInput {
 
     private static final Logger logger = LoggerFactory.getLogger(JdbcHiveMRInput.class);
@@ -184,12 +187,10 @@ public class JdbcHiveMRInput extends HiveMRInput {
-            //related to "kylin.engine.mr.config-override.mapreduce.job.queuename"
-            String queueName = getSqoopJobQueueName(config);
             String cmd = String.format("%s/sqoop import -Dorg.apache.sqoop.splitter.allow_text_splitter=true "
-                    + "-Dmapreduce.job.queuename=%s "
+                    + generateSqoopConfigArgString()
                     + "--connect \"%s\" --driver %s --username %s --password %s --query \"%s AND \\$CONDITIONS\" "
                     + "--target-dir %s/%s --split-by %s.%s --boundary-query \"%s\" --null-string '' "
-                    + "--fields-terminated-by '%s' --num-mappers %d", sqoopHome, queueName, connectionUrl, driverClass,
-                    jdbcUser, jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, splitColumn, bquery,
-                    filedDelimiter, mapperNum);
+                    + "--fields-terminated-by '%s' --num-mappers %d", sqoopHome, connectionUrl, driverClass, jdbcUser,
+                    jdbcPass, selectSql, jobWorkingDir, hiveTable, splitTable, splitColumn, bquery, filedDelimiter,
+                    mapperNum);
             logger.debug(String.format("sqoop cmd:%s", cmd));
             CmdStep step = new CmdStep();
             step.setCmd(cmd);
@@ -201,5 +202,36 @@ public class JdbcHiveMRInput extends HiveMRInput {
         protected void addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
             // skip
         }
+
+        /**
+         * Builds the "-Dkey=value" arguments spliced into the sqoop import command. Later sources override
+         * earlier ones: the job queue (related to "kylin.engine.mr.config-override.mapreduce.job.queuename"),
+         * then kylin_sqoop_conf.xml, then the "kylin.source.jdbc.sqoop-config-override." properties.
+         *
+         * @return the arguments, each padded with spaces so the result can be concatenated into the command
+         */
+        protected String generateSqoopConfigArgString() {
+            KylinConfig kylinConfig = getConfig();
+            // insertion-ordered so the generated command line is the same from run to run
+            Map<String, String> config = Maps.newLinkedHashMap();
+            config.put("mapreduce.job.queuename", getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
+            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
+            config.putAll(kylinConfig.getSqoopConfigOverride());
+
+            StringBuilder args = new StringBuilder();
+            for (Map.Entry<String, String> entry : config.entrySet()) {
+                args.append(" -D").append(entry.getKey()).append('=').append(quoteShellArg(entry.getValue()))
+                        .append(' ');
+            }
+            return args.toString();
+        }
+
+        /**
+         * Single-quotes a value for the shell that runs the sqoop command (the command string already relies on
+         * shell quoting and escaping), so values containing spaces or metacharacters, e.g. java opts, stay one word.
+         */
+        private String quoteShellArg(String value) {
+            return "'" + String.valueOf(value).replace("'", "'\\''") + "'";
+        }
     }
 }

-- 
To stop receiving notification emails like this one, please contact
lidong@apache.org.