You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 06:14:01 UTC
[1/2] kylin git commit: KYLIN-1826, add external hive interface, project, table. Signed-off-by: terry-chelsea [Forced Update!]
Repository: kylin
Updated Branches:
refs/heads/KYLIN-1826 726fb8fd6 -> 417fed32f (forced update)
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
new file mode 100644
index 0000000..ef7b54d
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.HiveMRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * External hive file mr input
+ * @author hzfengyu
+ */
+/**
+ * MapReduce input for cubes whose fact table lives in an external Hive (one other than the
+ * Hive configured for Kylin itself).
+ */
+public class ExternalHiveMRInput extends HiveMRInput {
+
+    @Override
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchFileCubingInputSide(flatDesc);
+    }
+
+    /**
+     * Batch cubing input side for an external Hive.
+     * <p>
+     * After the parent's flat-table phase, a table with the same definition is created in the
+     * default Hive, and the external flat table's data is copied into that table's directory in
+     * the job working dir. The later steps then use the default-Hive table.
+     */
+    public static class BatchFileCubingInputSide extends BatchCubingInputSide {
+        private static final Logger logger = LoggerFactory.getLogger(BatchFileCubingInputSide.class);
+
+        public BatchFileCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            super.addStepPhase1_CreateFlatTable(jobFlow);
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+
+            // Create the flat table in the default Hive with the same definition as the external
+            // Hive's flat table; the next steps use it. No views are created in the default Hive,
+            // because Kylin reads lookup tables from their files.
+            jobFlow.addTask(createFlatTableInDefaultHive(conf, flatDesc, jobFlow.getId(), cubeName));
+
+            final String tableDir = JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()));
+            AbstractExecutable copyDataStep = createCopyHiveDataStep(flatDesc.getTableName(), flatDesc.getHiveName(), tableDir);
+            // createCopyHiveDataStep may be overridden to return null, which skips the copy
+            if (copyDataStep != null) {
+                jobFlow.addTask(copyDataStep);
+            }
+        }
+
+        /**
+         * Row counts go to a directory on the Kylin server's local file system, under
+         * {@code java.io.tmpdir}. This pairs with {@link #isWriteToLocalDir()} returning true
+         * (presumably selecting an {@code INSERT OVERWRITE LOCAL DIRECTORY} in the parent).
+         */
+        @Override
+        protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) {
+            String tempDir = System.getProperty("java.io.tmpdir", "/tmp");
+            // strip trailing separators so the URI never contains "//" (e.g. tmpdir "/tmp/")
+            while (tempDir.endsWith("/")) {
+                tempDir = tempDir.substring(0, tempDir.length() - 1);
+            }
+            return String.format("file://%s/kylin-%s/row_count", tempDir, jobId);
+        }
+
+        @Override
+        protected boolean isWriteToLocalDir() {
+            return true;
+        }
+
+        /**
+         * Builds the step that copies the flat table {@code flatHiveTableName} of Hive
+         * {@code hiveName} to {@code output}. Subclasses may return null to skip copying.
+         */
+        protected AbstractExecutable createCopyHiveDataStep(String flatHiveTableName, String hiveName, String output) {
+            DistcpShellExecutable copyHiveTableStep = new DistcpShellExecutable();
+            copyHiveTableStep.setName(ExecutableConstants.STEP_NAME_COPY_HIVE_DATA);
+            copyHiveTableStep.setHiveName(hiveName);
+            copyHiveTableStep.setOutputPath(output);
+            copyHiveTableStep.setTableName(flatHiveTableName);
+            return copyHiveTableStep;
+        }
+
+        /**
+         * Builds a step that (re)creates the flat table in the default Hive's intermediate
+         * database. The table location is under the job working dir.
+         */
+        protected AbstractExecutable createFlatTableInDefaultHive(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+            StringBuilder hiveInitBuf = new StringBuilder();
+            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
+
+            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            // null hive name: HiveManager treats null as the default Hive
+            step.setHiveName(null);
+            step.setUseRedistribute(false);
+            step.setInitStatement(hiveInitBuf.toString());
+            step.setRowCountOutputDir(null);
+            step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT);
+            return step;
+        }
+
+        /** Adds the cleanup step, which also removes the external Hive's flat table and its data. */
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            String hiveName = flatDesc.getHiveName();
+
+            ExternalGarbageCollectionStep step = new ExternalGarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
+            step.setHiveName(hiveName);
+            jobFlow.addTask(step);
+        }
+
+        /**
+         * Garbage collection step that, in addition to the parent's cleanup, drops the flat table
+         * in the external Hive and deletes its location.
+         */
+        public static class ExternalGarbageCollectionStep extends GarbageCollectionStep {
+            @Override
+            protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+                KylinConfig config = context.getConfig();
+                StringBuilder output = new StringBuilder();
+                try {
+                    output.append(super.cleanUpIntermediateFlatTable(config));
+                    output.append(cleanUpExternalHiveFlatTable(config));
+                    // Hive view intermediate tables are intentionally NOT dropped here
+                    // (cleanUpHiveViewIntermediateTable) to avoid concurrency issues.
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                }
+
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+            }
+
+            /**
+             * Drops the intermediate flat table in the external Hive and deletes its location.
+             *
+             * @return a human-readable log of what was done; empty when no hive name is set
+             * @throws IOException if the Hive drop command fails
+             */
+            public String cleanUpExternalHiveFlatTable(KylinConfig config) throws IOException {
+                StringBuilder output = new StringBuilder();
+                String hiveName = this.getHiveName();
+                if (hiveName == null) {
+                    return output.toString();
+                }
+                final String hiveTable = getIntermediateTableIdentity();
+                if (StringUtils.isNotEmpty(hiveTable)) {
+                    final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS " + hiveTable + ";");
+
+                    // Delete the table location BEFORE dropping: the location lookup needs the
+                    // table to still exist.
+                    String rmExternalOutput = rmExternalTableDirOnHDFS(hiveTable, hiveName);
+                    config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                    output.append(rmExternalOutput);
+                }
+                return output.toString();
+            }
+
+            // Best effort: looks up the table's location in the given Hive and deletes it
+            // recursively; any failure is logged and reported in the returned text.
+            private String rmExternalTableDirOnHDFS(String tableName, String hive) throws IOException {
+                try {
+                    String dir = HiveManager.getInstance().getHiveTableLocation(tableName, hive);
+                    Path path = new Path(dir);
+                    FileSystem fs = HadoopUtil.getFileSystem(dir);
+                    if (fs.exists(path) && !fs.delete(path, true)) {
+                        logger.warn("Failed to delete external table location {}.", path);
+                    }
+                    return "Remove External Hive " + hive + " Table " + tableName + ", location " + path.toString() + "\n";
+                } catch (Exception e) {
+                    logger.warn("Get table location failed ! table {}, hive name {}.", tableName, hive, e);
+                    return "Fetch external table location or delete path failed. skip it.";
+                }
+            }
+
+            public void setHiveName(String hiveName) {
+                setParam("hiveName", hiveName);
+            }
+
+            public String getHiveName() {
+                return getParam("hiveName");
+            }
+
+            @Override
+            public String getCmd() {
+                StringBuilder sb = new StringBuilder(super.getCmd());
+                sb.append(" -").append("hiveName").append(" ").append(this.getHiveName());
+                return sb.toString();
+            }
+        }
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
new file mode 100644
index 0000000..b1e8101
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+ import java.util.List;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.hive.HiveTable;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Source implementation for tables that live in an external Hive.
+ */
+public class ExternalHiveSource implements ISource {
+
+    /**
+     * Adapts this source to a build engine; only {@link IMRInput} is supported.
+     *
+     * @throws RuntimeException for any other engine interface
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface != IMRInput.class) {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+        return (I) new ExternalHiveMRInput();
+    }
+
+    /** Tables are read through the regular {@link HiveTable} reader. */
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        return new HiveTable(tableDesc);
+    }
+
+    /** Declares no MR dependent resources: always returns a new, empty, mutable list. */
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        List<String> resources = Lists.newArrayList();
+        return resources;
+    }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
new file mode 100644
index 0000000..cc2ad8b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.source.hive.HiveClient;
+import org.apache.log4j.Logger;
+
+
+/**
+ * Manager all hive client, default client and external clients.
+ * This is a Singleton object.
+ * @author hzfengyu
+ */
+public class HiveManager {
+ private volatile static HiveManager HIVE_MANAGER_CACHE = null;
+ private static final Logger logger = Logger.getLogger(HiveManager.class);
+ private static final String DEFAULT_HIVE_NAME = "default(null)";
+ private static final String HIVE_CONFIG_FILE_LOCATION = "conf/hive-site.xml";
+ private static final String HIVE_COMMAND_LOCATION = "bin/hive";
+ public static final String NOT_SUPPORT_ERROR_MESSAGE =
+ "Do not support external hive, set kylin.external.hive.root.directory to support it";
+
+ //hive root directory, if this equals to null or empty meaning just use default hive
+ private File externalHiveRootDir = null;
+ private ConcurrentHashMap<String, HiveClient> externalHiveMap = null;
+ private HiveClient defaultHiveClient = null;
+
+ public static HiveManager getInstance() {
+ //using default kylin config
+ if(HIVE_MANAGER_CACHE == null) {
+ KylinConfig config = KylinConfig.getInstanceFromEnv();
+ synchronized(HiveManager.class) {
+ if(HIVE_MANAGER_CACHE == null) {
+ HIVE_MANAGER_CACHE = new HiveManager(config);
+ }
+ }
+ }
+ return HIVE_MANAGER_CACHE;
+ }
+
+ private HiveManager(KylinConfig config) {
+ String externalRootDir = config.getExternalHiveRootDirectory();
+ if(externalRootDir != null && !externalRootDir.isEmpty()) {
+ File file = new File(externalRootDir);
+ //check to ensure hive root file exist
+ this.externalHiveRootDir = file;
+ if(!(file.exists() && file.isDirectory())) {
+ logger.warn("Hive root directory " + file.getAbsolutePath() + " do not exist !");
+ this.externalHiveRootDir = null;
+ }
+ } else {
+ this.externalHiveRootDir = null;
+ }
+ this.externalHiveMap = new ConcurrentHashMap<String, HiveClient>();
+ }
+
+ public List<String> getExternalHiveName() {
+ List<String> hiveNames = new LinkedList<String>();
+ hiveNames.add(DEFAULT_HIVE_NAME);
+ if(this.externalHiveRootDir == null)
+ return hiveNames;
+
+ //take every diectory in hive root dir is a hive source. take directory name as hive name
+ for(File file : this.externalHiveRootDir.listFiles()) {
+ if(!file.isDirectory()) {
+ logger.warn("File " + file.getAbsolutePath() + " in hive root directory is normal file.");
+ continue;
+ }
+ hiveNames.add(file.getName());
+ }
+ return hiveNames;
+ }
+
+ private void checkIsSupportExternalHive() {
+ if(this.externalHiveRootDir == null) {
+ throw new IllegalArgumentException(NOT_SUPPORT_ERROR_MESSAGE);
+ }
+ }
+
+ public String getHiveConfigFile(String hiveName) {
+ if(hiveName == null) {
+ return null;
+ }
+ checkIsSupportExternalHive();
+
+ File hiveRootFile = new File(this.externalHiveRootDir, hiveName);
+ if(!(hiveRootFile.exists() && hiveRootFile.isDirectory())) {
+ throw new IllegalArgumentException("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " do not exist.");
+ }
+
+ File hiveConfigFile = new File(hiveRootFile, HIVE_CONFIG_FILE_LOCATION);
+ if(!(hiveConfigFile.exists() && hiveConfigFile.isFile())) {
+ throw new IllegalArgumentException("Hive " + hiveName + " config file " + hiveConfigFile.getAbsolutePath() + " do not exist.");
+ }
+ return hiveConfigFile.getAbsolutePath();
+ }
+
+ public HiveClient createHiveClient(String hiveName) {
+ //use internal hive client while do not appoint a hive
+ if(hiveName == null) {
+ if(defaultHiveClient == null)
+ defaultHiveClient = new ExternalHiveClient(null);
+ return this.defaultHiveClient;
+ }
+ checkIsSupportExternalHive();
+
+ HiveClient client = this.externalHiveMap.get(hiveName);
+ if(client != null)
+ return client;
+ String configFileLocation = getHiveConfigFile(hiveName);
+ if(configFileLocation == null) {
+ throw new IllegalArgumentException("Can not find hive " + hiveName + " config file in external hive root directory " +
+ this.externalHiveRootDir.getAbsolutePath());
+ }
+
+ try {
+ client = new ExternalHiveClient(configFileLocation);
+ this.externalHiveMap.put(hiveName, client);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Can not create hive client for " + hiveName + ", config file " + configFileLocation);
+ }
+ return client;
+ }
+
+ public HiveClient createHiveClientWithConfig(Map<String, String> configMap, String hiveName) {
+ HiveClient client = this.createHiveClient(hiveName);
+ client.appendConfiguration(configMap);
+ return client;
+ }
+
+ public String getHiveCommand(String hiveName) {
+ if(hiveName == null) {
+ return "hive";
+ }
+ checkIsSupportExternalHive();
+
+ File hiveRootFile = new File(this.externalHiveRootDir, hiveName);
+ if(!(hiveRootFile.exists() && hiveRootFile.isDirectory())) {
+ throw new IllegalArgumentException("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " do not exist.");
+ }
+
+ File hiveCmdFile = new File(hiveRootFile, HIVE_COMMAND_LOCATION);
+ if(!(hiveCmdFile.exists() && hiveCmdFile.isFile())) {
+ throw new IllegalArgumentException("Hive " + hiveName + " bin file " + hiveCmdFile.getAbsolutePath() + " do not exist.");
+ }
+ return hiveCmdFile.getAbsolutePath();
+ }
+
+ public String getHiveTableLocation(String database, String tableName, String hiveName) throws Exception {
+ HiveClient hiveClient = this.createHiveClient(hiveName);
+ try {
+ String tableLocation = hiveClient.getHiveTableLocation(database, tableName);
+ return tableLocation;
+ } catch (Exception e) {
+ logger.error("Get hive " + hiveName + " table " + tableName + " location error !");
+ throw e;
+ }
+ }
+
+ public String getHiveTableLocation(String fullTableName, String hiveName) throws Exception {
+ String[] tables = HadoopUtil.parseHiveTableName(fullTableName);
+ String database = tables[0];
+ String tableName = tables[1];
+ return getHiveTableLocation(database, tableName, hiveName);
+ }
+
+ public static void clearCache() {
+ HIVE_MANAGER_CACHE = null;
+ getInstance();
+ }
+
+ public boolean isSupportExternalHives() {
+ return this.externalHiveRootDir != null;
+ }
+
+ public static boolean isSameHiveSource(String first, String second) {
+ if(first == null) {
+ return second == null;
+ } else {
+ return first.equalsIgnoreCase(second);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/controllers/page.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/page.js b/webapp/app/js/controllers/page.js
index c65a264..c07cf70 100644
--- a/webapp/app/js/controllers/page.js
+++ b/webapp/app/js/controllers/page.js
@@ -215,6 +215,7 @@ var projCtrl = function ($scope, $location, $modalInstance, ProjectService, Mess
var requestBody = {
formerProjectName: $scope.state.oldProjName,
newProjectName: $scope.proj.name,
+ newHiveName: $scope.proj.hive,
newDescription: $scope.proj.description
};
ProjectService.update({}, requestBody, function (newProj) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index e3ab0ac..1c08aa0 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -177,7 +177,7 @@ KylinApp
$scope.loadHive = function () {
if($scope.hiveLoaded)
return;
- TableService.showHiveDatabases({}, function (databases) {
+ TableService.showHiveDatabases({project:$scope.projectName}, function (databases) {
$scope.dbNum = databases.length;
if (databases.length > 0) {
$scope.hiveMap = {};
@@ -293,7 +293,7 @@ KylinApp
$scope.showToggle = function(node) {
if(node.expanded == false){
- TableService.showHiveTables({"database": node.label},function (hive_tables){
+ TableService.showHiveTables({"database": node.label, project:$scope.projectName},function (hive_tables){
var tables = [];
for (var i = 0; i < hive_tables.length; i++) {
tables.push(hive_tables[i]);
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/model/projectConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/projectConfig.js b/webapp/app/js/model/projectConfig.js
index 270eb7a..2a84b47 100644
--- a/webapp/app/js/model/projectConfig.js
+++ b/webapp/app/js/model/projectConfig.js
@@ -20,6 +20,7 @@ KylinApp.constant('projectConfig', {
theaditems: [
{attr: 'name', name: 'Name'},
{attr: 'owner', name: 'Owner'},
+ {attr: 'hive', name: 'Hive Name'},
{attr: 'description', name: 'Description'},
{attr: 'create_time', name: 'Create Time'}
]
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/partials/projects/project_create.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/projects/project_create.html b/webapp/app/partials/projects/project_create.html
index 36cbaf5..cd150b1 100644
--- a/webapp/app/partials/projects/project_create.html
+++ b/webapp/app/partials/projects/project_create.html
@@ -36,6 +36,15 @@
</div>
</div>
<div class="form-group">
+ <label><b>Hive Name</b></label>
+
+ <div class="clearfix">
+ <input name="hive_input" type="text" class="form-control"
+ placeholder="Name of the external Hive the project uses as its input source; leave empty to use the default Hive."
+ ng-model="proj.hive" />
+ </div>
+ </div>
+ <div class="form-group">
<label><b>Project Description</b></label>
<div class="clearfix">
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/partials/projects/projects.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/projects/projects.html b/webapp/app/partials/projects/projects.html
index 96e4a91..10b4dba 100644
--- a/webapp/app/partials/projects/projects.html
+++ b/webapp/app/partials/projects/projects.html
@@ -54,6 +54,7 @@
{{ project.name}}
</td>
<td>{{ project.owner}}</td>
+ <td>{{ project.hive}}</td>
<td>{{ project.description}}</td>
<td>{{ project.create_time_utc | utcToConfigTimeZone}}</td>
<td>
[2/2] kylin git commit: KYLIN-1826, add external hive interface, project, table. Signed-off-by: terry-chelsea
Posted by sh...@apache.org.
KYLIN-1826, add external hive interface, project, table..
Signed-off-by: terry-chelsea <hz...@corp.netease.com>
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/417fed32
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/417fed32
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/417fed32
Branch: refs/heads/KYLIN-1826
Commit: 417fed32f7abbea459906f5be148c974659effd4
Parents: b08871e
Author: terry-chelsea <hz...@corp.netease.com>
Authored: Fri Oct 28 20:38:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 14:11:46 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 5 +
.../cube/model/CubeJoinedFlatTableDesc.java | 5 +
.../cube/model/CubeJoinedFlatTableEnrich.java | 4 +
.../org/apache/kylin/job/JoinedFlatTable.java | 10 +-
.../kylin/job/constant/ExecutableConstants.java | 2 +
.../metadata/model/IJoinedFlatTableDesc.java | 2 +
.../kylin/metadata/model/ISourceAware.java | 1 +
.../apache/kylin/metadata/model/TableDesc.java | 15 +-
.../kylin/metadata/project/ProjectInstance.java | 11 +
.../kylin/metadata/project/ProjectManager.java | 22 +-
.../hive/ITHiveSourceTableLoaderTest.java | 3 +-
.../source/hive/ITHiveTableReaderTest.java | 2 +-
.../kylin/rest/controller/CubeController.java | 17 ++
.../rest/controller/ProjectController.java | 35 +++-
.../rest/controller/StreamingController.java | 4 +
.../kylin/rest/controller/TableController.java | 24 ++-
.../rest/request/CreateProjectRequest.java | 12 ++
.../rest/request/UpdateProjectRequest.java | 13 ++
.../kylin/rest/response/TableDescResponse.java | 1 +
.../apache/kylin/rest/service/BasicService.java | 14 +-
.../apache/kylin/rest/service/CacheService.java | 4 +-
.../apache/kylin/rest/service/CubeService.java | 10 +-
.../kylin/rest/service/ProjectService.java | 6 +-
.../source/hive/CreateFlatHiveTableStep.java | 11 +-
.../apache/kylin/source/hive/HiveClient.java | 2 +-
.../kylin/source/hive/HiveCmdBuilder.java | 13 +-
.../apache/kylin/source/hive/HiveMRInput.java | 87 +++++---
.../source/hive/HiveSourceTableLoader.java | 22 +-
.../org/apache/kylin/source/hive/HiveTable.java | 7 +-
.../kylin/source/hive/HiveTableReader.java | 16 +-
.../apache/kylin/source/hive/HqlExecutable.java | 12 +-
.../hive/external/DistcpShellExecutable.java | 100 +++++++++
.../hive/external/ExternalHiveClient.java | 80 +++++++
.../hive/external/ExternalHiveMRInput.java | 200 ++++++++++++++++++
.../hive/external/ExternalHiveSource.java | 51 +++++
.../kylin/source/hive/external/HiveManager.java | 206 +++++++++++++++++++
webapp/app/js/controllers/page.js | 1 +
webapp/app/js/controllers/sourceMeta.js | 4 +-
webapp/app/js/model/projectConfig.js | 1 +
.../app/partials/projects/project_create.html | 9 +
webapp/app/partials/projects/projects.html | 1 +
41 files changed, 975 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 79ee084..39c4a3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -365,6 +365,10 @@ abstract public class KylinConfigBase implements Serializable {
public String getCliWorkingDir() {
return getOptional("kylin.job.remote.cli.working.dir");
}
+
+    /**
+     * Returns the value of {@code kylin.external.hive.root.directory}: the directory holding one
+     * sub-directory per external Hive. Returns {@code null} when the property is not set.
+     */
+    public String getExternalHiveRootDirectory() {
+        final String key = "kylin.external.hive.root.directory";
+        return getOptional(key, null);
+    }
public boolean isEmptySegmentAllowed() {
return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
@@ -717,6 +721,7 @@ abstract public class KylinConfigBase implements Serializable {
Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine."));
// ref constants in ISourceAware
r.put(0, "org.apache.kylin.source.hive.HiveSource");
+ r.put(6, "org.apache.kylin.source.hive.external.ExternalHiveSource");
return r;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6aeb617..1e62ac9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -161,5 +161,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
public TblColRef getDistributedBy() {
return cubeDesc.getDistributedByColumn();
}
+
+    /**
+     * Returns the hive name recorded on the model's fact table descriptor. A null value
+     * presumably selects the default Hive.
+     */
+    @Override
+    public String getHiveName() {
+        return getDataModel().getFactTableDesc().getHive();
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 5212859..566490e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -137,4 +137,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
return flatDesc.getDistributedBy();
}
+    /**
+     * Returns the hive name of the wrapped flat table descriptor. It delegates to
+     * {@code flatDesc}, like the other accessors of this wrapper, instead of re-deriving the value.
+     */
+    @Override
+    public String getHiveName() {
+        return flatDesc.getHiveName();
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 699d084..7f73393 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,12 +141,13 @@ public class JoinedFlatTable {
return sql.toString();
}
- public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
+ public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir, boolean local) {
final Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
final StringBuilder sql = new StringBuilder();
+ String localStr = local ? "LOCAL" : "";
final String factTbl = flatDesc.getDataModel().getFactTable();
sql.append("dfs -mkdir -p " + outputDir + ";\n");
- sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+ sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
appendWhereStatement(flatDesc, sql, tableAliasMap);
return sql.toString();
}
@@ -285,10 +286,11 @@ public class JoinedFlatTable {
return hiveDataType.toLowerCase();
}
- public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+ public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir, boolean local) {
StringBuilder sql = new StringBuilder();
+ String localStr = local ? "LOCAL" : "";
sql.append("set hive.exec.compress.output=false;\n");
- sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
+ sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
return sql.toString();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 893c034..5cac792 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,8 @@ public final class ExecutableConstants {
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+ public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT = "Create Intermediate Flat Hive Table in Default Hive";
+ public static final String STEP_NAME_COPY_HIVE_DATA = "Copy Intermediate Table To Local Hadoop";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_COUNT_HIVE_TABLE = "Count Source Table";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index f3a4107..71c015a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -37,4 +37,6 @@ public interface IJoinedFlatTableDesc {
long getSourceOffsetEnd();
TblColRef getDistributedBy();
+ // Determine hive name by data model fact table.
+ String getHiveName();
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 0f98d5d..b4030dd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -23,6 +23,7 @@ public interface ISourceAware {
public static final int ID_HIVE = 0;
public static final int ID_STREAMING = 1;
public static final int ID_SPARKSQL = 5;
+ public static final int ID_EXTERNAL_HIVE = 6;
public static final int ID_EXTERNAL = 7;
int getSourceType();
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index e163d1d..f647d59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -45,6 +45,8 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
private int sourceType = ISourceAware.ID_HIVE;
@JsonProperty("table_type")
private String tableType;
+ @JsonProperty("hive")
+ private String hive;
private static final String materializedTableNamePrefix = "kylin_intermediate_";
@@ -60,6 +62,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
this.columns = other.getColumns();
this.database.setName(other.getDatabase());
this.tableType = other.getTableType();
+ this.hive = other.getHive();
}
public ColumnDesc findColumnByName(String name) {
@@ -235,7 +238,9 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
@Override
public int getSourceType() {
- return sourceType;
+ if(!(ISourceAware.ID_HIVE == sourceType || ISourceAware.ID_EXTERNAL_HIVE == sourceType))
+ return this.sourceType;
+ return hive == null ? ISourceAware.ID_HIVE : ISourceAware.ID_EXTERNAL_HIVE;
}
public void setSourceType(int sourceType) {
@@ -250,4 +255,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
this.tableType = tableType;
}
+ /**
+ * @return name of the external hive this table was loaded from; {@code null} means the table
+ * belongs to the default hive (see {@link #getSourceType()}, which maps null to ID_HIVE)
+ */
+ public String getHive() {
+ return this.hive;
+ }
+
+ /**
+ * @param hive name of the external hive backing this table, or {@code null} for the default hive
+ */
+ public void setHive(String hive) {
+ this.hive = hive;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 1afc603..b0ce889 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -49,6 +49,9 @@ public class ProjectInstance extends RootPersistentEntity {
@JsonProperty("name")
private String name;
+
+ @JsonProperty("hive")
+ private String hive;
@JsonProperty("tables")
private Set<String> tables = new TreeSet<String>();
@@ -153,6 +156,14 @@ public class ProjectInstance extends RootPersistentEntity {
public void setName(String name) {
this.name = name;
}
+
+ /**
+ * @return name of the hive this project's tables come from, or {@code null} when no
+ * hive has been configured for the project
+ */
+ public String getHive() {
+ return this.hive;
+ }
+
+ /**
+ * @param hive hive name to associate with this project; {@code null} clears it
+ */
+ public void setHive(String hive) {
+ this.hive = hive;
+ }
public boolean containsRealization(final RealizationType type, final String realization) {
return Iterables.any(this.realizationEntries, new Predicate<RealizationEntry>() {
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 1bf9804..e86c1a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -152,6 +152,15 @@ public class ProjectManager {
return currentProject;
}
+
+ /**
+ * Creates a project bound to the given hive source.
+ * The project is first created via the three-argument {@code createProject}, then the hive
+ * name is set on it and the instance is passed to {@code updateProject}.
+ *
+ * @param projectName name of the new project
+ * @param owner owner of the new project
+ * @param description project description
+ * @param hiveName hive the project reads from; may be {@code null}
+ * @return the created and updated project
+ * @throws IOException if creating or updating the project fails
+ */
+ public ProjectInstance createProject(String projectName, String owner, String description, String hiveName) throws IOException {
+ logger.info("Creating project " + projectName + ", hive name " + hiveName);
+ final ProjectInstance created = createProject(projectName, owner, description);
+ created.setHive(hiveName);
+ updateProject(created);
+ return created;
+ }
public ProjectInstance dropProject(String projectName) throws IOException {
if (projectName == null)
@@ -183,9 +192,9 @@ public class ProjectManager {
}
//update project itself
- public ProjectInstance updateProject(ProjectInstance project, String newName, String newDesc) throws IOException {
+ public ProjectInstance updateProject(ProjectInstance project, String newName, String newDesc, String hiveName) throws IOException {
if (!project.getName().equals(newName)) {
- ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc);
+ ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc, hiveName);
newProject.setCreateTimeUTC(project.getCreateTimeUTC());
newProject.recordUpdateTime(System.currentTimeMillis());
@@ -200,8 +209,13 @@ public class ProjectManager {
return newProject;
} else {
project.setName(newName);
- project.setDescription(newDesc);
-
+ if(newDesc != null) {
+ project.setDescription(newDesc);
+ }
+ if(hiveName != null) {
+ project.setHive(hiveName);
+ }
+
if (project.getUuid() == null)
project.updateRandomUuid();
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..c33840c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,8 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
public void test() throws IOException {
KylinConfig config = getTestConfig();
String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+ String project = "learn_kylin";
+ Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, project, config);
assertTrue(loaded.size() == toLoad.length);
for (String str : toLoad)
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
index 757888e..53906dc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -34,7 +34,7 @@ public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
@Test
public void test() throws IOException {
- HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+ HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact", null);
int rowNumber = 0;
while (reader.next()) {
String[] row = reader.getRow();
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 5397df7..d371230 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -56,6 +56,7 @@ import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.JobService;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
import org.slf4j.Logger;
@@ -343,6 +344,9 @@ public class CubeController extends BasicController {
String newCubeName = cubeRequest.getCubeName();
String project = cubeRequest.getProject();
+ if(!this.isBasedSameSource(cubeName, project)) {
+ throw new InternalErrorException("Can not move cube to project with different hive source!");
+ }
CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
if (cube == null) {
throw new InternalErrorException("Cannot find cube " + cubeName);
@@ -364,6 +368,19 @@ public class CubeController extends BasicController {
return newCube;
}
+
+ /**
+ * Checks whether cube {@code cubeName} can be placed in project {@code projectName} without
+ * switching hive sources. A missing cube or a missing target project is not rejected here
+ * (returns true); such cases are left to the caller's own validation.
+ *
+ * @param cubeName cube being cloned/moved
+ * @param projectName target project name
+ * @return true if both projects use the same hive source, or if the check does not apply
+ */
+ private boolean isBasedSameSource(String cubeName, String projectName) {
+ if (cubeService.getCubeManager().getCube(cubeName) == null) {
+ return true;
+ }
+ final ProjectInstance target = cubeService.getProjectManager().getProject(projectName);
+ if (target == null) {
+ return true;
+ }
+ final ProjectInstance source = cubeService.getProjectByCube(cubeName);
+ return HiveManager.isSameHiveSource(target.getHive(), source.getHive());
+ }
@RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT })
@ResponseBody
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
index 496e44a..f85638a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
@@ -33,6 +33,7 @@ import org.apache.kylin.rest.request.UpdateProjectRequest;
import org.apache.kylin.rest.service.AccessService;
import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -199,7 +200,9 @@ public class ProjectController extends BasicController {
if (StringUtils.isEmpty(projectRequest.getName())) {
throw new InternalErrorException("A project name must be given to create a project");
}
-
+
+ String hiveName = projectRequest.getHive();
+ checkHiveIsValid(hiveName, null);
ProjectInstance createdProj = null;
try {
createdProj = projectService.createProject(projectRequest);
@@ -217,7 +220,7 @@ public class ProjectController extends BasicController {
if (StringUtils.isEmpty(projectRequest.getFormerProjectName())) {
throw new InternalErrorException("A project name must be given to update a project");
}
-
+ checkHiveIsValid(projectRequest.getNewHiveName(), projectRequest.getFormerProjectName());
ProjectInstance updatedProj = null;
try {
ProjectInstance currentProject = projectService.getProjectManager().getProject(projectRequest.getFormerProjectName());
@@ -254,4 +257,32 @@ public class ProjectController extends BasicController {
public void setCubeService(CubeService cubeService) {
this.cubeService = cubeService;
}
+
+ /**
+ * Validates the hive name supplied when creating or updating a project.
+ *
+ * @param hiveName hive requested for the project; {@code null} when none was supplied
+ * @param projectName name of the existing project being updated, or {@code null} when a new
+ * project is being created (the realization check is then skipped)
+ * @throws InternalErrorException if a hive name is given while external hives are not supported,
+ * if the hive is unknown to the external hive configuration, or if the
+ * project already contains realizations and its hive would change
+ */
+ private void checkHiveIsValid(String hiveName, String projectName) {
+ try {
+ if (HiveManager.getInstance().isSupportExternalHives()) {
+ // Resolve both the command and the config file so an unknown hive fails fast;
+ // unknown names are expected to surface as IllegalArgumentException (caught below).
+ HiveManager.getInstance().getHiveCommand(hiveName);
+ HiveManager.getInstance().getHiveConfigFile(hiveName);
+ } else if (hiveName != null) {
+ throw new InternalErrorException(HiveManager.NOT_SUPPORT_ERROR_MESSAGE);
+ }
+ } catch (IllegalArgumentException e) {
+ logger.error("Can not find hive " + hiveName + ", support hives : " +
+ HiveManager.getInstance().getExternalHiveName(), e);
+ throw new InternalErrorException("Can not find hive " + hiveName + " by current external hives configuration");
+ }
+
+ if (projectName == null) {
+ return;
+ }
+ ProjectInstance project = projectService.getProjectManager().getProject(projectName);
+ // Compare hive names by value (same comparator CubeController uses), not by String reference.
+ if (project != null && !project.getRealizationEntries().isEmpty()
+ && !HiveManager.isSameHiveSource(project.getHive(), hiveName)) {
+ logger.warn("Update project {} from hive {} to {} may cause error, you can create a new project instead.",
+ projectName, project.getHive(), hiveName);
+ throw new InternalErrorException("It is not allow to modify hive name when cube exists in project.");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..addf544 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -29,6 +29,7 @@ import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.engine.streaming.StreamingConfig;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
@@ -103,6 +104,8 @@ public class StreamingController extends BasicController {
public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
String project = streamingRequest.getProject();
+ ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+ String hive = projectMgr.getProject(project).getHive();
TableDesc tableDesc = deserializeTableDesc(streamingRequest);
StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
@@ -110,6 +113,7 @@ public class StreamingController extends BasicController {
try {
tableDesc.setUuid(UUID.randomUUID().toString());
+ tableDesc.setHive(hive);
MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
metaMgr.saveSourceTable(tableDesc);
cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index eefeba8..81ccb4b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.CardinalityRequest;
import org.apache.kylin.rest.request.HiveTableRequest;
@@ -48,6 +49,7 @@ import org.apache.kylin.rest.service.ModelService;
import org.apache.kylin.rest.service.ProjectService;
import org.apache.kylin.rest.service.StreamingService;
import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -145,7 +147,7 @@ public class TableController extends BasicController {
@ResponseBody
public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
- String[] loaded = cubeMgmtService.reloadHiveTable(tables);
+ String[] loaded = cubeMgmtService.reloadHiveTable(tables, project);
if (request.isCalculate()) {
cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
}
@@ -233,8 +235,11 @@ public class TableController extends BasicController {
public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
Map<String, String> result = new HashMap<String, String>();
String project = request.getProject();
+ String hiveName = getHiveNameByProject(project);
+
TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
desc.setUuid(UUID.randomUUID().toString());
+ desc.setHive(hiveName);
MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
metaMgr.saveSourceTable(desc);
cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
@@ -311,11 +316,13 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive", method = { RequestMethod.GET })
@ResponseBody
- private static List<String> showHiveDatabases() throws IOException {
- HiveClient hiveClient = new HiveClient();
+ private static List<String> showHiveDatabases(@RequestParam(value = "project", required = true) String project)
+ throws IOException {
List<String> results = null;
try {
+ String hiveName = getHiveNameByProject(project);
+ HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
results = hiveClient.getHiveDbNames();
} catch (Exception e) {
e.printStackTrace();
@@ -332,11 +339,13 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
@ResponseBody
- private static List<String> showHiveTables(@PathVariable String database) throws IOException {
- HiveClient hiveClient = new HiveClient();
+ private static List<String> showHiveTables(@RequestParam(value = "project", required = true) String project,
+ @PathVariable String database) throws IOException {
List<String> results = null;
try {
+ String hiveName = getHiveNameByProject(project);
+ HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
results = hiveClient.getHiveTableNames(database);
} catch (Exception e) {
e.printStackTrace();
@@ -344,6 +353,11 @@ public class TableController extends BasicController {
}
return results;
}
+
+ /**
+ * Looks up the hive name configured on a project.
+ *
+ * @param project project name
+ * @return the project's hive name, or {@code null} when the project has none configured
+ * @throws InternalErrorException if no project with that name exists (previously an NPE)
+ */
+ private static String getHiveNameByProject(String project) {
+ ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+ org.apache.kylin.metadata.project.ProjectInstance projectInstance = projectMgr.getProject(project);
+ if (projectInstance == null) {
+ throw new InternalErrorException("Cannot find project " + project);
+ }
+ return projectInstance.getHive();
+ }
public void setCubeService(CubeService cubeService) {
this.cubeMgmtService = cubeService;
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
index 71cd1c4..51f100e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
@@ -22,6 +22,7 @@ package org.apache.kylin.rest.request;
*/
public class CreateProjectRequest {
private String name;
+ private String hive;
private String description;
public CreateProjectRequest() {
@@ -43,4 +44,15 @@ public class CreateProjectRequest {
this.description = description;
}
+ /**
+ * @return the requested hive name with surrounding whitespace removed, or {@code null}
+ * when no hive name (or only whitespace) was supplied
+ */
+ public String getHive() {
+ final String trimmed = (this.hive == null) ? null : this.hive.trim();
+ return (trimmed == null || trimmed.isEmpty()) ? null : trimmed;
+ }
+
+ /**
+ * @param hive raw hive name from the request body; normalized by {@link #getHive()}
+ */
+ public void setHive(String hive) {
+ this.hive = hive;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
index 29ba162..6bc077b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
@@ -23,6 +23,7 @@ package org.apache.kylin.rest.request;
public class UpdateProjectRequest {
private String formerProjectName;
private String newProjectName;
+ private String newHiveName;
private String newDescription;
public UpdateProjectRequest() {
@@ -52,4 +53,16 @@ public class UpdateProjectRequest {
public void setNewProjectName(String newProjectName) {
this.newProjectName = newProjectName;
}
+
+ /**
+ * @return the new hive name with surrounding whitespace removed, or {@code null}
+ * when no hive name (or only whitespace) was supplied
+ */
+ public String getNewHiveName() {
+ final String trimmed = (this.newHiveName == null) ? null : this.newHiveName.trim();
+ return (trimmed == null || trimmed.isEmpty()) ? null : trimmed;
+ }
+
+ /**
+ * @param newHiveName raw hive name from the request body; normalized by {@link #getNewHiveName()}
+ */
+ public void setNewHiveName(String newHiveName) {
+ this.newHiveName = newHiveName;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
index c3b1e7c..f041bf5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
@@ -76,6 +76,7 @@ public class TableDescResponse extends TableDesc {
this.setName(table.getName());
this.setSourceType(table.getSourceType());
this.setUuid(table.getUuid());
+ this.setHive(table.getHive());
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..e96bddf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -26,6 +26,7 @@ import java.util.Set;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -143,7 +144,18 @@ public abstract class BasicService {
})));
return results;
}
-
+
+ /**
+ * Finds the single project that contains the given cube.
+ *
+ * @param cubeName name of the cube
+ * @return the owning project
+ * @throws RuntimeException if the cube does not exist, or if it belongs to zero or to more
+ * than one project
+ */
+ public ProjectInstance getProjectByCube(String cubeName) {
+ CubeInstance cube = getCubeManager().getCube(cubeName);
+ if (cube == null) {
+ // Fail with a clear message instead of an NPE on cube.getType() below.
+ throw new RuntimeException("Cannot find cube " + cubeName);
+ }
+ List<ProjectInstance> projList = this.getProjectManager().findProjects(cube.getType(), cube.getName());
+ if (projList == null || projList.isEmpty()) {
+ throw new RuntimeException("Cannot find the project containing the cube " + cube.getName());
+ } else if (projList.size() >= 2) {
+ throw new RuntimeException("Find more than one project containing the cube " + cube.getName() + ". It does't meet the uniqueness requirement");
+ }
+ return projList.get(0);
+ }
+
protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList) {
return listAllCubingJobs(cubeName, projectName, statusList, getExecutableManager().getAllOutputs());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 2160e3d..6e091d8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -48,6 +48,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
import org.apache.kylin.query.enumerator.OLAPQuery;
import org.apache.kylin.query.schema.OLAPSchemaFactory;
import org.apache.kylin.rest.controller.QueryController;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.apache.kylin.source.kafka.KafkaConfigManager;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hybrid.HybridManager;
@@ -210,7 +211,8 @@ public class CacheService extends BasicService {
KafkaConfigManager.clearCache();
StreamingManager.clearCache();
HBaseConnection.clearConnCache();
-
+ HiveManager.clearCache();
+
cleanAllDataCache();
removeAllOLAPDataSources();
break;
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4cd527c..dcad40f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -475,6 +475,10 @@ public class CubeService extends BasicService {
logger.error("Cannot find table descirptor " + tableName, e);
throw e;
}
+ if(table.getHive() != null) {
+ logger.warn("Can not calculate cardinality for table {} which is loaded from external hive source !", tableName);
+ return;
+ }
DefaultChainedExecutable job = new DefaultChainedExecutable();
job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
@@ -482,7 +486,7 @@ public class CubeService extends BasicService {
String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName;
String param = "-table " + tableName + " -output " + outPath;
-
+
MapReduceExecutable step1 = new MapReduceExecutable();
step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
@@ -561,8 +565,8 @@ public class CubeService extends BasicService {
}
@PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
- public String[] reloadHiveTable(String tables) throws IOException {
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
+ public String[] reloadHiveTable(String tables, String project) throws IOException {
+ Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), project, getConfig());
return (String[]) loaded.toArray(new String[loaded.size()]);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index b4cceb2..00e4da6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -53,13 +53,14 @@ public class ProjectService extends BasicService {
public ProjectInstance createProject(CreateProjectRequest projectRequest) throws IOException {
String projectName = projectRequest.getName();
String description = projectRequest.getDescription();
+ String hiveName = projectRequest.getHive();
ProjectInstance currentProject = getProjectManager().getProject(projectName);
if (currentProject != null) {
throw new InternalErrorException("The project named " + projectName + " already exists");
}
String owner = SecurityContextHolder.getContext().getAuthentication().getName();
- ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description);
+ ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description, hiveName);
accessService.init(createdProject, AclPermission.ADMINISTRATION);
logger.debug("New project created.");
@@ -71,12 +72,13 @@ public class ProjectService extends BasicService {
String formerProjectName = projectRequest.getFormerProjectName();
String newProjectName = projectRequest.getNewProjectName();
String newDescription = projectRequest.getNewDescription();
+ String newHiveName = projectRequest.getNewHiveName();
if (currentProject == null) {
throw new InternalErrorException("The project named " + formerProjectName + " does not exists");
}
- ProjectInstance updatedProject = getProjectManager().updateProject(currentProject, newProjectName, newDescription);
+ ProjectInstance updatedProject = getProjectManager().updateProject(currentProject, newProjectName, newDescription, newHiveName);
logger.debug("Project updated.");
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index bcb9a38..39cfcad 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -74,7 +74,8 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
}
private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final String hiveName = getHiveName();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
hiveCmdBuilder.addStatement(getInitStatement());
boolean useRedistribute = getUseRedistribute();
if (useRedistribute == true) {
@@ -158,4 +159,12 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
public String getRowCountOutputDir() {
return getParam("rowCountOutputDir");
}
+
+    /**
+     * Records which Hive instance this step targets, stored as the job parameter {@code "hiveName"}.
+     *
+     * @param name the Hive name; {@code null} presumably selects the default Hive — confirm with HiveManager
+     */
+    public void setHiveName(String name) {
+        this.setParam("hiveName", name);
+    }
+
+    /**
+     * Reads back the job parameter {@code "hiveName"}; it is passed to {@code HiveCmdBuilder} when the
+     * flat table is created.
+     *
+     * @return the stored Hive name, or whatever {@code getParam} yields when it was never set
+     */
+    public String getHiveName() {
+        final String name = this.getParam("hiveName");
+        return name;
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
index a99b304..891c635 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
@@ -104,7 +104,7 @@ public class HiveClient {
executeHQL(sql);
}
- private HiveMetaStoreClient getMetaStoreClient() throws Exception {
+ protected HiveMetaStoreClient getMetaStoreClient() throws Exception {
if (metaStoreClient == null) {
metaStoreClient = new HiveMetaStoreClient(hiveConf);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
index 5a5b4e0..bde6e3b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,10 +43,16 @@ public class HiveCmdBuilder {
private HiveClientMode clientMode;
private KylinConfig kylinConfig;
final private ArrayList<String> statements = Lists.newArrayList();
+ protected String hiveName;
public HiveCmdBuilder() {
+ this(null);
+ }
+
+ public HiveCmdBuilder(String hive) {
kylinConfig = KylinConfig.getInstanceFromEnv();
clientMode = HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase());
+ this.hiveName = hive;
}
public String build() {
@@ -53,13 +60,17 @@ public class HiveCmdBuilder {
switch (clientMode) {
case CLI:
- buf.append("hive -e \"");
+ String hiveCommand = HiveManager.getInstance().getHiveCommand(this.hiveName);
+ buf.append(hiveCommand + " -e \"");
for (String statement : statements) {
buf.append(statement).append("\n");
}
buf.append("\"");
break;
case BEELINE:
+ if(this.hiveName != null) {
+ throw new IllegalArgumentException("Can not use external hive with beeline mode to build cube currently.");
+ }
BufferedWriter bw = null;
try {
File tmpHql = File.createTempFile("beeline_", ".hql");
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 520d7cc..2ddd381 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -52,6 +53,7 @@ import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.LookupDesc;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.hive.external.ExternalHiveClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,13 +87,17 @@ public class HiveMRInput implements IMRInput {
@Override
public void configureJob(Job job) {
- try {
- HCatInputFormat.setInput(job, dbName, tableName);
- job.setInputFormatClass(HCatInputFormat.class);
-
- job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ // HiveConf config file is static variable and ensure we set correct one.
+ synchronized(ExternalHiveClient.class) {
+ HiveConf.setHiveSiteLocation(Thread.currentThread().getContextClassLoader().getResource("hive-site.xml"));
+ try {
+ HCatInputFormat.setInput(job, dbName, tableName);
+ job.setInputFormatClass(HCatInputFormat.class);
+
+ job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
@@ -104,9 +110,9 @@ public class HiveMRInput implements IMRInput {
public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
- final JobEngineConfig conf;
- final IJoinedFlatTableDesc flatDesc;
- String hiveViewIntermediateTables = "";
+ protected final JobEngineConfig conf;
+ protected final IJoinedFlatTableDesc flatDesc;
+ protected String hiveViewIntermediateTables = "";
public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -126,7 +132,7 @@ public class HiveMRInput implements IMRInput {
jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
} else if ("2".equals(createFlatTableMethod)) {
// count from source table first, and then redistribute, suitable for partitioned table
- final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+ final String rowCountOutputDir = getRowCountOutputDir(conf, jobFlow.getId());
jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir));
} else {
@@ -139,16 +145,25 @@ public class HiveMRInput implements IMRInput {
}
}
- public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+ protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) {
+ return JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+ }
+
+ protected boolean isWriteToLocalDir() {
+ return false;
+ }
+
+ public AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
StringBuilder hiveInitBuf = new StringBuilder();
hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
- String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+ String rowCountOutputDir = getRowCountOutputDir(conf, jobId);
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+ step.setHiveName(flatTableDesc.getHiveName());
step.setInitStatement(hiveInitBuf.toString());
- step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir));
+ step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir()));
step.setRowCountOutputDir(rowCountOutputDir);
step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc));
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
@@ -157,13 +172,14 @@ public class HiveMRInput implements IMRInput {
}
- public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
+ public AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
final ShellExecutable step = new ShellExecutable();
-
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final String hiveName = flatTableDesc.getHiveName();
+
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
- hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
+ hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir()));
step.setCmd(hiveCmdBuilder.build());
step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE);
@@ -174,7 +190,7 @@ public class HiveMRInput implements IMRInput {
public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
- HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(flatDesc.getHiveName());
KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
@@ -221,6 +237,7 @@ public class HiveMRInput implements IMRInput {
String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute);
CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+ step.setHiveName(flatTableDesc.getHiveName());
step.setUseRedistribute(redistribute);
step.setInitStatement(hiveInitBuf.toString());
step.setRowCountOutputDir(rowCountOutputDir);
@@ -245,7 +262,7 @@ public class HiveMRInput implements IMRInput {
return new HiveTableInputFormat(getIntermediateTableIdentity());
}
- private String getIntermediateTableIdentity() {
+ protected String getIntermediateTableIdentity() {
return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName();
}
}
@@ -254,7 +271,8 @@ public class HiveMRInput implements IMRInput {
private final BufferedLogger stepLogger = new BufferedLogger(logger);
private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final String hiveName = getHiveName();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
hiveCmdBuilder.addStatement(getInitStatement());
hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
hiveCmdBuilder.addStatement(getSelectRowCountStatement());
@@ -282,7 +300,8 @@ public class HiveMRInput implements IMRInput {
}
private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
- final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+ final String hiveName = getHiveName();
+ final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
hiveCmdBuilder.addStatement(getInitStatement());
hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
@@ -369,6 +388,14 @@ public class HiveMRInput implements IMRInput {
public String getRowCountOutputDir() {
return getParam("rowCountOutputDir");
}
+
+        /**
+         * Records which Hive instance the redistribute step targets, stored as the job parameter
+         * {@code "hiveName"}.
+         *
+         * @param name the Hive name; {@code null} presumably selects the default Hive — confirm with HiveManager
+         */
+        public void setHiveName(String name) {
+            this.setParam("hiveName", name);
+        }
+
+        /**
+         * Reads back the job parameter {@code "hiveName"}; it is used to build the Hive commands for the
+         * row-count and redistribute statements.
+         *
+         * @return the stored Hive name, or whatever {@code getParam} yields when it was never set
+         */
+        public String getHiveName() {
+            final String name = this.getParam("hiveName");
+            return name;
+        }
}
public static class GarbageCollectionStep extends AbstractExecutable {
@@ -390,7 +417,7 @@ public class HiveMRInput implements IMRInput {
return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
}
- private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+ protected String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
StringBuffer output = new StringBuffer();
final String hiveTable = this.getIntermediateTableIdentity();
if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
@@ -441,7 +468,7 @@ public class HiveMRInput implements IMRInput {
setParam("oldHiveTable", tableIdentity);
}
- private String getIntermediateTableIdentity() {
+ protected String getIntermediateTableIdentity() {
return getParam("oldHiveTable");
}
@@ -449,7 +476,7 @@ public class HiveMRInput implements IMRInput {
setParam("externalDataPath", externalDataPath);
}
- private String getExternalDataPath() {
+ protected String getExternalDataPath() {
return getParam("externalDataPath");
}
@@ -457,9 +484,17 @@ public class HiveMRInput implements IMRInput {
setParam("oldHiveViewIntermediateTables", tableIdentities);
}
- private String getHiveViewIntermediateTableIdentities() {
+ protected String getHiveViewIntermediateTableIdentities() {
return getParam("oldHiveViewIntermediateTables");
}
+
+        /**
+         * Summarizes this step's parameters as {@code " -name value"} pairs, one per stored parameter.
+         * Each label matches the job-parameter key its value is stored under ({@code oldHiveTable},
+         * {@code externalDataPath}, {@code oldHiveViewIntermediateTables}).
+         * NOTE(review): presumably used for display/logging only — confirm callers.
+         *
+         * @return the parameter summary; unset parameters render as {@code "null"}
+         */
+        public String getCmd() {
+            StringBuilder buf = new StringBuilder();
+            // The intermediate table identity is stored under "oldHiveTable"; it was previously mislabelled
+            // as "externalDataPath", so that label appeared twice.
+            buf.append(" -").append("oldHiveTable").append(" ").append(getIntermediateTableIdentity())
+                    .append(" -").append("externalDataPath").append(" ").append(getExternalDataPath())
+                    .append(" -").append("oldHiveViewIntermediateTables").append(" ").append(getHiveViewIntermediateTableIdentities());
+            return buf.toString();
+        }
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 70b097c..aceb791 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -33,6 +33,8 @@ import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +57,7 @@ public class HiveSourceTableLoader {
public static final String TABLE_FOLDER_NAME = "table";
public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
- public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+ public static Set<String> reloadHiveTables(String[] hiveTables, String project, KylinConfig config) throws IOException {
Map<String, Set<String>> db2tables = Maps.newHashMap();
for (String table : hiveTables) {
@@ -71,7 +73,7 @@ public class HiveSourceTableLoader {
// extract from hive
Set<String> loadedTables = Sets.newHashSet();
for (String database : db2tables.keySet()) {
- List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+ List<String> loaded = extractHiveTables(database, db2tables.get(database), project, config);
loadedTables.addAll(loaded);
}
@@ -84,13 +86,23 @@ public class HiveSourceTableLoader {
metaMgr.removeTableExd(hiveTable);
}
- private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+ private static List<String> extractHiveTables(String database, Set<String> tables, String project, KylinConfig config) throws IOException {
List<String> loadedTables = Lists.newArrayList();
MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+ ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+ String hiveName = projectMgr.getProject(project).getHive();
for (String tableName : tables) {
Table table = null;
- HiveClient hiveClient = new HiveClient();
+ HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
+ String tableIdentity = database + "." + tableName;
+ TableDesc tableDesc = metaMgr.getTableDesc(tableIdentity);
+ if(tableDesc != null && !HiveManager.isSameHiveSource(tableDesc.getHive(), hiveName)) {
+ throw new IllegalArgumentException("Table " + tableIdentity + " that in hive " +
+ hiveName + " has been loaded in hive " + tableDesc.getHive() +
+ ", please rename the table or unload the old one.");
+ }
+
List<FieldSchema> partitionFields = null;
List<FieldSchema> fields = null;
try {
@@ -108,13 +120,13 @@ public class HiveSourceTableLoader {
long tableSize = hiveClient.getFileSizeForTable(table);
long tableFileNum = hiveClient.getFileNumberForTable(table);
- TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
if (tableDesc == null) {
tableDesc = new TableDesc();
tableDesc.setDatabase(database.toUpperCase());
tableDesc.setName(tableName.toUpperCase());
tableDesc.setUuid(UUID.randomUUID().toString());
tableDesc.setLastModified(0);
+ tableDesc.setHive(hiveName);
}
if (table.getTableType() != null) {
tableDesc.setTableType(table.getTableType());
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index dcc43ff..9959ca9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.engine.mr.DFSFileTable;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,17 +37,19 @@ public class HiveTable implements ReadableTable {
final private String database;
final private String hiveTable;
+ final private String hiveName;
private HiveClient hiveClient;
public HiveTable(TableDesc tableDesc) {
+ this.hiveName = tableDesc.getHive();
this.database = tableDesc.getDatabase();
this.hiveTable = tableDesc.getName();
}
@Override
public TableReader getReader() throws IOException {
- return new HiveTableReader(database, hiveTable);
+ return new HiveTableReader(database, hiveTable, getHiveClient().getHiveConf());
}
@Override
@@ -86,7 +89,7 @@ public class HiveTable implements ReadableTable {
public HiveClient getHiveClient() {
if (hiveClient == null) {
- hiveClient = new HiveClient();
+ hiveClient = HiveManager.getInstance().createHiveClient(this.hiveName);
}
return hiveClient;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 3f9ce01..f6db33a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -48,6 +48,7 @@ public class HiveTableReader implements TableReader {
private HCatRecord currentHCatRecord;
private int numberOfSplits = 0;
private Map<String, String> partitionKV = null;
+ private HiveConf hiveConf = null;
/**
* Constructor for reading whole hive table
@@ -55,8 +56,8 @@ public class HiveTableReader implements TableReader {
* @param tableName
* @throws IOException
*/
- public HiveTableReader(String dbName, String tableName) throws IOException {
- this(dbName, tableName, null);
+ public HiveTableReader(String dbName, String tableName, HiveConf hiveConf) throws IOException {
+ this(dbName, tableName, null, hiveConf);
}
/**
@@ -66,16 +67,17 @@ public class HiveTableReader implements TableReader {
* @param partitionKV key-value pairs condition on the partition
* @throws IOException
*/
- public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
+ public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV, HiveConf hiveConf) throws IOException {
this.dbName = dbName;
this.tableName = tableName;
this.partitionKV = partitionKV;
+ this.hiveConf = hiveConf;
initialize();
}
private void initialize() throws IOException {
try {
- this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
+ this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV, this.hiveConf);
} catch (Exception e) {
e.printStackTrace();
throw new IOException(e);
@@ -146,8 +148,10 @@ public class HiveTableReader implements TableReader {
return "hive table reader for: " + dbName + "." + tableName;
}
- private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
- HiveConf hiveConf = new HiveConf(HiveTableReader.class);
+ private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV, HiveConf hiveConf)
+ throws Exception {
+ if(hiveConf == null)
+ hiveConf = new HiveConf(HiveTableReader.class);
Iterator<Entry<String, String>> itr = hiveConf.iterator();
Map<String, String> map = new HashMap<String, String>();
while (itr.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
index 79493a4..08fbcd6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.hive.external.HiveManager;
import org.datanucleus.store.types.backed.HashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public class HqlExecutable extends AbstractExecutable {
private static final Logger logger = LoggerFactory.getLogger(HqlExecutable.class);
private static final String HQL = "hql";
+ private static final String HIVE_NAME = "hive-name";
private static final String HIVE_CONFIG = "hive-config";
public HqlExecutable() {
@@ -52,7 +54,8 @@ public class HqlExecutable extends AbstractExecutable {
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
try {
Map<String, String> configMap = getConfiguration();
- HiveClient hiveClient = new HiveClient(configMap);
+ String hiveName = getHiveName();
+ HiveClient hiveClient = HiveManager.getInstance().createHiveClientWithConfig(configMap, hiveName);
for (String hql : getHqls()) {
hiveClient.executeHQL(hql);
@@ -104,4 +107,11 @@ public class HqlExecutable extends AbstractExecutable {
}
}
+
+    /**
+     * Selects the Hive instance whose client will execute the HQL statements, stored under the job
+     * parameter {@link #HIVE_NAME}.
+     *
+     * @param name the Hive name; {@code null} presumably selects the default Hive — confirm with HiveManager
+     */
+    public void setHiveName(String name) {
+        this.setParam(HIVE_NAME, name);
+    }
+
+    /** Returns the Hive name stored under {@link #HIVE_NAME}; it is passed to HiveManager when the client is created. */
+    private String getHiveName() {
+        final String name = this.getParam(HIVE_NAME);
+        return name;
+    }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
new file mode 100644
index 0000000..3a488d3
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DistcpShellExecutable extends ShellExecutable {
+ private static final Logger logger = LoggerFactory.getLogger(DistcpShellExecutable.class);
+
+ private static final String HIVE_NAME = "hiveName";
+ private static final String TABLE_NAME = "tableName";
+ private static final String OUTPUT_PATH = "output";
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ String tableName = getTableName();
+ String hiveName = getHiveName();
+ String input = null;
+ String database = context.getConfig().getHiveDatabaseForIntermediateTable();
+ try {
+ if(hiveName == null) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "The job is using default hive, do not need copy data!");
+ }
+ input = HiveManager.getInstance().getHiveTableLocation(database, tableName, hiveName);
+ } catch (Exception e) {
+ logger.error("Failed to get location of hive table {}.{}, using hive name {}", database, tableName, hiveName);
+ return new ExecuteResult(ExecuteResult.State.ERROR , e.getLocalizedMessage());
+ }
+ String output = getOutputPath();
+ logger.info("Copy Intermediate Hive Table input : {} , output : {}", input, output);
+ /**
+ * Copy hive table only when source hive table location is in different hadoop cluster.
+ */
+ if(input.startsWith("/") || input.startsWith(HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY))) {
+ return new ExecuteResult(ExecuteResult.State.SUCCEED, "Hive " + hiveName + " is based on default hadoop cluster, skip copy data .");
+ } else {
+ Path inputPath = new Path(input);
+ input = inputPath.toString();
+ }
+ String cmd = String.format("hadoop distcp -overwrite %s %s", input, output);
+ super.setCmd(cmd);
+
+ return super.doWork(context);
+ }
+
+ public void setHiveName(String name) {
+ setParam(HIVE_NAME, name);
+ }
+
+ public void setTableName(String name) {
+ setParam(TABLE_NAME, name);
+ }
+
+ public void setOutputPath(String output) {
+ setParam(OUTPUT_PATH, output);
+ }
+
+ public String getOutputPath() {
+ return getParam(OUTPUT_PATH);
+ }
+
+ public String getHiveName() {
+ return getParam(HIVE_NAME);
+ }
+
+ public String getTableName() {
+ return getParam(TABLE_NAME);
+ }
+
+ public String getExecCmd() {
+ StringBuffer buf = new StringBuffer();
+ buf.append(" -").append(HIVE_NAME).append(" ").append(getHiveName()).append(" -").append(TABLE_NAME).append(" ").
+ append(getTableName()).append(" -").append(OUTPUT_PATH).append(" ").append(getOutputPath());
+
+ return buf.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
new file mode 100644
index 0000000..b7d1559
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.kylin.source.hive.HiveClient;
+
+/**
+ * Hive metastore API client for an external Hive whose {@code hive-site.xml} is supplied
+ * as a local file path instead of being taken from Kylin's own class path.
+ *
+ * @author hzfengyu
+ */
+public class ExternalHiveClient extends HiveClient {
+
+    /**
+     * Creates a client whose {@code hiveConf} is built from the given {@code hive-site.xml}.
+     *
+     * @param location local file-system path (absolute, or relative to the working directory) of the
+     *                 external {@code hive-site.xml}; when {@code null}, the {@code hive-site.xml}
+     *                 resource visible to the thread context class loader is used, if there is one
+     * @throws IllegalArgumentException if {@code location} is non-null and does not denote an existing
+     *                                  regular file, or cannot be converted to a URL
+     */
+    public ExternalHiveClient(String location) {
+        URL siteUrl = resolveHiveSite(location);
+
+        /*
+         * HiveConf takes its site file from a static field (hiveSiteURL). The override is therefore made
+         * under a global lock and undone afterwards. Otherwise every HiveConf created later in this JVM
+         * would silently load this external site file, or keep failing if the file turned out to be bad.
+         * Code that creates a HiveConf without taking this lock may still observe the temporary value.
+         */
+        synchronized (ExternalHiveClient.class) {
+            URL previous = HiveConf.getHiveSiteLocation();
+            HiveConf.setHiveSiteLocation(siteUrl);
+            try {
+                hiveConf = new HiveConf(HiveClient.class);
+            } finally {
+                HiveConf.setHiveSiteLocation(previous);
+            }
+        }
+    }
+
+    /**
+     * Creates a client from {@code location} (see {@link #ExternalHiveClient(String)}) and then
+     * passes {@code configMap} to the inherited {@code appendConfiguration}.
+     *
+     * @param configMap extra configuration entries handed to {@code appendConfiguration}
+     * @param location  path of the external {@code hive-site.xml}, or {@code null} for the class-path one
+     */
+    public ExternalHiveClient(Map<String, String> configMap, String location) {
+        this(location);
+        appendConfiguration(configMap);
+    }
+
+    /**
+     * Maps a user-supplied site-file location to the URL handed to HiveConf.
+     *
+     * @param location local path of the site file, or {@code null} to look it up on the class path
+     * @return a {@code file:} URL for {@code location}, or the class-path resource (possibly {@code null})
+     */
+    private static URL resolveHiveSite(String location) {
+        if (location == null) {
+            return Thread.currentThread().getContextClassLoader().getResource("hive-site.xml");
+        }
+        File siteFile = new File(location);
+        if (!siteFile.isFile()) {
+            throw new IllegalArgumentException("Can not find hive config file " + location);
+        }
+        try {
+            // File.toURI() yields an absolute, properly escaped file: URL even for relative paths.
+            // Prefixing "file://" would instead turn the first segment of a relative path into a host name.
+            return siteFile.toURI().toURL();
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Invalid hive config file location " + location, e);
+        }
+    }
+
+    /**
+     * Returns the lazily created metastore client bound to this client's {@code hiveConf}.
+     *
+     * <p>HMSHandler keeps its raw store in thread-local state, and container threads (e.g. in Tomcat)
+     * are reused across clients. A raw store left on the current thread by another client is therefore
+     * handled in two cases. It is removed when this client creates its metastore client for the first
+     * time. It is shut down and re-pointed at this client's configuration when it is configured with a
+     * different configuration object.
+     */
+    @Override
+    protected HiveMetaStoreClient getMetaStoreClient() throws Exception {
+        if (metaStoreClient == null) {
+            HiveMetaStore.HMSHandler.removeRawStore();
+            metaStoreClient = new HiveMetaStoreClient(hiveConf);
+        } else if (HiveMetaStore.HMSHandler.getRawStore() != null
+                && HiveMetaStore.HMSHandler.getRawStore().getConf() != this.hiveConf) {
+            // Identity (not equals) comparison: only a store holding exactly this hiveConf object is kept as is.
+            HiveMetaStore.HMSHandler.getRawStore().shutdown();
+            HiveMetaStore.HMSHandler.getRawStore().setConf(this.hiveConf);
+        }
+        return metaStoreClient;
+    }
+}