Posted to commits@kylin.apache.org by sh...@apache.org on 2016/11/09 06:14:01 UTC

[1/2] kylin git commit: KYLIN-1826, add external hive interface, project, table.. Signed-off-by: terry-chelsea [Forced Update!]

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-1826 726fb8fd6 -> 417fed32f (forced update)


http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
new file mode 100644
index 0000000..ef7b54d
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
+import org.apache.kylin.source.hive.HiveCmdBuilder;
+import org.apache.kylin.source.hive.HiveMRInput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** 
+ * MR input side for flat tables built from an external Hive source.
+ * @author hzfengyu
+ */
+public class ExternalHiveMRInput extends HiveMRInput {
+    @Override
+    public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+        return new BatchFileCubingInputSide(flatDesc);
+    }
+    
+    public static class BatchFileCubingInputSide extends BatchCubingInputSide {
+        private static final Logger logger = LoggerFactory.getLogger(BatchFileCubingInputSide.class);
+
+        public BatchFileCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
+            super.addStepPhase1_CreateFlatTable(jobFlow);
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+
+            /**
+             * Create a flat table in the default Hive, mirroring the external Hive flat table, for the following steps.
+             * Do not create views in the default Hive, because Kylin reads the lookup table files directly.
+             */
+            jobFlow.addTask(createFlatTableInDefaultHive(conf, flatDesc, jobFlow.getId(), cubeName));
+            AbstractExecutable copyDataStep = createCopyHiveDataStep(flatDesc.getTableName(), flatDesc.getHiveName(),
+                    JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            if(copyDataStep != null) {
+                jobFlow.addTask(copyDataStep);
+            }
+        }
+        
+        @Override
+        protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) {
+            String tempDir = System.getProperty("java.io.tmpdir", "/tmp");
+            return String.format("file://%s/kylin-%s/%s", tempDir, jobId, "row_count");
+        }
+        
+        @Override
+        protected boolean isWriteToLocalDir() {
+            return true;
+        }
+        
+        protected AbstractExecutable createCopyHiveDataStep(String flatHiveTableName, String hiveName, String output) {
+            DistcpShellExecutable copyHiveTableStep = new DistcpShellExecutable();
+            copyHiveTableStep.setName(ExecutableConstants.STEP_NAME_COPY_HIVE_DATA);
+            copyHiveTableStep.setHiveName(hiveName);
+            copyHiveTableStep.setOutputPath(output);
+            copyHiveTableStep.setTableName(flatHiveTableName);
+
+            return copyHiveTableStep;
+        }
+        
+        protected AbstractExecutable createFlatTableInDefaultHive(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+            StringBuilder hiveInitBuf = new StringBuilder();
+            hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
+
+            final String useDatabaseHql = "USE " + conf.getConfig().getHiveDatabaseForIntermediateTable() + ";\n";
+            final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatTableDesc);
+            final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatTableDesc, JobBuilderSupport.getJobWorkingDir(conf, jobId));
+
+            CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setHiveName(null);
+            step.setUseRedistribute(false);
+            step.setInitStatement(hiveInitBuf.toString());
+            step.setRowCountOutputDir(null);
+            step.setCreateTableStatement(useDatabaseHql + dropTableHql + createTableHql);
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT);
+            return step;
+        }
+        
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            String hiveName = flatDesc.getHiveName();
+            
+            ExternalGarbageCollectionStep step = new ExternalGarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+            step.setIntermediateTableIdentity(getIntermediateTableIdentity());
+            step.setExternalDataPath(JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId())));
+            step.setHiveViewIntermediateTableIdentities(hiveViewIntermediateTables);
+            step.setHiveName(hiveName);
+            jobFlow.addTask(step);
+        }
+        
+        public static class ExternalGarbageCollectionStep extends GarbageCollectionStep {
+            @Override
+            protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+                KylinConfig config = context.getConfig();
+                StringBuffer output = new StringBuffer();
+                try {
+                    output.append(super.cleanUpIntermediateFlatTable(config));
+                    output.append(cleanUpExternalHiveFlatTable(config));
+                    // don't drop the view, to avoid concurrency issues
+                    //output.append(cleanUpHiveViewIntermediateTable(config));
+                } catch (IOException e) {
+                    logger.error("job:" + getId() + " execute finished with exception", e);
+                    return new ExecuteResult(ExecuteResult.State.ERROR, e.getMessage());
+                }
+
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
+            }
+            
+            public String cleanUpExternalHiveFlatTable(KylinConfig config) throws IOException {
+                StringBuffer output = new StringBuffer();
+                String hiveName = this.getHiveName();
+                if(hiveName == null)
+                    return output.toString();
+                final String hiveTable = getIntermediateTableIdentity();
+                if (StringUtils.isNotEmpty(hiveTable)) {
+                    final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
+                    hiveCmdBuilder.addStatement("USE " + config.getHiveDatabaseForIntermediateTable() + ";");
+                    hiveCmdBuilder.addStatement("DROP TABLE IF EXISTS  " + hiveTable + ";");
+                    
+                    // look up and remove the table location before dropping the table, otherwise the table can no longer be found
+                    String rmExternalOutput = rmExternalTableDirOnHDFS(hiveTable, hiveName);
+                    config.getCliCommandExecutor().execute(hiveCmdBuilder.build());
+                    output.append("Hive table " + hiveTable + " is dropped. \n");
+                    output.append(rmExternalOutput);
+                }
+                return output.toString();
+            }
+            
+            private String rmExternalTableDirOnHDFS(String tableName, String hive) throws IOException {
+                try {
+                    String dir = HiveManager.getInstance().getHiveTableLocation(tableName, hive);
+                    Path path = new Path(dir);
+                    FileSystem fs = HadoopUtil.getFileSystem(dir);
+                    if(fs.exists(path)) {
+                        fs.delete(path, true);
+                    }
+                    return "Remove External Hive " + hive + " Table " + tableName + ", location " + path.toString() + "\n";
+                } catch (Exception e) {
+                    logger.warn("Failed to get table location! table {}, hive name {}.", tableName, hive, e);
+                    return "Failed to fetch the external table location or delete the path, skipping it.";
+                }
+            }
+            
+            public void setHiveName(String hiveName) {
+                setParam("hiveName", hiveName);
+            }
+            
+            public String getHiveName() {
+                return getParam("hiveName");
+            }
+            
+            @Override
+            public String getCmd() {
+                StringBuffer sb = new StringBuffer(super.getCmd());
+                sb.append(" -").append("hiveName").append(" ").append(this.getHiveName());
+                return sb.toString();
+            }
+        }
+    }
+
+}
\ No newline at end of file
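
For an external Hive source, BatchFileCubingInputSide adds two steps after the usual flat-table creation: it re-creates the flat table definition in the default Hive (pointing at the job working directory) and then copies the external table's files over with distcp. DistcpShellExecutable itself is not shown in this hunk; the sketch below only illustrates what the copy amounts to, and the table, hive, and output names are hypothetical.

import org.apache.kylin.source.hive.external.HiveManager;

public class CopyExternalFlatTableSketch {
    public static void main(String[] args) throws Exception {
        String flatTable = "kylin_intermediate_my_cube_20161109";   // hypothetical intermediate table
        String hiveName = "report_hive";                            // hypothetical external hive name
        String outputDir = "hdfs:///kylin/kylin_metadata/kylin-job1/" + flatTable;  // hypothetical job working dir

        // resolve the table location in the external cluster, then copy its files to the local cluster
        String src = HiveManager.getInstance().getHiveTableLocation(flatTable, hiveName);
        System.out.println("would run: hadoop distcp " + src + " " + outputDir);
    }
}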

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
new file mode 100644
index 0000000..b1e8101
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveSource.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/ 
+package org.apache.kylin.source.hive.external;
+
+import java.util.List;
+
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.ISource;
+import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.hive.HiveTable;
+
+import com.google.common.collect.Lists;
+
+public class ExternalHiveSource implements ISource {
+
+    @Override
+    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
+        if (engineInterface == IMRInput.class) {
+            return (I) new ExternalHiveMRInput();
+        } else {
+            throw new RuntimeException("Cannot adapt to " + engineInterface);
+        }
+    }
+
+    @Override
+    public ReadableTable createReadableTable(TableDesc tableDesc) {
+        return new HiveTable(tableDesc);
+    }
+
+    @Override
+    public List<String> getMRDependentResources(TableDesc table) {
+        return Lists.newArrayList();
+    }
+
+}
\ No newline at end of file
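
ExternalHiveSource is the ISource implementation registered for source type 6 (see the KylinConfigBase and ISourceAware hunks below); for the MapReduce build engine it simply hands back an ExternalHiveMRInput. A minimal usage sketch:

import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.hive.external.ExternalHiveSource;

public class AdaptSourceSketch {
    public static void main(String[] args) {
        ISource source = new ExternalHiveSource();
        // the MR engine asks the source for its input-side implementation
        IMRInput input = source.adaptToBuildEngine(IMRInput.class);
        System.out.println(input.getClass().getSimpleName());  // ExternalHiveMRInput
    }
}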

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
new file mode 100644
index 0000000..cc2ad8b
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/HiveManager.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.io.File;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.source.hive.HiveClient;
+import org.apache.log4j.Logger;
+
+
+/** 
+ * Manages all Hive clients: the default client and external clients.
+ * This is a singleton.
+ * @author hzfengyu
+ */
+public class HiveManager {
+    private volatile static HiveManager HIVE_MANAGER_CACHE = null;
+    private static final Logger logger = Logger.getLogger(HiveManager.class);
+    private static final String DEFAULT_HIVE_NAME = "default(null)";
+    private static final String HIVE_CONFIG_FILE_LOCATION = "conf/hive-site.xml";
+    private static final String HIVE_COMMAND_LOCATION = "bin/hive";
+    public static final String NOT_SUPPORT_ERROR_MESSAGE = 
+            "External hive is not supported; set kylin.external.hive.root.directory to enable it";
+    
+    // Hive root directory; if null or empty, only the default hive is used
+    private File externalHiveRootDir = null;
+    private ConcurrentHashMap<String, HiveClient> externalHiveMap = null;
+    private HiveClient defaultHiveClient = null;
+    
+    public static HiveManager getInstance() {
+        //using default kylin config
+        if(HIVE_MANAGER_CACHE == null) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            synchronized(HiveManager.class) {
+                if(HIVE_MANAGER_CACHE == null) {
+                    HIVE_MANAGER_CACHE = new HiveManager(config);
+                }
+            }
+        }
+        return HIVE_MANAGER_CACHE;
+    }
+    
+    private HiveManager(KylinConfig config) {
+        String externalRootDir = config.getExternalHiveRootDirectory();
+        if(externalRootDir != null && !externalRootDir.isEmpty()) {
+            File file = new File(externalRootDir);
+            // check that the hive root directory exists
+            this.externalHiveRootDir = file;
+            if(!(file.exists() && file.isDirectory())) {
+                logger.warn("Hive root directory " + file.getAbsolutePath() + " does not exist!");
+                this.externalHiveRootDir = null;
+            }
+        } else {
+            this.externalHiveRootDir = null;
+        }
+        this.externalHiveMap = new ConcurrentHashMap<String, HiveClient>();
+    }
+    
+    public List<String> getExternalHiveName() {
+        List<String> hiveNames = new LinkedList<String>();
+        hiveNames.add(DEFAULT_HIVE_NAME);
+        if(this.externalHiveRootDir == null)
+            return hiveNames;
+        
+        // treat every directory under the hive root dir as a hive source; the directory name is the hive name
+        for(File file : this.externalHiveRootDir.listFiles()) {
+            if(!file.isDirectory()) {
+                logger.warn("File " + file.getAbsolutePath() + " in hive root directory is a regular file, skipping it.");
+                continue;
+            }
+            hiveNames.add(file.getName());
+        }
+        return hiveNames;
+    }
+    
+    private void checkIsSupportExternalHive() {
+        if(this.externalHiveRootDir == null) {
+            throw new IllegalArgumentException(NOT_SUPPORT_ERROR_MESSAGE);
+        }
+    }
+    
+    public String getHiveConfigFile(String hiveName) {
+        if(hiveName == null) {
+            return null;
+        }
+        checkIsSupportExternalHive();
+     
+        File hiveRootFile = new File(this.externalHiveRootDir, hiveName);
+        if(!(hiveRootFile.exists() && hiveRootFile.isDirectory())) {
+            throw new IllegalArgumentException("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " does not exist.");
+        }
+        
+        File hiveConfigFile = new File(hiveRootFile, HIVE_CONFIG_FILE_LOCATION);
+        if(!(hiveConfigFile.exists() && hiveConfigFile.isFile())) {
+            throw new IllegalArgumentException("Hive " + hiveName + " config file " + hiveConfigFile.getAbsolutePath() + " does not exist.");
+        }
+        return hiveConfigFile.getAbsolutePath();
+    }
+    
+    public HiveClient createHiveClient(String hiveName) {
+        // use the default hive client when no hive name is specified
+        if(hiveName == null) { 
+            if(defaultHiveClient == null)
+                defaultHiveClient = new ExternalHiveClient(null);
+            return this.defaultHiveClient;
+        }
+        checkIsSupportExternalHive();
+        
+        HiveClient client = this.externalHiveMap.get(hiveName);
+        if(client != null)
+            return client;
+        String configFileLocation = getHiveConfigFile(hiveName);
+        if(configFileLocation == null) {
+            throw new IllegalArgumentException("Cannot find hive " + hiveName + " config file in external hive root directory " +
+                    this.externalHiveRootDir.getAbsolutePath());
+        }
+        
+        try {
+            client = new ExternalHiveClient(configFileLocation);
+            this.externalHiveMap.put(hiveName, client);
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Cannot create hive client for " + hiveName + ", config file " + configFileLocation);
+        }
+        return client;
+    }
+    
+    public HiveClient createHiveClientWithConfig(Map<String, String> configMap, String hiveName) {
+        HiveClient client = this.createHiveClient(hiveName);
+        client.appendConfiguration(configMap);
+        return client;
+    }
+    
+    public String getHiveCommand(String hiveName) {
+        if(hiveName == null) {
+            return "hive";
+        }
+        checkIsSupportExternalHive();
+        
+        File hiveRootFile = new File(this.externalHiveRootDir, hiveName);
+        if(!(hiveRootFile.exists() && hiveRootFile.isDirectory())) {
+            throw new IllegalArgumentException("Hive " + hiveName + " root directory " + hiveRootFile.getAbsolutePath() + " does not exist.");
+        }
+        
+        File hiveCmdFile = new File(hiveRootFile, HIVE_COMMAND_LOCATION);
+        if(!(hiveCmdFile.exists() && hiveCmdFile.isFile())) {
+            throw new IllegalArgumentException("Hive " + hiveName + " bin file " + hiveCmdFile.getAbsolutePath() + " does not exist.");
+        }
+        return hiveCmdFile.getAbsolutePath();
+    }
+    
+    public String getHiveTableLocation(String database, String tableName, String hiveName) throws Exception {
+        HiveClient hiveClient = this.createHiveClient(hiveName);
+        try {
+            String tableLocation = hiveClient.getHiveTableLocation(database, tableName);
+            return tableLocation;
+        } catch (Exception e) {
+           logger.error("Failed to get hive " + hiveName + " table " + tableName + " location!");
+           throw e;
+        }
+    }
+    
+    public String getHiveTableLocation(String fullTableName, String hiveName) throws Exception {
+        String[] tables = HadoopUtil.parseHiveTableName(fullTableName);
+        String database = tables[0];
+        String tableName = tables[1];
+        return getHiveTableLocation(database, tableName, hiveName);
+    }
+    
+    public static void clearCache() {
+        HIVE_MANAGER_CACHE = null;
+        getInstance();
+    }
+    
+    public boolean isSupportExternalHives() {
+        return this.externalHiveRootDir != null;
+    }
+    
+    public static boolean isSameHiveSource(String first, String second) {
+        if(first == null) {
+            return second == null;
+        } else {
+            return first.equalsIgnoreCase(second);
+        }
+    }
+}
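
HiveManager expects kylin.external.hive.root.directory to contain one sub-directory per external Hive, named after that hive, each providing conf/hive-site.xml and bin/hive. A hedged sketch, assuming a Kylin environment where that property points at a hypothetical /opt/external-hives containing a hive named report_hive:

import java.util.List;

import org.apache.kylin.source.hive.HiveClient;
import org.apache.kylin.source.hive.external.HiveManager;

public class ExternalHiveLookupSketch {
    public static void main(String[] args) throws Exception {
        HiveManager mgr = HiveManager.getInstance();
        if (!mgr.isSupportExternalHives()) {
            System.out.println("kylin.external.hive.root.directory is not set; only the default hive is available");
            return;
        }
        // e.g. [default(null), report_hive]
        List<String> hives = mgr.getExternalHiveName();
        System.out.println("known hives: " + hives);

        // per-hive CLI and configuration resolved from the root directory layout
        System.out.println(mgr.getHiveCommand("report_hive"));     // /opt/external-hives/report_hive/bin/hive
        System.out.println(mgr.getHiveConfigFile("report_hive"));  // /opt/external-hives/report_hive/conf/hive-site.xml

        // metastore client for the external hive
        HiveClient client = mgr.createHiveClient("report_hive");
        System.out.println(client.getHiveDbNames());
    }
}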

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/controllers/page.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/page.js b/webapp/app/js/controllers/page.js
index c65a264..c07cf70 100644
--- a/webapp/app/js/controllers/page.js
+++ b/webapp/app/js/controllers/page.js
@@ -215,6 +215,7 @@ var projCtrl = function ($scope, $location, $modalInstance, ProjectService, Mess
       var requestBody = {
         formerProjectName: $scope.state.oldProjName,
         newProjectName: $scope.proj.name,
+        newHiveName: $scope.proj.hive,
         newDescription: $scope.proj.description
       };
       ProjectService.update({}, requestBody, function (newProj) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index e3ab0ac..1c08aa0 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -177,7 +177,7 @@ KylinApp
       $scope.loadHive = function () {
         if($scope.hiveLoaded)
           return;
-        TableService.showHiveDatabases({}, function (databases) {
+        TableService.showHiveDatabases({project:$scope.projectName}, function (databases) {
           $scope.dbNum = databases.length;
           if (databases.length > 0) {
             $scope.hiveMap = {};
@@ -293,7 +293,7 @@ KylinApp
 
       $scope.showToggle = function(node) {
         if(node.expanded == false){
-          TableService.showHiveTables({"database": node.label},function (hive_tables){
+        	TableService.showHiveTables({"database": node.label, project:$scope.projectName},function (hive_tables){
             var tables = [];
             for (var i = 0; i < hive_tables.length; i++) {
               tables.push(hive_tables[i]);

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/js/model/projectConfig.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/model/projectConfig.js b/webapp/app/js/model/projectConfig.js
index 270eb7a..2a84b47 100644
--- a/webapp/app/js/model/projectConfig.js
+++ b/webapp/app/js/model/projectConfig.js
@@ -20,6 +20,7 @@ KylinApp.constant('projectConfig', {
   theaditems: [
     {attr: 'name', name: 'Name'},
     {attr: 'owner', name: 'Owner'},
+    {attr: 'hive', name: 'Hive Name'},
     {attr: 'description', name: 'Description'},
     {attr: 'create_time', name: 'Create Time'}
   ]

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/partials/projects/project_create.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/projects/project_create.html b/webapp/app/partials/projects/project_create.html
index 36cbaf5..cd150b1 100644
--- a/webapp/app/partials/projects/project_create.html
+++ b/webapp/app/partials/projects/project_create.html
@@ -36,6 +36,15 @@
                 </div>
             </div>
             <div class="form-group">
+                <label><b>Hive Name</b></label>
+
+                <div class="clearfix">
+                     <input name="hive_input" type="text" class="form-control"
+                               placeholder="External hive name that the project uses as its input source; leave it empty if you are using the default hive."
+                               ng-model="proj.hive">
+                </div>
+            </div>
+            <div class="form-group">
                 <label><b>Project Description</b></label>
 
                 <div class="clearfix">

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/webapp/app/partials/projects/projects.html
----------------------------------------------------------------------
diff --git a/webapp/app/partials/projects/projects.html b/webapp/app/partials/projects/projects.html
index 96e4a91..10b4dba 100644
--- a/webapp/app/partials/projects/projects.html
+++ b/webapp/app/partials/projects/projects.html
@@ -54,6 +54,7 @@
                     {{ project.name}}
                 </td>
                 <td>{{ project.owner}}</td>
+                <td>{{ project.hive}}</td>
                 <td>{{ project.description}}</td>
                 <td>{{ project.create_time_utc | utcToConfigTimeZone}}</td>
                 <td>


[2/2] kylin git commit: KYLIN-1826, add external hive interface, project, table.. Signed-off-by: terry-chelsea

Posted by sh...@apache.org.
KYLIN-1826, add external hive interface, project, table..
Signed-off-by: terry-chelsea <hz...@corp.netease.com>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/417fed32
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/417fed32
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/417fed32

Branch: refs/heads/KYLIN-1826
Commit: 417fed32f7abbea459906f5be148c974659effd4
Parents: b08871e
Author: terry-chelsea <hz...@corp.netease.com>
Authored: Fri Oct 28 20:38:35 2016 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 9 14:11:46 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   5 +
 .../cube/model/CubeJoinedFlatTableDesc.java     |   5 +
 .../cube/model/CubeJoinedFlatTableEnrich.java   |   4 +
 .../org/apache/kylin/job/JoinedFlatTable.java   |  10 +-
 .../kylin/job/constant/ExecutableConstants.java |   2 +
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 +
 .../kylin/metadata/model/ISourceAware.java      |   1 +
 .../apache/kylin/metadata/model/TableDesc.java  |  15 +-
 .../kylin/metadata/project/ProjectInstance.java |  11 +
 .../kylin/metadata/project/ProjectManager.java  |  22 +-
 .../hive/ITHiveSourceTableLoaderTest.java       |   3 +-
 .../source/hive/ITHiveTableReaderTest.java      |   2 +-
 .../kylin/rest/controller/CubeController.java   |  17 ++
 .../rest/controller/ProjectController.java      |  35 +++-
 .../rest/controller/StreamingController.java    |   4 +
 .../kylin/rest/controller/TableController.java  |  24 ++-
 .../rest/request/CreateProjectRequest.java      |  12 ++
 .../rest/request/UpdateProjectRequest.java      |  13 ++
 .../kylin/rest/response/TableDescResponse.java  |   1 +
 .../apache/kylin/rest/service/BasicService.java |  14 +-
 .../apache/kylin/rest/service/CacheService.java |   4 +-
 .../apache/kylin/rest/service/CubeService.java  |  10 +-
 .../kylin/rest/service/ProjectService.java      |   6 +-
 .../source/hive/CreateFlatHiveTableStep.java    |  11 +-
 .../apache/kylin/source/hive/HiveClient.java    |   2 +-
 .../kylin/source/hive/HiveCmdBuilder.java       |  13 +-
 .../apache/kylin/source/hive/HiveMRInput.java   |  87 +++++---
 .../source/hive/HiveSourceTableLoader.java      |  22 +-
 .../org/apache/kylin/source/hive/HiveTable.java |   7 +-
 .../kylin/source/hive/HiveTableReader.java      |  16 +-
 .../apache/kylin/source/hive/HqlExecutable.java |  12 +-
 .../hive/external/DistcpShellExecutable.java    | 100 +++++++++
 .../hive/external/ExternalHiveClient.java       |  80 +++++++
 .../hive/external/ExternalHiveMRInput.java      | 200 ++++++++++++++++++
 .../hive/external/ExternalHiveSource.java       |  51 +++++
 .../kylin/source/hive/external/HiveManager.java | 206 +++++++++++++++++++
 webapp/app/js/controllers/page.js               |   1 +
 webapp/app/js/controllers/sourceMeta.js         |   4 +-
 webapp/app/js/model/projectConfig.js            |   1 +
 .../app/partials/projects/project_create.html   |   9 +
 webapp/app/partials/projects/projects.html      |   1 +
 41 files changed, 975 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 79ee084..39c4a3e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -365,6 +365,10 @@ abstract public class KylinConfigBase implements Serializable {
     public String getCliWorkingDir() {
         return getOptional("kylin.job.remote.cli.working.dir");
     }
+    
+    public String getExternalHiveRootDirectory() {
+        return getOptional("kylin.external.hive.root.directory", null);
+    }
 
     public boolean isEmptySegmentAllowed() {
         return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true"));
@@ -717,6 +721,7 @@ abstract public class KylinConfigBase implements Serializable {
         Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine."));
         // ref constants in ISourceAware
         r.put(0, "org.apache.kylin.source.hive.HiveSource");
+        r.put(6, "org.apache.kylin.source.hive.external.ExternalHiveSource");
         return r;
     }
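
The feature is driven purely by configuration: kylin.external.hive.root.directory has no default value, and source type 6 now maps to ExternalHiveSource. A small sketch of reading the new setting (the property value shown is hypothetical):

import org.apache.kylin.common.KylinConfig;

public class ExternalHiveConfigSketch {
    public static void main(String[] args) {
        // in kylin.properties:
        //   kylin.external.hive.root.directory=/opt/external-hives
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        String root = config.getExternalHiveRootDirectory();  // null when the property is absent
        System.out.println(root == null ? "external hive disabled" : "external hive root: " + root);
    }
}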
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
index 6aeb617..1e62ac9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java
@@ -161,5 +161,10 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
     public TblColRef getDistributedBy() {
         return cubeDesc.getDistributedByColumn();
     }
+    
+    @Override
+    public String getHiveName() {
+        return this.getDataModel().getFactTableDesc().getHive();
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
index 5212859..566490e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableEnrich.java
@@ -137,4 +137,8 @@ public class CubeJoinedFlatTableEnrich implements IJoinedFlatTableDesc {
         return flatDesc.getDistributedBy();
     }
 
+    @Override
+    public String getHiveName() {
+        return this.getDataModel().getFactTableDesc().getHive();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 699d084..7f73393 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,12 +141,13 @@ public class JoinedFlatTable {
         return sql.toString();
     }
 
-    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir) {
+    public static String generateCountDataStatement(IJoinedFlatTableDesc flatDesc, final String outputDir, boolean local) {
         final Map<String, String> tableAliasMap = buildTableAliasMap(flatDesc.getDataModel());
         final StringBuilder sql = new StringBuilder();
+        String localStr = local ? "LOCAL" : "";
         final String factTbl = flatDesc.getDataModel().getFactTable();
         sql.append("dfs -mkdir -p " + outputDir + ";\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+        sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
         appendWhereStatement(flatDesc, sql, tableAliasMap);
         return sql.toString();
     }
@@ -285,10 +286,11 @@ public class JoinedFlatTable {
         return hiveDataType.toLowerCase();
     }
 
-    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) {
+    public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir, boolean local) {
         StringBuilder sql = new StringBuilder();
+        String localStr = local ? "LOCAL" : "";
         sql.append("set hive.exec.compress.output=false;\n");
-        sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
+        sql.append("INSERT OVERWRITE " + localStr + " DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n");
         return sql.toString();
     }
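
The new boolean only toggles the LOCAL keyword, so an external-hive build can write the row count to a directory on the machine running the Hive CLI (which is why ExternalHiveMRInput.getRowCountOutputDir above returns a file:// path under java.io.tmpdir) rather than to the job's HDFS working directory. A self-contained sketch of the generated HQL, with hypothetical table and directory names:

public class RowCountHqlSketch {
    static String rowCountHql(String table, String outputDir, boolean local) {
        String localStr = local ? "LOCAL " : "";
        return "set hive.exec.compress.output=false;\n"
                + "INSERT OVERWRITE " + localStr + "DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + table + ";\n";
    }

    public static void main(String[] args) {
        // default hive: the count lands in the HDFS job working directory
        System.out.println(rowCountHql("kylin_intermediate_my_cube", "/kylin/kylin-job1/row_count", false));
        // external hive: the count lands in a local temp directory that the job server reads back
        System.out.println(rowCountHql("kylin_intermediate_my_cube", "file:///tmp/kylin-job1/row_count", true));
    }
}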
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 893c034..5cac792 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -36,6 +36,8 @@ public final class ExecutableConstants {
 
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
+    public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE_IN_DEFAULT = "Create Intermediate Flat Hive Table in Default Hive";
+    public static final String STEP_NAME_COPY_HIVE_DATA = "Copy Intermediate Table To Local Hadoop";
     public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
     public static final String STEP_NAME_COUNT_HIVE_TABLE = "Count Source Table";
     public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
index f3a4107..71c015a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/IJoinedFlatTableDesc.java
@@ -37,4 +37,6 @@ public interface IJoinedFlatTableDesc {
     long getSourceOffsetEnd();
     
     TblColRef getDistributedBy();
+    // The hive name is determined by the data model's fact table.
+    String getHiveName();
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
index 0f98d5d..b4030dd 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ISourceAware.java
@@ -23,6 +23,7 @@ public interface ISourceAware {
     public static final int ID_HIVE = 0;
     public static final int ID_STREAMING = 1;
     public static final int ID_SPARKSQL = 5;
+    public static final int ID_EXTERNAL_HIVE = 6;
     public static final int ID_EXTERNAL = 7;
 
     int getSourceType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index e163d1d..f647d59 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -45,6 +45,8 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
     private int sourceType = ISourceAware.ID_HIVE;
     @JsonProperty("table_type")
     private String tableType;
+    @JsonProperty("hive")
+    private String hive;
 
     private static final String materializedTableNamePrefix = "kylin_intermediate_";
 
@@ -60,6 +62,7 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         this.columns = other.getColumns();
         this.database.setName(other.getDatabase());
         this.tableType = other.getTableType();
+        this.hive = other.getHive();
     }
 
     public ColumnDesc findColumnByName(String name) {
@@ -235,7 +238,9 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
 
     @Override
     public int getSourceType() {
-        return sourceType;
+        if(!(ISourceAware.ID_HIVE == sourceType || ISourceAware.ID_EXTERNAL_HIVE == sourceType))
+            return this.sourceType;
+        return hive == null ? ISourceAware.ID_HIVE : ISourceAware.ID_EXTERNAL_HIVE;
     }
 
     public void setSourceType(int sourceType) {
@@ -250,4 +255,12 @@ public class TableDesc extends RootPersistentEntity implements ISourceAware {
         this.tableType = tableType;
     }
 
+    public String getHive() {
+        return hive;
+    }
+
+    public void setHive(String hive) {
+        this.hive = hive;
+    }
+    
 }
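
The new "hive" property is what routes a table to the external source: getSourceType() leaves non-Hive source types untouched and otherwise returns ID_EXTERNAL_HIVE exactly when a hive name is set. A small sketch, assuming TableDesc's default constructor and a hypothetical hive name:

import org.apache.kylin.metadata.model.ISourceAware;
import org.apache.kylin.metadata.model.TableDesc;

public class SourceTypeSketch {
    public static void main(String[] args) {
        TableDesc table = new TableDesc();
        System.out.println(table.getSourceType() == ISourceAware.ID_HIVE);           // true: no hive set
        table.setHive("report_hive");                                                 // hypothetical external hive
        System.out.println(table.getSourceType() == ISourceAware.ID_EXTERNAL_HIVE);  // true: served by ExternalHiveSource
    }
}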

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
index 1afc603..b0ce889 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java
@@ -49,6 +49,9 @@ public class ProjectInstance extends RootPersistentEntity {
 
     @JsonProperty("name")
     private String name;
+    
+    @JsonProperty("hive")
+    private String hive;
 
     @JsonProperty("tables")
     private Set<String> tables = new TreeSet<String>();
@@ -153,6 +156,14 @@ public class ProjectInstance extends RootPersistentEntity {
     public void setName(String name) {
         this.name = name;
     }
+    
+    public String getHive() {
+        return hive;
+    }
+
+    public void setHive(String hive) {
+        this.hive = hive;
+    }
 
     public boolean containsRealization(final RealizationType type, final String realization) {
         return Iterables.any(this.realizationEntries, new Predicate<RealizationEntry>() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
index 1bf9804..e86c1a2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectManager.java
@@ -152,6 +152,15 @@ public class ProjectManager {
 
         return currentProject;
     }
+    
+    public ProjectInstance createProject(String projectName, String owner, String description, String hiveName) throws IOException {
+        logger.info("Creating project " + projectName + ", hive name " + hiveName);
+        ProjectInstance currentProject = this.createProject(projectName, owner, description);
+        currentProject.setHive(hiveName);
+        updateProject(currentProject);
+        
+        return currentProject;
+    }
 
     public ProjectInstance dropProject(String projectName) throws IOException {
         if (projectName == null)
@@ -183,9 +192,9 @@ public class ProjectManager {
     }
 
     //update project itself
-    public ProjectInstance updateProject(ProjectInstance project, String newName, String newDesc) throws IOException {
+    public ProjectInstance updateProject(ProjectInstance project, String newName, String newDesc, String hiveName) throws IOException {
         if (!project.getName().equals(newName)) {
-            ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc);
+            ProjectInstance newProject = this.createProject(newName, project.getOwner(), newDesc, hiveName);
 
             newProject.setCreateTimeUTC(project.getCreateTimeUTC());
             newProject.recordUpdateTime(System.currentTimeMillis());
@@ -200,8 +209,13 @@ public class ProjectManager {
             return newProject;
         } else {
             project.setName(newName);
-            project.setDescription(newDesc);
-
+            if(newDesc != null) {
+                project.setDescription(newDesc);
+            }
+            if(hiveName != null) {
+                project.setHive(hiveName);
+            }
+            
             if (project.getUuid() == null)
                 project.updateRandomUuid();
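
With the hive name stored on the project, createProject gains a four-argument overload and updateProject only overwrites the description and hive when non-null values are passed. A hedged usage sketch, assuming a running Kylin metadata store; the project and hive names are hypothetical:

import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;

public class ProjectHiveBindingSketch {
    public static void main(String[] args) throws Exception {
        ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
        // bind the project to an external hive at creation time
        ProjectInstance proj = mgr.createProject("sales_report", "ADMIN", "cubes on the report cluster", "report_hive");
        // passing nulls keeps the existing description and hive
        mgr.updateProject(proj, "sales_report", null, null);
        System.out.println(proj.getHive());  // report_hive
    }
}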
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..c33840c 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,8 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
     public void test() throws IOException {
         KylinConfig config = getTestConfig();
         String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+        String project = "learn_kylin";
+        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, project, config);
 
         assertTrue(loaded.size() == toLoad.length);
         for (String str : toLoad)

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
index 757888e..53906dc 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveTableReaderTest.java
@@ -34,7 +34,7 @@ public class ITHiveTableReaderTest extends HBaseMetadataTestCase {
 
     @Test
     public void test() throws IOException {
-        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact");
+        HiveTableReader reader = new HiveTableReader("default", "test_kylin_fact", null);
         int rowNumber = 0;
         while (reader.next()) {
             String[] row = reader.getRow();

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index 5397df7..d371230 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -56,6 +56,7 @@ import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.JobService;
 import org.apache.kylin.rest.service.KafkaConfigService;
 import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.slf4j.Logger;
@@ -343,6 +344,9 @@ public class CubeController extends BasicController {
         String newCubeName = cubeRequest.getCubeName();
         String project = cubeRequest.getProject();
 
+        if(!this.isBasedSameSource(cubeName, project)) {
+            throw new InternalErrorException("Cannot move a cube to a project with a different hive source!");
+        }
         CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
         if (cube == null) {
             throw new InternalErrorException("Cannot find cube " + cubeName);
@@ -364,6 +368,19 @@ public class CubeController extends BasicController {
         return newCube;
 
     }
+    
+    private boolean isBasedSameSource(String cubeName, String projectName) {
+        CubeInstance cube = cubeService.getCubeManager().getCube(cubeName);
+        if(cube == null)
+            return true;
+        
+        ProjectInstance project = cubeService.getProjectManager().getProject(projectName);
+        if(project == null) {
+            return true;
+        }
+        ProjectInstance cubeProject = cubeService.getProjectByCube(cubeName);
+        return HiveManager.isSameHiveSource(project.getHive(), cubeProject.getHive());
+    }
 
     @RequestMapping(value = "/{cubeName}/enable", method = { RequestMethod.PUT })
     @ResponseBody
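
The new guard compares the two projects' hive bindings with HiveManager.isSameHiveSource, which is null-safe and case-insensitive, so a cube can only be moved between projects that read from the same hive. For example (hive names hypothetical):

import org.apache.kylin.source.hive.external.HiveManager;

public class SameHiveSourceSketch {
    public static void main(String[] args) {
        System.out.println(HiveManager.isSameHiveSource(null, null));                    // true: both use the default hive
        System.out.println(HiveManager.isSameHiveSource("Report_Hive", "report_hive"));  // true: compared case-insensitively
        System.out.println(HiveManager.isSameHiveSource("report_hive", null));           // false: external vs default
    }
}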

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
index 496e44a..f85638a 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/ProjectController.java
@@ -33,6 +33,7 @@ import org.apache.kylin.rest.request.UpdateProjectRequest;
 import org.apache.kylin.rest.service.AccessService;
 import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -199,7 +200,9 @@ public class ProjectController extends BasicController {
         if (StringUtils.isEmpty(projectRequest.getName())) {
             throw new InternalErrorException("A project name must be given to create a project");
         }
-
+        
+        String hiveName = projectRequest.getHive();
+        checkHiveIsValid(hiveName, null);
         ProjectInstance createdProj = null;
         try {
             createdProj = projectService.createProject(projectRequest);
@@ -217,7 +220,7 @@ public class ProjectController extends BasicController {
         if (StringUtils.isEmpty(projectRequest.getFormerProjectName())) {
             throw new InternalErrorException("A project name must be given to update a project");
         }
-
+        checkHiveIsValid(projectRequest.getNewHiveName(), projectRequest.getFormerProjectName());
         ProjectInstance updatedProj = null;
         try {
             ProjectInstance currentProject = projectService.getProjectManager().getProject(projectRequest.getFormerProjectName());
@@ -254,4 +257,32 @@ public class ProjectController extends BasicController {
     public void setCubeService(CubeService cubeService) {
         this.cubeService = cubeService;
     }
+    
+    /**
+     * Check that the given hive name is valid and may be assigned to the given project
+     * @param hiveName
+     */
+    private void checkHiveIsValid(String hiveName, String projectName) {
+        try {
+            if(HiveManager.getInstance().isSupportExternalHives()) {
+                HiveManager.getInstance().getHiveCommand(hiveName);
+                HiveManager.getInstance().getHiveConfigFile(hiveName);
+            } else if(hiveName != null) {
+                throw new InternalErrorException(HiveManager.NOT_SUPPORT_ERROR_MESSAGE);
+            }
+        } catch (IllegalArgumentException e) {
+            logger.error("Cannot find hive " + hiveName + ", supported hives: " + 
+                    HiveManager.getInstance().getExternalHiveName(), e);
+            throw new InternalErrorException("Cannot find hive " + hiveName + " in the current external hive configuration");
+        }
+        
+        if(projectName == null)
+            return;
+        ProjectInstance project = projectService.getProjectManager().getProject(projectName);
+        if(project != null && !project.getRealizationEntries().isEmpty() && !HiveManager.isSameHiveSource(project.getHive(), hiveName)) {
+            logger.warn("Updating project {} from hive {} to {} may cause errors; create a new project instead.", 
+                    projectName, project.getHive(), hiveName);
+            throw new InternalErrorException("It is not allowed to modify the hive name while cubes exist in the project.");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index f3374c3..addf544 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -29,6 +29,7 @@ import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.streaming.StreamingConfig;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
@@ -103,6 +104,8 @@ public class StreamingController extends BasicController {
     public StreamingRequest saveStreamingConfig(@RequestBody StreamingRequest streamingRequest) {
 
         String project = streamingRequest.getProject();
+        ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        String hive = projectMgr.getProject(project).getHive();
         TableDesc tableDesc = deserializeTableDesc(streamingRequest);
         StreamingConfig streamingConfig = deserializeSchemalDesc(streamingRequest);
         KafkaConfig kafkaConfig = deserializeKafkaSchemalDesc(streamingRequest);
@@ -110,6 +113,7 @@ public class StreamingController extends BasicController {
 
         try {
             tableDesc.setUuid(UUID.randomUUID().toString());
+            tableDesc.setHive(hive);
             MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
             metaMgr.saveSourceTable(tableDesc);
             cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index eefeba8..81ccb4b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
@@ -48,6 +49,7 @@ import org.apache.kylin.rest.service.ModelService;
 import org.apache.kylin.rest.service.ProjectService;
 import org.apache.kylin.rest.service.StreamingService;
 import org.apache.kylin.source.hive.HiveClient;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,7 +147,7 @@ public class TableController extends BasicController {
     @ResponseBody
     public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
-        String[] loaded = cubeMgmtService.reloadHiveTable(tables);
+        String[] loaded = cubeMgmtService.reloadHiveTable(tables, project);
         if (request.isCalculate()) {
             cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
         }
@@ -233,8 +235,11 @@ public class TableController extends BasicController {
     public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
         Map<String, String> result = new HashMap<String, String>();
         String project = request.getProject();
+        String hiveName = getHiveNameByProject(project);
+        
         TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
         desc.setUuid(UUID.randomUUID().toString());
+        desc.setHive(hiveName);
         MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
         metaMgr.saveSourceTable(desc);
         cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
@@ -311,11 +316,13 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveDatabases() throws IOException {
-        HiveClient hiveClient = new HiveClient();
+    private static List<String> showHiveDatabases(@RequestParam(value = "project", required = true) String project) 
+            throws IOException {
         List<String> results = null;
 
         try {
+            String hiveName = getHiveNameByProject(project);
+            HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
             results = hiveClient.getHiveDbNames();
         } catch (Exception e) {
             e.printStackTrace();
@@ -332,11 +339,13 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveTables(@PathVariable String database) throws IOException {
-        HiveClient hiveClient = new HiveClient();
+    private static List<String> showHiveTables(@RequestParam(value = "project", required = true) String project, 
+            @PathVariable String database) throws IOException {
         List<String> results = null;
 
         try {
+            String hiveName = getHiveNameByProject(project);
+            HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
             results = hiveClient.getHiveTableNames(database);
         } catch (Exception e) {
             e.printStackTrace();
@@ -344,6 +353,11 @@ public class TableController extends BasicController {
         }
         return results;
     }
+    
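+    // Look up the hive name configured on the given project; tables loaded through this controller are tagged with it.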
+    private static String getHiveNameByProject(String project) {
+        ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        return projectMgr.getProject(project).getHive();
+    }
 
     public void setCubeService(CubeService cubeService) {
         this.cubeMgmtService = cubeService;

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
index 71cd1c4..51f100e 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/CreateProjectRequest.java
@@ -22,6 +22,7 @@ package org.apache.kylin.rest.request;
  */
 public class CreateProjectRequest {
     private String name;
+    private String hive;
     private String description;
 
     public CreateProjectRequest() {
@@ -43,4 +44,15 @@ public class CreateProjectRequest {
         this.description = description;
     }
 
+    public String getHive() {
+        if(this.hive == null || this.hive.trim().isEmpty()) {
+            return null;
+        }
+        return this.hive.trim();
+    }
+
+    public void setHive(String hive) {
+        this.hive = hive;
+    }
+
 }
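
A minimal usage sketch of the new field (illustrative only, using just the accessors shown in this diff; "prod_hive" is an arbitrary example name):

    CreateProjectRequest request = new CreateProjectRequest();
    request.setHive("   ");              // blank or empty input...
    assert request.getHive() == null;    // ...comes back as null, i.e. the default hive
    request.setHive("prod_hive");
    assert "prod_hive".equals(request.getHive());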

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
index 29ba162..6bc077b 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/UpdateProjectRequest.java
@@ -23,6 +23,7 @@ package org.apache.kylin.rest.request;
 public class UpdateProjectRequest {
     private String formerProjectName;
     private String newProjectName;
+    private String newHiveName;
     private String newDescription;
 
     public UpdateProjectRequest() {
@@ -52,4 +53,16 @@ public class UpdateProjectRequest {
     public void setNewProjectName(String newProjectName) {
         this.newProjectName = newProjectName;
     }
+
+    public String getNewHiveName() {
+        if(this.newHiveName == null || this.newHiveName.trim().isEmpty()) {
+            return null;
+        }
+        return this.newHiveName.trim();
+    }
+
+    public void setNewHiveName(String newHiveName) {
+        this.newHiveName = newHiveName;
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
index c3b1e7c..f041bf5 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableDescResponse.java
@@ -76,6 +76,7 @@ public class TableDescResponse extends TableDesc {
         this.setName(table.getName());
         this.setSourceType(table.getSourceType());
         this.setUuid(table.getUuid());
+        this.setHive(table.getHive());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
index abf0638..e96bddf 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
@@ -143,7 +144,18 @@ public abstract class BasicService {
         })));
         return results;
     }
-
+    
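+    // Find the single project that contains the given cube; a cube is expected to belong to exactly one project.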
+    public ProjectInstance getProjectByCube(String cubeName) {
+        CubeInstance cube = getCubeManager().getCube(cubeName);
+        List<ProjectInstance> projList = this.getProjectManager().findProjects(cube.getType(), cube.getName());
+        if (projList == null || projList.size() == 0) {
+            throw new RuntimeException("Cannot find the project containing the cube " + cube.getName());
+        } else if (projList.size() >= 2) {
+            throw new RuntimeException("Found more than one project containing the cube " + cube.getName() + "; this violates the uniqueness requirement.");
+        }
+        return projList.get(0);
+    }
+    
     protected List<CubingJob> listAllCubingJobs(final String cubeName, final String projectName, final Set<ExecutableState> statusList) {
         return listAllCubingJobs(cubeName, projectName, statusList, getExecutableManager().getAllOutputs());
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
index 2160e3d..6e091d8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CacheService.java
@@ -48,6 +48,7 @@ import org.apache.kylin.metadata.realization.RealizationType;
 import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
 import org.apache.kylin.rest.controller.QueryController;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hybrid.HybridManager;
@@ -210,7 +211,8 @@ public class CacheService extends BasicService {
                 KafkaConfigManager.clearCache();
                 StreamingManager.clearCache();
                 HBaseConnection.clearConnCache();
-
+                HiveManager.clearCache();
+                
                 cleanAllDataCache();
                 removeAllOLAPDataSources();
                 break;

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 4cd527c..dcad40f 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -475,6 +475,10 @@ public class CubeService extends BasicService {
             logger.error("Cannot find table descirptor " + tableName, e);
             throw e;
         }
+        if(table.getHive() != null) {
+            logger.warn("Cannot calculate cardinality for table {} because it is loaded from an external hive source.", tableName);
+            return;
+        }
 
         DefaultChainedExecutable job = new DefaultChainedExecutable();
         job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
@@ -482,7 +486,7 @@ public class CubeService extends BasicService {
 
         String outPath = HiveColumnCardinalityJob.OUTPUT_PATH + "/" + tableName;
         String param = "-table " + tableName + " -output " + outPath;
-
+        
         MapReduceExecutable step1 = new MapReduceExecutable();
 
         step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
@@ -561,8 +565,8 @@ public class CubeService extends BasicService {
     }
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public String[] reloadHiveTable(String tables) throws IOException {
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
+    public String[] reloadHiveTable(String tables, String project) throws IOException {
+        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), project, getConfig());
         return (String[]) loaded.toArray(new String[loaded.size()]);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
index b4cceb2..00e4da6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/ProjectService.java
@@ -53,13 +53,14 @@ public class ProjectService extends BasicService {
     public ProjectInstance createProject(CreateProjectRequest projectRequest) throws IOException {
         String projectName = projectRequest.getName();
         String description = projectRequest.getDescription();
+        String hiveName = projectRequest.getHive();
         ProjectInstance currentProject = getProjectManager().getProject(projectName);
 
         if (currentProject != null) {
             throw new InternalErrorException("The project named " + projectName + " already exists");
         }
         String owner = SecurityContextHolder.getContext().getAuthentication().getName();
-        ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description);
+        ProjectInstance createdProject = getProjectManager().createProject(projectName, owner, description, hiveName);
         accessService.init(createdProject, AclPermission.ADMINISTRATION);
         logger.debug("New project created.");
 
@@ -71,12 +72,13 @@ public class ProjectService extends BasicService {
         String formerProjectName = projectRequest.getFormerProjectName();
         String newProjectName = projectRequest.getNewProjectName();
         String newDescription = projectRequest.getNewDescription();
+        String newHiveName = projectRequest.getNewHiveName();
 
         if (currentProject == null) {
             throw new InternalErrorException("The project named " + formerProjectName + " does not exists");
         }
 
-        ProjectInstance updatedProject = getProjectManager().updateProject(currentProject, newProjectName, newDescription);
+        ProjectInstance updatedProject = getProjectManager().updateProject(currentProject, newProjectName, newDescription, newHiveName);
 
         logger.debug("Project updated.");
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
index bcb9a38..39cfcad 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
@@ -74,7 +74,8 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     }
 
     private void createFlatHiveTable(KylinConfig config, int numReducers) throws IOException {
-        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+        final String hiveName = getHiveName();
+        final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
         hiveCmdBuilder.addStatement(getInitStatement());
         boolean useRedistribute = getUseRedistribute();
         if (useRedistribute == true) {
@@ -158,4 +159,12 @@ public class CreateFlatHiveTableStep extends AbstractExecutable {
     public String getRowCountOutputDir() {
         return getParam("rowCountOutputDir");
     }
+    
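+    // Name of the hive instance this step runs against; a null value means the default hive.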
+    public void setHiveName(String hiveName) {
+        setParam("hiveName", hiveName);
+    }
+    
+    public String getHiveName() {
+        return getParam("hiveName");
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
index a99b304..891c635 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveClient.java
@@ -104,7 +104,7 @@ public class HiveClient {
             executeHQL(sql);
     }
 
-    private HiveMetaStoreClient getMetaStoreClient() throws Exception {
+    protected HiveMetaStoreClient getMetaStoreClient() throws Exception {
         if (metaStoreClient == null) {
             metaStoreClient = new HiveMetaStoreClient(hiveConf);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
index 5a5b4e0..bde6e3b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveCmdBuilder.java
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,10 +43,16 @@ public class HiveCmdBuilder {
     private HiveClientMode clientMode;
     private KylinConfig kylinConfig;
     final private ArrayList<String> statements = Lists.newArrayList();
+    protected String hiveName;
 
     public HiveCmdBuilder() {
+        this(null);
+    }
+    
+    public HiveCmdBuilder(String hive) {
         kylinConfig = KylinConfig.getInstanceFromEnv();
         clientMode = HiveClientMode.valueOf(kylinConfig.getHiveClientMode().toUpperCase());
+        this.hiveName = hive;
     }
 
     public String build() {
@@ -53,13 +60,17 @@ public class HiveCmdBuilder {
 
         switch (clientMode) {
         case CLI:
-            buf.append("hive -e \"");
+            String hiveCommand = HiveManager.getInstance().getHiveCommand(this.hiveName);
+            buf.append(hiveCommand + " -e \"");
             for (String statement : statements) {
                 buf.append(statement).append("\n");
             }
             buf.append("\"");
             break;
         case BEELINE:
+            if(this.hiveName != null) {
+                throw new IllegalArgumentException("External hive is not currently supported with beeline mode for cube builds.");
+            }
             BufferedWriter bw = null;
             try {
                 File tmpHql = File.createTempFile("beeline_", ".hql");

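As a rough usage sketch of the extended builder (illustrative only; "prod_hive" is an arbitrary name assumed to be registered with HiveManager, and CLI client mode is assumed):

    HiveCmdBuilder builder = new HiveCmdBuilder("prod_hive");   // passing null falls back to the default hive
    builder.addStatement("USE default;\n");
    builder.addStatement("SHOW TABLES;\n");
    // build() resolves the hive binary via HiveManager.getHiveCommand("prod_hive") and wraps the statements in -e "..."
    String cmd = builder.build();
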
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 520d7cc..2ddd381 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -26,6 +26,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -52,6 +53,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.LookupDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.hive.external.ExternalHiveClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,13 +87,17 @@ public class HiveMRInput implements IMRInput {
 
         @Override
         public void configureJob(Job job) {
-            try {
-                HCatInputFormat.setInput(job, dbName, tableName);
-                job.setInputFormatClass(HCatInputFormat.class);
-
-                job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
-            } catch (IOException e) {
-                throw new RuntimeException(e);
+            // The hive-site location in HiveConf is a static variable, so make sure it points at the correct file.
+            synchronized(ExternalHiveClient.class) {
+                HiveConf.setHiveSiteLocation(Thread.currentThread().getContextClassLoader().getResource("hive-site.xml"));
+                try {
+                    HCatInputFormat.setInput(job, dbName, tableName);
+                    job.setInputFormatClass(HCatInputFormat.class);
+    
+                    job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
             }
         }
 
@@ -104,9 +110,9 @@ public class HiveMRInput implements IMRInput {
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
 
-        final JobEngineConfig conf;
-        final IJoinedFlatTableDesc flatDesc;
-        String hiveViewIntermediateTables = "";
+        protected final JobEngineConfig conf;
+        protected final IJoinedFlatTableDesc flatDesc;
+        protected String hiveViewIntermediateTables = "";
 
         public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
             this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
@@ -126,7 +132,7 @@ public class HiveMRInput implements IMRInput {
                 jobFlow.addTask(createRedistributeFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName));
             } else if ("2".equals(createFlatTableMethod)) {
                 // count from source table first, and then redistribute, suitable for partitioned table
-                final String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId()) + "/row_count";
+                final String rowCountOutputDir = getRowCountOutputDir(conf, jobFlow.getId());
                 jobFlow.addTask(createCountHiveTableStep(conf, flatDesc, jobFlow.getId(), rowCountOutputDir));
                 jobFlow.addTask(createFlatHiveTableStep(conf, flatDesc, jobFlow.getId(), cubeName, true, rowCountOutputDir));
             } else {
@@ -139,16 +145,25 @@ public class HiveMRInput implements IMRInput {
             }
         }
 
-        public static AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
+        protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) {
+            return JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+        }
+
+        protected boolean isWriteToLocalDir() {
+            return false;
+        }
+
+        public AbstractExecutable createRedistributeFlatHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String cubeName) {
             StringBuilder hiveInitBuf = new StringBuilder();
             hiveInitBuf.append("USE ").append(conf.getConfig().getHiveDatabaseForIntermediateTable()).append(";\n");
             hiveInitBuf.append(JoinedFlatTable.generateHiveSetStatements(conf));
 
-            String rowCountOutputDir = JobBuilderSupport.getJobWorkingDir(conf, jobId) + "/row_count";
+            String rowCountOutputDir = getRowCountOutputDir(conf, jobId);
 
             RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
+            step.setHiveName(flatTableDesc.getHiveName());
             step.setInitStatement(hiveInitBuf.toString());
-            step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir));
+            step.setSelectRowCountStatement(JoinedFlatTable.generateSelectRowCountStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir()));
             step.setRowCountOutputDir(rowCountOutputDir);
             step.setRedistributeDataStatement(JoinedFlatTable.generateRedistributeFlatTableStatement(flatTableDesc));
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
@@ -157,13 +172,14 @@ public class HiveMRInput implements IMRInput {
         }
 
 
-        public static AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
+        public AbstractExecutable createCountHiveTableStep(JobEngineConfig conf, IJoinedFlatTableDesc flatTableDesc, String jobId, String rowCountOutputDir) {
             final ShellExecutable step = new ShellExecutable();
-
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final String hiveName = flatTableDesc.getHiveName();
+            
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
             hiveCmdBuilder.addStatement(JoinedFlatTable.generateHiveSetStatements(conf));
             hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
-            hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir));
+            hiveCmdBuilder.addStatement(JoinedFlatTable.generateCountDataStatement(flatTableDesc, rowCountOutputDir, isWriteToLocalDir()));
 
             step.setCmd(hiveCmdBuilder.build());
             step.setName(ExecutableConstants.STEP_NAME_COUNT_HIVE_TABLE);
@@ -174,7 +190,7 @@ public class HiveMRInput implements IMRInput {
         public ShellExecutable createLookupHiveViewMaterializationStep(String jobId) {
             ShellExecutable step = new ShellExecutable();
             step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
-            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(flatDesc.getHiveName());
 
             KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             MetadataManager metadataManager = MetadataManager.getInstance(kylinConfig);
@@ -221,6 +237,7 @@ public class HiveMRInput implements IMRInput {
             String insertDataHqls = JoinedFlatTable.generateInsertDataStatement(flatTableDesc, conf, redistribute);
 
             CreateFlatHiveTableStep step = new CreateFlatHiveTableStep();
+            step.setHiveName(flatTableDesc.getHiveName());
             step.setUseRedistribute(redistribute);
             step.setInitStatement(hiveInitBuf.toString());
             step.setRowCountOutputDir(rowCountOutputDir);
@@ -245,7 +262,7 @@ public class HiveMRInput implements IMRInput {
             return new HiveTableInputFormat(getIntermediateTableIdentity());
         }
 
-        private String getIntermediateTableIdentity() {
+        protected String getIntermediateTableIdentity() {
             return conf.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatDesc.getTableName();
         }
     }
@@ -254,7 +271,8 @@ public class HiveMRInput implements IMRInput {
         private final BufferedLogger stepLogger = new BufferedLogger(logger);
 
         private void computeRowCount(CliCommandExecutor cmdExecutor) throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final String hiveName = getHiveName();
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
             hiveCmdBuilder.addStatement(getInitStatement());
             hiveCmdBuilder.addStatement("set hive.exec.compress.output=false;\n");
             hiveCmdBuilder.addStatement(getSelectRowCountStatement());
@@ -282,7 +300,8 @@ public class HiveMRInput implements IMRInput {
         }
 
         private void redistributeTable(KylinConfig config, int numReducers) throws IOException {
-            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder();
+            final String hiveName = getHiveName();
+            final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(hiveName);
             hiveCmdBuilder.addStatement(getInitStatement());
             hiveCmdBuilder.addStatement("set mapreduce.job.reduces=" + numReducers + ";\n");
             hiveCmdBuilder.addStatement("set hive.merge.mapredfiles=false;\n");
@@ -369,6 +388,14 @@ public class HiveMRInput implements IMRInput {
         public String getRowCountOutputDir() {
             return getParam("rowCountOutputDir");
         }
+        
+        public void setHiveName(String hiveName) {
+            setParam("hiveName", hiveName);
+        }
+        
+        public String getHiveName() {
+            return getParam("hiveName");
+        }
     }
 
     public static class GarbageCollectionStep extends AbstractExecutable {
@@ -390,7 +417,7 @@ public class HiveMRInput implements IMRInput {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
         }
 
-        private String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
+        protected String cleanUpIntermediateFlatTable(KylinConfig config) throws IOException {
             StringBuffer output = new StringBuffer();
             final String hiveTable = this.getIntermediateTableIdentity();
             if (config.isHiveKeepFlatTable() == false && StringUtils.isNotEmpty(hiveTable)) {
@@ -441,7 +468,7 @@ public class HiveMRInput implements IMRInput {
             setParam("oldHiveTable", tableIdentity);
         }
 
-        private String getIntermediateTableIdentity() {
+        protected String getIntermediateTableIdentity() {
             return getParam("oldHiveTable");
         }
 
@@ -449,7 +476,7 @@ public class HiveMRInput implements IMRInput {
             setParam("externalDataPath", externalDataPath);
         }
 
-        private String getExternalDataPath() {
+        protected String getExternalDataPath() {
             return getParam("externalDataPath");
         }
 
@@ -457,9 +484,17 @@ public class HiveMRInput implements IMRInput {
             setParam("oldHiveViewIntermediateTables", tableIdentities);
         }
 
-        private String getHiveViewIntermediateTableIdentities() {
+        protected String getHiveViewIntermediateTableIdentities() {
             return getParam("oldHiveViewIntermediateTables");
         }
+        
+        public String getCmd() {
+            StringBuffer buf = new StringBuffer();
+            buf.append(" -").append("oldHiveTable").append(" ").append(getIntermediateTableIdentity()).append(" -").append("externalDataPath").append(" ").
+                append(getExternalDataPath()).append(" -").append("oldHiveViewIntermediateTables").append(" ").append(getHiveViewIntermediateTableIdentities());
+            
+            return buf.toString();
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 70b097c..aceb791 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -33,6 +33,8 @@ import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,7 +57,7 @@ public class HiveSourceTableLoader {
     public static final String TABLE_FOLDER_NAME = "table";
     public static final String TABLE_EXD_FOLDER_NAME = "table_exd";
 
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+    public static Set<String> reloadHiveTables(String[] hiveTables, String project, KylinConfig config) throws IOException {
 
         Map<String, Set<String>> db2tables = Maps.newHashMap();
         for (String table : hiveTables) {
@@ -71,7 +73,7 @@ public class HiveSourceTableLoader {
         // extract from hive
         Set<String> loadedTables = Sets.newHashSet();
         for (String database : db2tables.keySet()) {
-            List<String> loaded = extractHiveTables(database, db2tables.get(database), config);
+            List<String> loaded = extractHiveTables(database, db2tables.get(database), project, config);
             loadedTables.addAll(loaded);
         }
 
@@ -84,13 +86,23 @@ public class HiveSourceTableLoader {
         metaMgr.removeTableExd(hiveTable);
     }
 
-    private static List<String> extractHiveTables(String database, Set<String> tables, KylinConfig config) throws IOException {
+    private static List<String> extractHiveTables(String database, Set<String> tables, String project, KylinConfig config) throws IOException {
 
         List<String> loadedTables = Lists.newArrayList();
         MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
+        ProjectManager projectMgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        String hiveName = projectMgr.getProject(project).getHive();
         for (String tableName : tables) {
             Table table = null;
-            HiveClient hiveClient = new HiveClient();
+            HiveClient hiveClient = HiveManager.getInstance().createHiveClient(hiveName);
+            String tableIdentity = database + "." + tableName;
+            TableDesc tableDesc = metaMgr.getTableDesc(tableIdentity);
+            if(tableDesc != null && !HiveManager.isSameHiveSource(tableDesc.getHive(), hiveName)) {
+                throw new IllegalArgumentException("Table " + tableIdentity + " in hive " + 
+                        hiveName + " has already been loaded from hive " + tableDesc.getHive() + 
+                        "; please rename the table or unload the old one.");
+            }
+
             List<FieldSchema> partitionFields = null;
             List<FieldSchema> fields = null;
             try {
@@ -108,13 +120,13 @@ public class HiveSourceTableLoader {
 
             long tableSize = hiveClient.getFileSizeForTable(table);
             long tableFileNum = hiveClient.getFileNumberForTable(table);
-            TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
             if (tableDesc == null) {
                 tableDesc = new TableDesc();
                 tableDesc.setDatabase(database.toUpperCase());
                 tableDesc.setName(tableName.toUpperCase());
                 tableDesc.setUuid(UUID.randomUUID().toString());
                 tableDesc.setLastModified(0);
+                tableDesc.setHive(hiveName);
             }
             if (table.getTableType() != null) {
                 tableDesc.setTableType(table.getTableType());

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
index dcc43ff..9959ca9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTable.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.mr.DFSFileTable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.ReadableTable;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,17 +37,19 @@ public class HiveTable implements ReadableTable {
 
     final private String database;
     final private String hiveTable;
+    final private String hiveName;
 
     private HiveClient hiveClient;
 
     public HiveTable(TableDesc tableDesc) {
+        this.hiveName = tableDesc.getHive();
         this.database = tableDesc.getDatabase();
         this.hiveTable = tableDesc.getName();
     }
 
     @Override
     public TableReader getReader() throws IOException {
-        return new HiveTableReader(database, hiveTable);
+        return new HiveTableReader(database, hiveTable, getHiveClient().getHiveConf());
     }
 
     @Override
@@ -86,7 +89,7 @@ public class HiveTable implements ReadableTable {
     public HiveClient getHiveClient() {
 
         if (hiveClient == null) {
-            hiveClient = new HiveClient();
+            hiveClient = HiveManager.getInstance().createHiveClient(this.hiveName);
         }
         return hiveClient;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
index 3f9ce01..f6db33a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveTableReader.java
@@ -48,6 +48,7 @@ public class HiveTableReader implements TableReader {
     private HCatRecord currentHCatRecord;
     private int numberOfSplits = 0;
     private Map<String, String> partitionKV = null;
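+    // HiveConf of the source hive; when null, a HiveConf is built from the hive-site.xml on the classpath.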
+    private HiveConf hiveConf = null;
 
     /**
      * Constructor for reading whole hive table
@@ -55,8 +56,8 @@ public class HiveTableReader implements TableReader {
      * @param tableName
      * @throws IOException
      */
-    public HiveTableReader(String dbName, String tableName) throws IOException {
-        this(dbName, tableName, null);
+    public HiveTableReader(String dbName, String tableName, HiveConf hiveConf) throws IOException {
+        this(dbName, tableName, null, hiveConf);
     }
 
     /**
@@ -66,16 +67,17 @@ public class HiveTableReader implements TableReader {
      * @param partitionKV key-value pairs condition on the partition
      * @throws IOException
      */
-    public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV) throws IOException {
+    public HiveTableReader(String dbName, String tableName, Map<String, String> partitionKV, HiveConf hiveConf) throws IOException {
         this.dbName = dbName;
         this.tableName = tableName;
         this.partitionKV = partitionKV;
+        this.hiveConf = hiveConf;
         initialize();
     }
 
     private void initialize() throws IOException {
         try {
-            this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV);
+            this.readCntxt = getHiveReaderContext(dbName, tableName, partitionKV, this.hiveConf);
         } catch (Exception e) {
             e.printStackTrace();
             throw new IOException(e);
@@ -146,8 +148,10 @@ public class HiveTableReader implements TableReader {
         return "hive table reader for: " + dbName + "." + tableName;
     }
 
-    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV) throws Exception {
-        HiveConf hiveConf = new HiveConf(HiveTableReader.class);
+    private static ReaderContext getHiveReaderContext(String database, String table, Map<String, String> partitionKV, HiveConf hiveConf) 
+            throws Exception {
+        if(hiveConf == null)
+            hiveConf = new HiveConf(HiveTableReader.class);
         Iterator<Entry<String, String>> itr = hiveConf.iterator();
         Map<String, String> map = new HashMap<String, String>();
         while (itr.hasNext()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
index 79493a4..08fbcd6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HqlExecutable.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.source.hive.external.HiveManager;
 import org.datanucleus.store.types.backed.HashMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,6 +43,7 @@ public class HqlExecutable extends AbstractExecutable {
     private static final Logger logger = LoggerFactory.getLogger(HqlExecutable.class);
 
     private static final String HQL = "hql";
+    private static final String HIVE_NAME = "hive-name";
     private static final String HIVE_CONFIG = "hive-config";
 
     public HqlExecutable() {
@@ -52,7 +54,8 @@ public class HqlExecutable extends AbstractExecutable {
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         try {
             Map<String, String> configMap = getConfiguration();
-            HiveClient hiveClient = new HiveClient(configMap);
+            String hiveName = getHiveName();
+            HiveClient hiveClient = HiveManager.getInstance().createHiveClientWithConfig(configMap, hiveName);
 
             for (String hql : getHqls()) {
                 hiveClient.executeHQL(hql);
@@ -104,4 +107,11 @@ public class HqlExecutable extends AbstractExecutable {
         }
     }
 
+    public void setHiveName(String hiveName) {
+        setParam(HIVE_NAME, hiveName);
+    }
+    
+    private String getHiveName() {
+        return getParam(HIVE_NAME);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
new file mode 100644
index 0000000..3a488d3
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/DistcpShellExecutable.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
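+/**
+ * Job step that copies the intermediate hive table of an external hive from its source
+ * hadoop cluster to the current cluster with "hadoop distcp"; the copy is skipped when
+ * the job uses the default hive or the data already lives on the current cluster.
+ */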
+public class DistcpShellExecutable extends ShellExecutable  {
+    private static final Logger logger = LoggerFactory.getLogger(DistcpShellExecutable.class);
+
+    private static final String HIVE_NAME = "hiveName";
+    private static final String TABLE_NAME = "tableName";
+    private static final String OUTPUT_PATH = "output";
+    
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        String tableName = getTableName();
+        String hiveName = getHiveName();
+        String input = null;
+        String database = context.getConfig().getHiveDatabaseForIntermediateTable();
+        try {
+            if(hiveName == null) {
+                return new ExecuteResult(ExecuteResult.State.SUCCEED, "The job is using the default hive; no data copy is needed.");
+            }
+            input = HiveManager.getInstance().getHiveTableLocation(database, tableName, hiveName);
+        } catch (Exception e) {
+            logger.error("Failed to get the location of hive table {}.{} for hive {}", database, tableName, hiveName);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        }
+        String output = getOutputPath();
+        logger.info("Copy intermediate hive table, input: {}, output: {}", input, output);
+        /**
+         * Copy the hive table data only when the source table location is on a different hadoop cluster.
+         */
+        if(input.startsWith("/") || input.startsWith(HadoopUtil.getCurrentConfiguration().get(FileSystem.FS_DEFAULT_NAME_KEY))) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "Hive " + hiveName + " is on the default hadoop cluster, skipping the data copy.");
+        } else {
+            Path inputPath = new Path(input);
+            input = inputPath.toString();
+        }
+        String cmd = String.format("hadoop distcp -overwrite %s %s", input, output);
+        super.setCmd(cmd);
+        
+        return super.doWork(context);
+    }
+    
+    public void setHiveName(String name) {
+        setParam(HIVE_NAME, name);
+    }
+    
+    public void setTableName(String name) {
+        setParam(TABLE_NAME, name);
+    }
+    
+    public void setOutputPath(String output) {
+        setParam(OUTPUT_PATH, output);
+    }
+    
+    public String getOutputPath() {
+        return getParam(OUTPUT_PATH);
+    }
+    
+    public String getHiveName() {
+        return getParam(HIVE_NAME);
+    }
+    
+    public String getTableName() {
+        return getParam(TABLE_NAME);
+    }
+    
+    public String getExecCmd() {
+        StringBuffer buf = new StringBuffer();
+        buf.append(" -").append(HIVE_NAME).append(" ").append(getHiveName()).append(" -").append(TABLE_NAME).append(" ").
+            append(getTableName()).append(" -").append(OUTPUT_PATH).append(" ").append(getOutputPath());
+        
+        return buf.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
new file mode 100644
index 0000000..b7d1559
--- /dev/null
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.hive.external;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.kylin.source.hive.HiveClient;
+
+/**
+ * Hive meta API client for external hive
+ * @author hzfengyu
+ */
+public class ExternalHiveClient extends HiveClient {
+    private final static String LOCAL_FS_SCHEMA = "file://";
+    
+    public ExternalHiveClient(String location) {
+        URL uri = null;
+        if(location != null) {
+            try {
+                uri = new URL(LOCAL_FS_SCHEMA + location);
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException("Cannot find hive config file " + location);
+            }
+        } else {
+            uri = Thread.currentThread().getContextClassLoader().getResource("hive-site.xml");
+        }
+        
+        /**
+         * In HiveConf, hiveSiteURL is a static variable, so we should use a global lock.
+         * If uri is null, HiveConf will use the hive-site.xml found on the java classpath.
+         */
+        synchronized(ExternalHiveClient.class) {
+            HiveConf.setHiveSiteLocation(uri);
+            hiveConf = new HiveConf(HiveClient.class);
+        }
+    }
+    
+    public ExternalHiveClient(Map<String, String> configMap, String location) {
+        this(location);
+        appendConfiguration(configMap);
+    }
+    
+    @Override
+    protected HiveMetaStoreClient getMetaStoreClient() throws Exception {
+        /**
+         * HMSHandler keeps its RawStore in a thread-local variable, so inside tomcat we have to check it.
+         * The thread-local metastore client must be reset when:
+         * 1: a new hive client is created but an HMSHandler already exists in the current thread;
+         * 2: the hive client used by the current thread changes.
+         */
+        if (metaStoreClient == null) {
+            HiveMetaStore.HMSHandler.removeRawStore();
+            metaStoreClient = new HiveMetaStoreClient(hiveConf);
+        } else if(HiveMetaStore.HMSHandler.getRawStore() != null && HiveMetaStore.HMSHandler.getRawStore().getConf() != this.hiveConf) {
+            HiveMetaStore.HMSHandler.getRawStore().shutdown();
+            HiveMetaStore.HMSHandler.getRawStore().setConf(this.hiveConf);
+        }
+        return metaStoreClient;
+    }
+}