You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ka...@apache.org on 2017/04/13 11:21:57 UTC

[31/50] [abbrv] kylin git commit: KYLIN-2165 fix IT

KYLIN-2165 fix IT


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2e87fb41
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2e87fb41
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2e87fb41

Branch: refs/heads/KYLIN-2506
Commit: 2e87fb41fe36aff015950bad3a88e7ac525debd6
Parents: 93cc5ab
Author: gaodayue <ga...@meituan.com>
Authored: Fri Apr 7 17:01:24 2017 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Fri Apr 7 19:10:38 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/JoinedFlatTable.java   | 32 +++----
 .../kylin/job/engine/JobEngineConfig.java       |  2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |  4 +-
 .../apache/kylin/source/hive/HiveMRInput.java   | 95 ++++++++------------
 4 files changed, 58 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9ed563f..5553d34 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -20,12 +20,12 @@ package org.apache.kylin.job;
 
 import java.io.File;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
 
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
@@ -46,30 +46,35 @@ public class JoinedFlatTable {
         return storageDfsDir + "/" + flatDesc.getTableName();
     }
 
-    public static String generateHiveSetStatements(JobEngineConfig engineConfig) {
+    public static String generateHiveInitStatements(
+            String flatTableDatabase, String kylinHiveFile, Map<String, String> cubeOverrides) {
+
         StringBuilder buffer = new StringBuilder();
 
+        buffer.append("USE ").append(flatTableDatabase).append(";\n");
         try {
-            File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
-
-            if (hadoopPropertiesFile.exists()) {
+            File file = new File(kylinHiveFile);
+            if (file.exists()) {
                 DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-                DocumentBuilder builder;
-                Document doc;
-                builder = factory.newDocumentBuilder();
-                doc = builder.parse(hadoopPropertiesFile);
+                DocumentBuilder builder = factory.newDocumentBuilder();
+                Document doc = builder.parse(file);
                 NodeList nl = doc.getElementsByTagName("property");
                 for (int i = 0; i < nl.getLength(); i++) {
                     String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
                     String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
                     if (!name.equals("tmpjars")) {
-                        buffer.append("SET " + name + "=" + value + ";\n");
+                        buffer.append("SET ").append(name).append("=").append(value).append(";\n");
                     }
                 }
             }
         } catch (Exception e) {
             throw new RuntimeException("Failed to parse hive conf file ", e);
         }
+
+        for (Map.Entry<String, String> entry : cubeOverrides.entrySet()) {
+            buffer.append("SET ").append(entry.getKey()).append("=").append(entry.getValue()).append(";\n");
+        }
+
         return buffer.toString();
     }
 
@@ -98,11 +103,8 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig) {
-        StringBuilder sql = new StringBuilder();
-        sql.append(generateHiveSetStatements(engineConfig));
-        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";").append("\n");
-        return sql.toString();
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc) {
+        return "INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc) + ";\n";
     }
 
     public static String generateSelectDataStatement(IJoinedFlatTableDesc flatDesc) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 0f5b7dd..8859527 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -95,7 +95,7 @@ public class JobEngineConfig {
         return path;
     }
 
-    public String getHiveConfFilePath() throws IOException {
+    public String getHiveConfFilePath() {
         String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
 
         File jobConfig = getJobConfig(hiveConfFile);

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
----------------------------------------------------------------------
diff --git a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
index 0faf22a..65169c9 100644
--- a/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
+++ b/core-job/src/test/java/org/apache/kylin/job/JoinedFlatTableTest.java
@@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.junit.After;
 import org.junit.Before;
@@ -77,7 +75,7 @@ public class JoinedFlatTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void testGenerateInsertSql() throws IOException {
-        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
+        String sqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc);
         System.out.println(sqls);
 
         int length = sqls.length();

http://git-wip-us.apache.org/repos/asf/kylin/blob/2e87fb41/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 2f966ab..418fcfc 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
-import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
@@ -127,51 +126,58 @@ public class HiveMRInput implements IMRInput {
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
-        JobEngineConfig conf;
         final IJoinedFlatTableDesc flatDesc;
+        final String flatTableDatabase;
+        final String hdfsWorkingDir;
+
         String hiveViewIntermediateTables = "";
 
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
             this.flatDesc = flatDesc;
+            this.flatTableDatabase = config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
         }
 
         @Override
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final KylinConfig kylinConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
-            this.conf = new JobEngineConfig(kylinConfig);
+            final KylinConfig cubeConfig = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName).getConfig();
+            JobEngineConfig conf = new JobEngineConfig(cubeConfig);
+
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(
+                    flatTableDatabase, conf.getHiveConfFilePath(), cubeConfig.getHiveConfigOverride()
+            ) ;
+            final String jobWorkingDir = getJobWorkingDir(jobFlow);
 
             // create flat table first, then count and redistribute
-            jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
-            if (kylinConfig.isHiveRedistributeEnabled() == true) {
-                jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, jobWorkingDir, cubeName));
+            if (cubeConfig.isHiveRedistributeEnabled() == true) {
+                jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, cubeName));
             }
-            AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
+            AbstractExecutable task = createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir);
             if (task != null) {
                 jobFlow.addTask(task);
             }
         }
 
-        public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
-            StringBuilder hiveInitBuf = new StringBuilder();
-            hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
-            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
+        private String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return JobBuilderSupport.getJobWorkingDir(hdfsWorkingDir, jobFlow.getId());
+        }
 
+        private AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName) {
             RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
-            step.setInitStatement(hiveInitBuf.toString());
-            step.setIntermediateTable(flatTableDesc.getTableName());
-            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc));
+            step.setInitStatement(hiveInitStatements);
+            step.setIntermediateTable(flatDesc.getTableName());
+            step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatDesc));
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_REDISTRIBUTE_FLAT_HIVE_TABLE);
             return step;
         }
 
-        public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
+        private ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir) {
             ShellExecutable step = new ShellExecutable();
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
 
             KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
             MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
@@ -187,16 +193,15 @@ public class HiveMRInput implements IMRInput {
             if (lookupViewsTables.size() == 0) {
                 return null;
             }
-            appendHiveOverrideProperties2(kylinConfig, hiveCmdBuilder);
-            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";";
-            hiveCmdBuilder.addStatement(useDatabaseHql);
-            hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
+
+            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            hiveCmdBuilder.addStatement(hiveInitStatements);
             for (TableDesc lookUpTableDesc : lookupViewsTables) {
                 if (lookUpTableDesc.isView()) {
                     StringBuilder createIntermediateTableHql = new StringBuilder();
                     createIntermediateTableHql.append("DROP TABLE IF EXISTS " + lookUpTableDesc.getMaterializedName() + ";\n");
                     createIntermediateTableHql.append("CREATE TABLE IF NOT EXISTS " + lookUpTableDesc.getMaterializedName() + "\n");
-                    createIntermediateTableHql.append("LOCATION '" + JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/" + lookUpTableDesc.getMaterializedName() + "'\n");
+                    createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + lookUpTableDesc.getMaterializedName() + "'\n");
                     createIntermediateTableHql.append("AS SELECT * FROM " + lookUpTableDesc.getIdentity() + ";\n");
                     hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
                     hiveViewIntermediateTables = hiveViewIntermediateTables + lookUpTableDesc.getMaterializedName() + ";";
@@ -209,19 +214,14 @@ public class HiveMRInput implements IMRInput {
             return step;
         }
 
-        public static AbstractExecutable createFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
-            StringBuilder hiveInitBuf = new StringBuilder();
-            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
-            final KylinConfig kylinConfig = ((CubeSegment) flatTableDesc.getSegment()).getConfig();
-            appendHiveOverrideProperties(kylinConfig, hiveInitBuf);
-            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
-            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
-            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
-            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf);
+        private AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, String cubeName) {
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
+            String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatDesc);
 
             CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
-            step.setInitStatement(hiveInitBuf.toString());
-            step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql + insertDataHqls);
+            step.setInitStatement(hiveInitStatements);
+            step.setCreateTableStatement(dropTableHql + createTableHql + insertDataHqls);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
             return step;
@@ -229,10 +229,12 @@ public class HiveMRInput implements IMRInput {
 
         @Override
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final String jobWorkingDir = getJobWorkingDir(jobFlow);
+
             GarbageCollectionStep step = new GarbageCollectionStep();
             step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
             step.setIntermediateTableIdentity(getIntermediateTableIdentity());
-            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, jobWorkingDir));
             step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
             jobFlow.addTask(step);
         }
@@ -243,7 +245,7 @@ public class HiveMRInput implements IMRInput {
         }
 
         private String getIntermediateTableIdentity() {
-            return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName();
+            return flatTableDatabase + "." + flatDesc.getTableName();
         }
     }
 
@@ -413,24 +415,5 @@ public class HiveMRInput implements IMRInput {
         public void setHiveViewIntermediateTableIdentities(String tableIdentities) {
             setParam("oldHiveViewIntermediateTables", tableIdentities);
         }
-
-    }
-
-    private static void appendHiveOverrideProperties(final KylinConfig kylinConfig, StringBuilder hiveCmd) {
-        final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
-        if (hiveConfOverride.isEmpty() == false) {
-            for (String key : hiveConfOverride.keySet()) {
-                hiveCmd.append("SET ").append(key).append("=").append(hiveConfOverride.get(key)).append(";\n");
-            }
-        }
-    }
-
-    private static void appendHiveOverrideProperties2(final KylinConfig kylinConfig, HiveCmdBuilder hiveCmdBuilder) {
-        final Map<String, String> hiveConfOverride = kylinConfig.getHiveConfigOverride();
-        if (hiveConfOverride.isEmpty() == false) {
-            for (String key : hiveConfOverride.keySet()) {
-                hiveCmdBuilder.addStatement("SET " + key + "=" + hiveConfOverride.get(key) + ";\n");
-            }
-        }
     }
 }