You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 08:23:43 UTC
kylin git commit: KYLIN-1826 support multiple Hive environments by extending
KylinConfig and HiveClient
Repository: kylin
Updated Branches:
refs/heads/KYLIN-1826-2 [created] 3ac6490b3
KYLIN-1826 support multiple Hive environments by extending KylinConfig and HiveClient
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3ac6490b
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3ac6490b
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3ac6490b
Branch: refs/heads/KYLIN-1826-2
Commit: 3ac6490b36e897c6ad853f0d1eba3f9c97c88382
Parents: 309593b
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 9 16:23:19 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 16:23:19 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 8 ++
.../kylin/metadata/project/ProjectInstance.java | 23 ++++-
.../apache/kylin/source/hive/CLIHiveClient.java | 35 +++++++
.../source/hive/CreateFlatHiveTableStep.java | 2 +-
.../kylin/source/hive/HiveClientFactory.java | 14 ++-
.../kylin/source/hive/HiveCmdBuilder.java | 17 +++-
.../apache/kylin/source/hive/HiveMRInput.java | 47 ++++++++--
.../source/hive/HiveSourceTableLoader.java | 2 +-
.../apache/kylin/source/hive/HiveTableMeta.java | 96 ++++++++++++++++++++
9 files changed, 227 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index ee9f57c..9b3a592 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -830,4 +830,12 @@ abstract public class KylinConfigBase implements Serializable {
public void setMaxBuildingSegments(int maxBuildingSegments) {
setProperty("kylin.cube.building.segment.max", String.valueOf(maxBuildingSegments));
}
+
+    /** Property key naming the home directory of the Hive installation Kylin should use. */
+    private static final String HIVE_HOME_KEY = "kylin.hive.home";
+
+    /**
+     * Returns the value of {@code kylin.hive.home}, or an empty string when the property is not set.
+     * An empty value means no dedicated Hive installation is configured.
+     */
+    public String getHiveHome() {
+        return getOptional(HIVE_HOME_KEY, "");
+    }
+
+    /**
+     * Sets {@code kylin.hive.home}.
+     *
+     * @param hiveHome home directory of the Hive installation to use
+     */
+    public void setHiveHome(String hiveHome) {
+        setProperty(HIVE_HOME_KEY, hiveHome);
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 1afc603..f9d220f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -19,6 +19,7 @@
package org.apache.kylin.metadata.project;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -26,6 +27,8 @@ import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.metadata.realization.RealizationType;
@@ -78,6 +81,11 @@ public class ProjectInstance extends RootPersistentEntity {
@JsonProperty("ext_filters")
private Set<String> extFilters = new TreeSet<String>();
+ // Kylin properties overridden at project level (JSON key "override_kylin_properties");
+ // init() passes them to KylinConfigExt.createInstance() to build the project config.
+ @JsonProperty("override_kylin_properties")
+ private LinkedHashMap<String, String> overrideKylinProps = new LinkedHashMap<>();
+
+ // Effective project config, assigned in init(). Unlike the fields above it carries no @JsonProperty.
+ private KylinConfigExt config;
+
public String getResourcePath() {
return concatResourcePath(name);
}
@@ -285,19 +293,30 @@ public class ProjectInstance extends RootPersistentEntity {
}
}
+    /**
+     * Returns the effective configuration of this project, built by {@link #init()} from the
+     * environment config and {@code override_kylin_properties}; {@code null} before init() runs.
+     */
+    public KylinConfig getConfig() {
+        return config;
+    }
+
+    private void setConfig(KylinConfigExt config) {
+        this.config = config;
+    }
+
public void init() {
if (name == null)
name = ProjectInstance.DEFAULT_PROJECT_NAME;
if (realizationEntries == null) {
-            realizationEntries = new ArrayList<RealizationEntry>();
+            realizationEntries = new ArrayList<>();
}
if (tables == null)
-            tables = new TreeSet<String>();
+            tables = new TreeSet<>();
if (StringUtils.isBlank(this.name))
throw new IllegalStateException("Project name must not be blank");
+
+        initConfig();
}
+
+    /**
+     * Builds the project config from the environment config plus the project-level overrides.
+     */
+    private void initConfig() {
+        // The JSON may carry an explicit null for override_kylin_properties.
+        if (overrideKylinProps == null) {
+            overrideKylinProps = new LinkedHashMap<>();
+        }
+        // Always start from the environment config: the config field is still null on the first
+        // init(), so it cannot serve as the base config.
+        setConfig(KylinConfigExt.createInstance(KylinConfig.getInstanceFromEnv(), overrideKylinProps));
+    }
@Override
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
index 5a17f1f..7538444 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CLIHiveClient.java
@@ -19,9 +19,12 @@
package org.apache.kylin.source.hive;
+import java.io.File;
import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
import java.util.List;
import java.util.Map;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -33,6 +36,7 @@ import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.kylin.common.KylinConfig;
import com.google.common.collect.Lists;
@@ -46,11 +50,42 @@ public class CLIHiveClient implements IHiveClient {
protected HiveConf hiveConf = null;
protected Driver driver = null;
protected HiveMetaStoreClient metaStoreClient = null;
+    /** Path of the Hive site file, relative to {@code kylin.hive.home}. */
+    public static final String HIVE_CONFIG_FILE_LOCATION = "conf/hive-site.xml";
+    /** Path of the Hive CLI executable, relative to {@code kylin.hive.home}. */
+    public static final String HIVE_COMMAND_LOCATION = "bin/hive";
public CLIHiveClient() {
hiveConf = new HiveConf(CLIHiveClient.class);
}
+
+    /**
+     * Creates a client for the Hive installation configured by {@code kylin.hive.home}.
+     * <p>
+     * When the property is set, {@code <hive home>/conf/hive-site.xml} is loaded in place of the
+     * hive-site.xml found on the classpath. When it is empty, the configuration is built the same
+     * way as by {@link #CLIHiveClient()}.
+     *
+     * @param kylinConfig config providing the Hive home (e.g. a project-level config)
+     * @throws IllegalArgumentException if the Hive home has no conf/hive-site.xml file
+     */
+    public CLIHiveClient(KylinConfig kylinConfig) {
+        URL hiveSiteLocation = null;
+        String hiveHome = kylinConfig.getHiveHome();
+        if (StringUtils.isNotEmpty(hiveHome)) {
+            // File copes with relative homes and with or without a trailing slash,
+            // unlike string-concatenating "file://" + path.
+            File configFile = new File(hiveHome, HIVE_CONFIG_FILE_LOCATION);
+            if (!configFile.isFile()) {
+                throw new IllegalArgumentException("Can not find hive config file " + configFile.getPath());
+            }
+            try {
+                hiveSiteLocation = configFile.toURI().toURL();
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Invalid hive config file location " + configFile.getPath(), e);
+            }
+        }
+        hiveConf = createHiveConf(hiveSiteLocation);
+    }
+
+    /**
+     * Builds a HiveConf, optionally reading hive-site.xml from {@code hiveSiteLocation}.
+     * <p>
+     * HiveConf keeps the hive-site location in a static field. The override is therefore installed
+     * and then restored under a class-wide lock. Otherwise it would leak into every HiveConf created
+     * afterwards, including those of projects without a Hive home.
+     *
+     * @param hiveSiteLocation hive-site.xml to load, or {@code null} for the classpath default
+     * @return a new HiveConf
+     */
+    private static HiveConf createHiveConf(URL hiveSiteLocation) {
+        synchronized (CLIHiveClient.class) {
+            if (hiveSiteLocation == null) {
+                return new HiveConf(CLIHiveClient.class);
+            }
+            URL previousLocation = HiveConf.getHiveSiteLocation();
+            HiveConf.setHiveSiteLocation(hiveSiteLocation);
+            try {
+                return new HiveConf(CLIHiveClient.class);
+            } finally {
+                HiveConf.setHiveSiteLocation(previousLocation);
+            }
+        }
+    }
+
/**
* only used by Deploy Util
*/
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index 025fd94..6074d1b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -40,7 +40,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
private final BufferedLogger stepLogger = new BufferedLogger(logger);
private void createFlatHiveTable(KylinConfig config) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config);
hiveCmdBuilder.addStatement(getInitStatement());
hiveCmdBuilder.addStatement(getCreateTableStatement());
final String cmd = hiveCmdBuilder.toString();
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
index 8c883af..4f8755a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClientFactory.java
@@ -21,13 +21,19 @@ package org.apache.kylin.source.hive;
import org.apache.kylin.common.KylinConfig;
public class HiveClientFactory {
-    public static IHiveClient getHiveClient() {
-        if ("cli".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
-            return new CLIHiveClient();
-        } else if ("beeline".equals(KylinConfig.getInstanceFromEnv().getHiveClientMode())) {
-            return new BeelineHiveClient(KylinConfig.getInstanceFromEnv().getHiveBeelineParams());
+
+    /**
+     * Creates a Hive client for the client mode ("cli" or "beeline") configured in {@code kylinConfig}.
+     *
+     * @param kylinConfig config to read the client mode and its settings from (e.g. a project config)
+     * @return a new Hive client
+     * @throws RuntimeException if the configured client mode is not recognized
+     */
+    public static IHiveClient getHiveClientByConfig(KylinConfig kylinConfig) {
+        // Read the mode once, from the given config, so both branches honor project-level settings.
+        final String clientMode = kylinConfig.getHiveClientMode();
+        if ("cli".equals(clientMode)) {
+            return new CLIHiveClient(kylinConfig);
+        } else if ("beeline".equals(clientMode)) {
+            return new BeelineHiveClient(kylinConfig.getHiveBeelineParams());
} else {
-            throw new RuntimeException("cannot recognize hive client mode");
+            throw new RuntimeException("cannot recognize hive client mode: " + clientMode);
}
}
+
+    /**
+     * Creates a Hive client configured from {@link KylinConfig#getInstanceFromEnv()}.
+     */
+    public static IHiveClient getHiveClient() {
+        return getHiveClientByConfig(KylinConfig.getInstanceFromEnv());
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
index 844cf12..9707ac1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,11 @@ public class HiveCmdBuilder {
final private ArrayList<String> statements = Lists.newArrayList();
public HiveCmdBuilder() {
-        kylinConfig = KylinConfig.getInstanceFromEnv();
+        this(KylinConfig.getInstanceFromEnv());
+    }
+
+    /**
+     * Creates a builder whose commands follow the Hive client mode and Hive home of {@code kylinConfig}
+     * (e.g. a project-level config rather than the environment config).
+     */
+    public HiveCmdBuilder(KylinConfig kylinConfig) {
+        this.kylinConfig = kylinConfig;
-        clientMode = HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase());
+        // Locale.ROOT: under e.g. a Turkish default locale "cli" would upper-case to "CLİ" and break valueOf().
+        clientMode = HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase(java.util.Locale.ROOT));
}
@@ -53,7 +58,15 @@ public class HiveCmdBuilder {
switch (clientMode) {
case CLI:
- buf.append("hive -e \"");
+ String hiveHome = kylinConfig.getHiveHome();
+ if (StringUtils.isNotEmpty(hiveHome)) {
+ if (hiveHome.endsWith("/") == false) {
+ hiveHome = hiveHome + "/";
+ }
+ buf.append(hiveHome).append(CLIHiveClient.HIVE_COMMAND_LOCATION).append(" -e \"");
+ } else {
+ buf.append("hive -e \"");
+ }
for (String statement : statements) {
buf.append(statement).append("\n");
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 9e9dc25..1cee29c 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -59,6 +59,8 @@ import com.google.common.collect.Sets;
public class HiveMRInput implements IMRInput {
+ private static final Logger logger = LoggerFactory.getLogger(HiveMRInput.class);
+
@Override
public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
return new BatchCubingInputSide(flatDesc);
@@ -133,10 +135,41 @@ public class HiveMRInput implements IMRInput {
if (kylinConfig.isHiveRedistributeEnabled() == true) {
jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
}
+
AbstractExecutable task = createLookupHiveViewMaterializationStep(jobFlow.getId());
if (task != null) {
jobFlow.addTask(task);
}
+
+ if (StringUtils.isNotEmpty(kylinConfig.getHiveHome()) == true) {
+ // copy from another hive cluster to current cluster
+ task = createCopyHiveDataStep(kylinConfig, flatDesc, jobFlow.getId());
+ if (task != null) {
+ jobFlow.addTask(task);
+ }
+ }
+ }
+
+        /**
+         * Creates a step that distcp's the flat intermediate table's data into this job's working
+         * directory. This applies when the configured Hive stores the table outside the current
+         * default file system (e.g. a separate Hive set via {@code kylin.hive.home}).
+         *
+         * @return the copy step, or {@code null} when the data is already on this cluster
+         * @throws IllegalArgumentException if the intermediate table location cannot be determined
+         */
+        protected AbstractExecutable createCopyHiveDataStep(KylinConfig kylinConfig, IJoinedFlatTableDesc flatDesc, String jobId) {
+            final String database = kylinConfig.getHiveDatabaseForIntermediateTable();
+            final String table = flatDesc.getTableName();
+            IHiveClient hiveClient = HiveClientFactory.getHiveClientByConfig(kylinConfig);
+            final String input;
+            try {
+                input = hiveClient.getHiveTableMeta(database, table).getSdLocation();
+            } catch (Exception e) {
+                logger.error("Error when get intermediate table location", e);
+                throw new IllegalArgumentException("Failed to get location of intermediate table " + database + "." + table, e);
+            }
+            if (StringUtils.isEmpty(input)) {
+                throw new IllegalArgumentException("Intermediate table " + database + "." + table + " has no location");
+            }
+
+            // Resolve the default FS through FileSystem.getDefaultUri() rather than reading the raw key:
+            // the raw value can be null when unset, which made startsWith() throw a NullPointerException.
+            final String defaultFs = FileSystem.getDefaultUri(HadoopUtil.getCurrentConfiguration()).toString();
+            if (input.startsWith("/") || input.startsWith(defaultFs)) {
+                // in the same cluster
+                return null;
+            }
+            String output = JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+            String cmd = String.format("hadoop distcp -overwrite %s %s", input, output);
+            ShellExecutable task = new ShellExecutable();
+            task.setName("Copy Intermediate Table To Local DFS");
+            task.setCmd(cmd);
+            return task;
+        }
public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
@@ -158,9 +191,9 @@ public class HiveMRInput implements IMRInput {
public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
-
KylinConfig kylinConfig = ((CubeSegment) flatDesc.getSegment()).getConfig();
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(kylinConfig);
+
MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
final Set<TableDesc> lookupViewsTables = Sets.newHashSet();
@@ -237,13 +270,13 @@ public class HiveMRInput implements IMRInput {
public static class RedistributeFlatHiveTableStep extends AbstractExecutable {
private final BufferedLogger stepLogger = new BufferedLogger(logger);
-        private long computeRowCount(String database, String table) throws Exception {
-            IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-            return hiveClient.getHiveTableRows(database, table);
+        /**
+         * Returns the number of rows of {@code database.table}, queried through the Hive client
+         * that {@code config} selects, so a project-specific Hive setup is honored.
+         */
+        private long computeRowCount(KylinConfig config, String database, String table) throws Exception {
+            return HiveClientFactory.getHiveClientByConfig(config).getHiveTableRows(database, table);
}
private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config);
hiveCmdBuilder.addStatement(getInitStatement());
hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
@@ -280,7 +313,7 @@ public class HiveMRInput implements IMRInput {
}
try {
- long rowCount = computeRowCount(database, tableName);
+ long rowCount = computeRowCount(config, database, tableName);
logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount);
if (rowCount == 0) {
if (!config.isEmptySegmentAllowed()) {
@@ -358,7 +391,7 @@ public class HiveMRInput implements IMRInput {
StringBuffer output = new StringBuffer();
final String hiveTable = this.getIntermediateTableIdentity();
if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(config);
hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 401e720..c5c7806 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -60,7 +60,7 @@ public class HiveSourceTableLoader {
db2tables.put(parts[0], parts[1]);
}
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+ IHiveClient hiveClient = HiveClientFactory.getHiveClientByConfig(config);
SchemaChecker checker = new SchemaChecker(hiveClient, MetadataManager.getInstance(config), CubeManager.getInstance(config));
for (Map.Entry<String, String> entry : db2tables.entries()) {
SchemaChecker.CheckResult result = checker.allowReload(entry.getKey(), entry.getValue());
http://git-wip-us.apache.org/repos/asf/kylin/blob/3ac6490b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
index 784a0bb..dcb79e1 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableMeta.java
@@ -66,6 +66,102 @@ class HiveTableMeta {
this.partitionColumns = partitionColumns;
}
+    /** Returns the table name. */
+    public String getTableName() {
+        return this.tableName;
+    }
+
+    /** Replaces the table name. */
+    public void setTableName(String value) {
+        this.tableName = value;
+    }
+
+    /** Returns the recorded {@code sdLocation} of the table. */
+    public String getSdLocation() {
+        return this.sdLocation;
+    }
+
+    /** Replaces the recorded {@code sdLocation} of the table. */
+    public void setSdLocation(String value) {
+        this.sdLocation = value;
+    }
+
+    /** Returns the recorded input format class name ({@code sdInputFormat}). */
+    public String getSdInputFormat() {
+        return this.sdInputFormat;
+    }
+
+    /** Replaces the recorded input format ({@code sdInputFormat}). */
+    public void setSdInputFormat(String value) {
+        this.sdInputFormat = value;
+    }
+
+    /** Returns the recorded output format class name ({@code sdOutputFormat}). */
+    public String getSdOutputFormat() {
+        return this.sdOutputFormat;
+    }
+
+    /** Replaces the recorded output format ({@code sdOutputFormat}). */
+    public void setSdOutputFormat(String value) {
+        this.sdOutputFormat = value;
+    }
+
+    /** Returns the table owner. */
+    public String getOwner() {
+        return this.owner;
+    }
+
+    /** Replaces the table owner. */
+    public void setOwner(String value) {
+        this.owner = value;
+    }
+
+    /** Returns the recorded table type. */
+    public String getTableType() {
+        return this.tableType;
+    }
+
+    /** Replaces the recorded table type. */
+    public void setTableType(String value) {
+        this.tableType = value;
+    }
+
+    /** Returns the recorded last access time (units as stored by whoever filled this object). */
+    public int getLastAccessTime() {
+        return this.lastAccessTime;
+    }
+
+    /** Replaces the recorded last access time. */
+    public void setLastAccessTime(int value) {
+        this.lastAccessTime = value;
+    }
+
+    /** Returns the recorded total file size (presumably bytes; confirm with the producer). */
+    public long getFileSize() {
+        return this.fileSize;
+    }
+
+    /** Replaces the recorded total file size. */
+    public void setFileSize(long value) {
+        this.fileSize = value;
+    }
+
+    /** Returns the recorded number of files. */
+    public long getFileNum() {
+        return this.fileNum;
+    }
+
+    /** Replaces the recorded number of files. */
+    public void setFileNum(long value) {
+        this.fileNum = value;
+    }
+
+    /** Returns whether the table is flagged as native. */
+    public boolean isNative() {
+        return this.isNative;
+    }
+
+    /** Replaces the native-table flag. */
+    public void setNative(boolean value) {
+        this.isNative = value;
+    }
+
+    /** Returns the column metadata list itself (not a copy). */
+    public List<HiveTableColumnMeta> getAllColumns() {
+        return this.allColumns;
+    }
+
+    /** Replaces the column metadata list; the given list is stored without copying. */
+    public void setAllColumns(List<HiveTableColumnMeta> value) {
+        this.allColumns = value;
+    }
+
+    /** Returns the partition column metadata list itself (not a copy). */
+    public List<HiveTableColumnMeta> getPartitionColumns() {
+        return this.partitionColumns;
+    }
+
+    /** Replaces the partition column metadata list; the given list is stored without copying. */
+    public void setPartitionColumns(List<HiveTableColumnMeta> value) {
+        this.partitionColumns = value;
+    }
+
@Override
public String toString() {
return "HiveTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\'' + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\'' + ", owner='" + owner + '\'' + ", tableType='" + tableType + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns + ", partitionColumns=" + partitionColumns + '}';