You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/06 09:31:20 UTC

[44/49] kylin git commit: KYLIN-1855 revert to original, all joins appear in flat table

KYLIN-1855 revert to original, all joins appear in flat table


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4bc3490b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4bc3490b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4bc3490b

Branch: refs/heads/KYLIN-1971
Commit: 4bc3490b409856a4c2ae04e2dd80bc26b5943461
Parents: 37a98b7
Author: Li Yang <li...@apache.org>
Authored: Fri Nov 4 16:00:23 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Fri Nov 4 16:26:25 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/JoinedFlatTable.java   | 34 +++++++-------------
 .../apache/kylin/source/hive/HiveMRInput.java   | 27 +---------------
 2 files changed, 13 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4bc3490b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 9c73a53..e43bca3 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -39,7 +39,6 @@ import org.w3c.dom.Document;
 import org.w3c.dom.NodeList;
 
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
 
 /**
  *
@@ -47,8 +46,8 @@ import com.google.common.collect.Sets;
 
 public class JoinedFlatTable {
 
-    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
-        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
+    public static String getTableDir(IJoinedFlatTableDesc flatDesc, String storageDfsDir) {
+        return storageDfsDir + "/" + flatDesc.getTableName();
     }
 
     public static String generateHiveSetStatements(JobEngineConfig engineConfig) {
@@ -99,16 +98,16 @@ public class JoinedFlatTable {
         return ddl.toString();
     }
 
-    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
+    public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
-        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
+        ddl.append("DROP TABLE IF EXISTS " + flatDesc.getTableName() + ";").append("\n");
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig, boolean redistribute) {
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, JobEngineConfig engineConfig, boolean redistribute) {
         StringBuilder sql = new StringBuilder();
         sql.append(generateHiveSetStatements(engineConfig));
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc, redistribute) + ";").append("\n");
+        sql.append("INSERT OVERWRITE TABLE " + flatDesc.getTableName() + " " + generateSelectDataStatement(flatDesc, redistribute) + ";").append("\n");
         return sql.toString();
     }
 
@@ -218,19 +217,10 @@ public class JoinedFlatTable {
     }
 
     private static List<JoinDesc> getUsedJoinsSet(IJoinedFlatTableDesc flatDesc) {
-        Set<String> usedTableIdentities = Sets.newHashSet();
-        for (TblColRef col : flatDesc.getAllColumns()) {
-            usedTableIdentities.add(col.getTable());
-        }
-        
         List<JoinDesc> result = Lists.newArrayList();
         for (LookupDesc lookup : flatDesc.getDataModel().getLookups()) {
-            String table = lookup.getTableRef().getTableIdentity();
-            if (usedTableIdentities.contains(table)) {
-                result.add(lookup.getJoin());
-            }
+            result.add(lookup.getJoin());
         }
-        
         return result;
     }
 
@@ -283,20 +273,20 @@ public class JoinedFlatTable {
         return hiveDataType.toLowerCase();
     }
 
-    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc flatDesc, String outputDir) {
         StringBuilder sql = new StringBuilder();
         sql.append("set hive.exec.compress.output=false;\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
+        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + flatDesc.getTableName() + ";\n");
         return sql.toString();
     }
 
-    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        final String tableName = intermediateTableDesc.getTableName();
+    public static String generateRedistributeFlatTableStatement(IJoinedFlatTableDesc flatDesc) {
+        final String tableName = flatDesc.getTableName();
         StringBuilder sql = new StringBuilder();
         sql.append("INSERT OVERWRITE TABLE " + tableName + " SELECT * FROM " + tableName);
 
         String redistributeCol = null;
-        TblColRef distDcol = intermediateTableDesc.getDistributedBy();
+        TblColRef distDcol = flatDesc.getDistributedBy();
         if (distDcol != null) {
             redistributeCol = colName(distDcol.getCanonicalName());
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4bc3490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index f536cbb..67f091b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -293,7 +293,7 @@ public class HiveMRInput implements IMRInput {
             FileSystem fs = FileSystem.get(file.toUri(), HadoopUtil.getCurrentConfiguration());
             InputStream in = fs.open(file);
             try {
-                String content = IOUtils.toString(in, Charset.defaultCharset());
+                String content = IOUtils.toString(in, "UTF-8");
                 return Long.valueOf(content.trim()); // strip the '\n' character
 
             } finally {
@@ -428,14 +428,6 @@ public class HiveMRInput implements IMRInput {
             return output.toString();
         }
 
-        private void mkdirOnHDFS(String path) throws IOException {
-            Path externalDataPath = new Path(path);
-            FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
-            if (!fs.exists(externalDataPath)) {
-                fs.mkdirs(externalDataPath);
-            }
-        }
-
         private void rmdirOnHDFS(String path) throws IOException {
             Path externalDataPath = new Path(path);
             FileSystem fs = FileSystem.get(externalDataPath.toUri(), HadoopUtil.getCurrentConfiguration());
@@ -444,20 +436,6 @@ public class HiveMRInput implements IMRInput {
             }
         }
 
-        private String cleanUpHiveViewIntermediateTable(KylinConfig config) throws IOException {
-            StringBuffer output = new StringBuffer();
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-            hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
-            if (getHiveViewIntermediateTableIdentities() != null && !getHiveViewIntermediateTableIdentities().isEmpty()) {
-                for (String hiveTableName : getHiveViewIntermediateTableIdentities().split(";")) {
-                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTableName + ";");
-                }
-            }
-            config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
-            output.append("hive view intermediate tables: " + getHiveViewIntermediateTableIdentities() + " is dropped. \n");
-            return output.toString();
-        }
-
         public void setIntermediateTableIdentity(String tableIdentity) {
             setParam("oldHiveTable", tableIdentity);
         }
@@ -478,9 +456,6 @@ public class HiveMRInput implements IMRInput {
             setParam("oldHiveViewIntermediateTables", tableIdentities);
         }
 
-        private String getHiveViewIntermediateTableIdentities() {
-            return getParam("oldHiveViewIntermediateTables");
-        }
     }