You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:19:50 UTC
[33/50] [abbrv] kylin git commit: #3314 fix bugs data size in first
two steps
#3314 fix bugs data size in first two steps
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c6e45a76
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c6e45a76
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c6e45a76
Branch: refs/heads/master
Commit: c6e45a76dac1f9428308622558fe3608e11b66df
Parents: 5aa8033
Author: Sheng Zhang <sh...@kyligence.io>
Authored: Tue Dec 5 19:03:34 2017 +0800
Committer: Sheng Zhang <sh...@kyligence.io>
Committed: Wed Dec 27 16:59:51 2017 +0800
----------------------------------------------------------------------
.../source/hive/CreateFlatHiveTableStep.java | 30 +++++++++++++++++++-
.../apache/kylin/source/hive/HiveMRInput.java | 8 ++++++
2 files changed, 37 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 48e7686..891b090 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -18,7 +18,14 @@
package org.apache.kylin.source.hive;
import java.io.IOException;
+import java.util.Map;
+import java.util.regex.Pattern;
+import java.util.regex.Matcher;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HiveCmdBuilder;
import org.apache.kylin.common.util.Pair;
@@ -26,6 +33,7 @@ import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.common.PatternedLogger;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -39,6 +47,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class);
protected final PatternedLogger stepLogger = new PatternedLogger(logger);
+ private static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
protected void createFlatHiveTable(KylinConfig config) throws IOException {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
@@ -51,12 +60,31 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
stepLogger.log(cmd);
Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger);
- getManager().addJobInfo(getId(), stepLogger.getInfo());
+ Map<String, String> info = stepLogger.getInfo();
+
+ //get the flat Hive table size
+ Matcher matcher = HDFS_LOCATION.matcher(cmd);
+ if (matcher.find()) {
+ String hiveFlatTableHdfsUrl = matcher.group(1);
+ long size = getFileSize(hiveFlatTableHdfsUrl);
+ info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, "" + size);
+ logger.info("HDFS_Bytes_Writen: " + size);
+ }
+ getManager().addJobInfo(getId(), info);
if (response.getFirst() != 0) {
throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst());
}
}
+ private long getFileSize(String hdfsUrl) throws IOException {
+ Configuration configuration = new Configuration();
+ Path path = new Path(hdfsUrl);
+ FileSystem fs = path.getFileSystem(configuration);
+ ContentSummary contentSummary = fs.getContentSummary(path);
+ long length = contentSummary.getLength();
+ return length;
+ }
+
private KylinConfig getCubeSpecificConfig() {
String cubeName = CubingExecutableUtil.getCubeName(getParams());
CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
http://git-wip-us.apache.org/repos/asf/kylin/blob/c6e45a76/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 3671266..0b23121 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -281,6 +281,12 @@ public class HiveMRInput implements IMRInput {
return hiveClient.getHiveTableRows(database, table);
}
+ private long getDataSize(String database, String table) throws Exception {
+ IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ long size = hiveClient.getHiveTableMeta(database, table).fileSize;
+ return size;
+ }
+
private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
hiveCmdBuilder.overwriteHiveProps(config.getHiveConfigOverride());
@@ -346,6 +352,8 @@ public class HiveMRInput implements IMRInput {
stepLogger.log("num reducers for RedistributeFlatHiveTableStep = " + numReducers);
redistributeTable(config, numReducers);
+ long dataSize = getDataSize(database, tableName);
+ getManager().addJobInfo(getId(), ExecutableConstants.HDFS_BYTES_WRITTEN, "" + dataSize);
return new ExecuteResult(ExecuteResult.State.SUCCEED, stepLogger.getBufferedLog());
} catch (Exception e) {