You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:19:50 UTC

[33/50] [abbrv] kylin git commit: #3314 fix bug: data size in first two steps

#3314  fix bug: data size in first two steps


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6e45a76
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6e45a76
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6e45a76

Branch: refs/heads/master
Commit: c6e45a76dac1f9428308622558fe3608e11b66df
Parents: 5aa8033
Author: Sheng Zhang <sh...@kyligence.io>
Authored: Tue Dec 5 19:03:34 2017 +0800
Committer: Sheng Zhang <sh...@kyligence.io>
Committed: Wed Dec 27 16:59:51 2017 +0800

----------------------------------------------------------------------
 .../source/hive/CreateFlatHiveTableStep.java    | 30 +++++++++++++++++++-
 .../apache/kylin/source/hive/HiveMRInput.java   |  8 ++++++
 2 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 48e7686..891b090 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -18,7 +18,14 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HiveCmdBuilder;
 import org.apache.kylin.common.util.Pair;
@@ -26,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -39,6 +47,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
 
     private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class);
     protected final PatternedLogger stepLogger = new PatternedLogger(logger);
+    private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
 
     protected void createFlatHiveTable(KylinConfig config) throws IOException {
         final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -51,12 +60,31 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
         stepLogger.log(cmd);
 
         Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
-        getManager().addJobInfo(getId(), stepLogger.getInfo());
+        Map<String, String> info = stepLogger.getInfo();
+
+        //get the flat Hive table size
+        Matcher matcher = HDFS_LOCATION.matcher(cmd);
+        if (matcher.find()) {
+            String hiveFlatTableHdfsUrl = matcher.group(1);
+            long size = getFileSize(hiveFlatTableHdfsUrl);
+            info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+            logger.info("HDFS_Bytes_Writen: " + size);
+        }
+        getManager().addJobInfo(getId(), info);
         if (response.getFirst() != 0) {
             throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
         }
     }
 
+    /**
+     * Returns the total size in bytes of all files under the given HDFS path,
+     * used to report the flat Hive table's size (the URL is captured from the
+     * LOCATION clause of the generated HiveQL — see HDFS_LOCATION pattern).
+     *
+     * @param hdfsUrl HDFS location of the flat table directory
+     * @return aggregate content length in bytes as reported by ContentSummary
+     * @throws IOException if the filesystem cannot be reached or the path queried
+     */
+    private long getFileSize(String hdfsUrl) throws IOException {
+        Configuration configuration = new Configuration();
+        Path path = new Path(hdfsUrl);
+        // Resolve the FileSystem from the path itself so non-default schemes work.
+        FileSystem fs = path.getFileSystem(configuration);
+        ContentSummary contentSummary = fs.getContentSummary(path);
+        long length = contentSummary.getLength();
+        return length;
+    }
+
     private KylinConfig getCubeSpecificConfig() {
         String cubeName = CubingExecutableUtil.getCubeName(getParams());
         CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());

http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3671266..0b23121 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -281,6 +281,12 @@ public class HiveMRInput implements IMRInput {
             return hiveClient.getHiveTableRows(database, table);
         }
 
+        /**
+         * Reads the table's file size (in bytes) from Hive table metadata,
+         * so the redistribute step can record HDFS_BYTES_WRITTEN in job info.
+         *
+         * @param database Hive database containing the table
+         * @param table    Hive table name
+         * @return the fileSize field of the table's HiveTableMeta
+         * @throws Exception if the Hive client cannot fetch the table metadata
+         */
+        private long getDataSize(String database, String table) throws Exception {
+            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+            long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+            return size;
+        }
+
         private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
             final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
             hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
@@ -346,6 +352,8 @@ public class HiveMRInput implements IMRInput {
                 stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
 
                 redistributeTable(config, numReducers);
+                long dataSize = getDataSize(database, tableName);
+                getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
                 return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
 
             } catch (Exception e) {