Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/26 15:42:29 UTC

kylin git commit: refactor to TableService

Repository: kylin
Updated Branches:
  refs/heads/hive-load-refactor [created] 0b66ef1d6


refactor to TableService


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0b66ef1d
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0b66ef1d
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0b66ef1d

Branch: refs/heads/hive-load-refactor
Commit: 0b66ef1d60e8efc1c29d7d8c247d3cdd7506cb58
Parents: a78b650
Author: Billy Liu <bi...@apache.org>
Authored: Mon Dec 26 23:42:11 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Mon Dec 26 23:42:11 2016 +0800

----------------------------------------------------------------------
 .../hive/ITHiveSourceTableLoaderTest.java       |   2 +-
 .../rest/controller/StreamingController.java    |  22 +-
 .../kylin/rest/controller/TableController.java  | 199 ++----------
 .../apache/kylin/rest/service/CubeService.java  |  96 ------
 .../apache/kylin/rest/service/TableService.java | 307 +++++++++++++++++++
 .../source/hive/HiveSourceTableLoader.java      |   2 +-
 6 files changed, 333 insertions(+), 295 deletions(-)
----------------------------------------------------------------------
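
In short, table-related operations that previously lived in CubeService and the controllers are consolidated into a new TableService. A minimal sketch of the resulting call pattern, using class and method names from the diff below (the project name, table name, and wiring are illustrative):

    @Autowired
    private TableService tableService;

    // Load Hive tables into a project; TableService syncs them to the project
    // and can kick off cardinality jobs afterwards.
    String[] loaded = tableService.loadHiveTablesToProject(
            new String[] { "DEFAULT.TEST_KYLIN_FACT" }, "learn_kylin");
    tableService.calculateCardinalityIfNotPresent(loaded, submitter);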


http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..7aff3ba 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
     public void test() throws IOException {
         KylinConfig config = getTestConfig();
         String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
 
         assertTrue(loaded.size() == toLoad.length);
         for (String str : toLoad)

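The rename from reloadHiveTables to loadHiveTables better reflects the method's behavior: it is invoked for first-time loads as well as reloads. A caller sketch against the renamed static API (the config lookup and table names are placeholders):

    KylinConfig config = KylinConfig.getInstanceFromEnv();
    Set<String> loaded = HiveSourceTableLoader.loadHiveTables(
            new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" }, config);
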
http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e04ebc8..0ced9ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -23,10 +23,8 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -34,9 +32,9 @@ import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.KafkaConfigService;
 import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.rest.service.TableService;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +67,7 @@ public class StreamingController extends BasicController {
     @Autowired
     private KafkaConfigService kafkaConfigService;
     @Autowired
-    private CubeService cubeMgmtService;
+    private TableService tableService;
 
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
@@ -113,10 +111,7 @@ public class StreamingController extends BasicController {
         boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
 
         try {
-            tableDesc.setUuid(UUID.randomUUID().toString());
-            MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-            metaMgr.saveSourceTable(tableDesc);
-            cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);
+            tableService.addStreamingTable(tableDesc, project);
         } catch (IOException e) {
             throw new BadRequestException("Failed to add streaming table.");
         }
@@ -289,15 +284,4 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
-    public void setStreamingService(StreamingService streamingService) {
-        this.streamingService = streamingService;
-    }
-
-    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
-        this.kafkaConfigService = kafkaConfigService;
-    }
-
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
-    }
 }

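Since the setter methods at the bottom of StreamingController are removed, the service fields are now populated only via @Autowired, so unit tests can no longer swap in mocks through setters. One possible workaround, assuming Spring's test utilities and Mockito are on the classpath (a sketch, not part of this commit):

    StreamingController controller = new StreamingController();
    TableService tableService = Mockito.mock(TableService.class);
    ReflectionTestUtils.setField(controller, "tableService", tableService);
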
http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 74d1b28..0debad7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -19,37 +19,18 @@
 package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
 import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.response.TableDescResponse;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.ModelService;
-import org.apache.kylin.rest.service.ProjectService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.IHiveClient;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.rest.service.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -70,43 +51,28 @@ import com.google.common.collect.Sets;
 @Controller
 @RequestMapping(value = "/tables")
 public class TableController extends BasicController {
+
     private static final Logger logger = LoggerFactory.getLogger(TableController.class);
 
     @Autowired
-    private CubeService cubeMgmtService;
-    @Autowired
-    private ProjectService projectService;
-    @Autowired
-    private StreamingService streamingService;
-    @Autowired
-    private KafkaConfigService kafkaConfigService;
-    @Autowired
-    private ModelService modelService;
+    private TableService tableService;
 
     /**
-     * Get available table list of the input database
+     * Get the available table list of a project
      *
      * @return Table metadata array
      * @throws IOException
      */
     @RequestMapping(value = "", method = { RequestMethod.GET })
     @ResponseBody
-    public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
-        long start = System.currentTimeMillis();
+    public List<TableDesc> getTableDesc(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
         List<TableDesc> tables = null;
         try {
-            tables = cubeMgmtService.getProjectManager().listDefinedTables(project);
-        } catch (Exception e) {
-            logger.error("Failed to deal with the request.", e);
+            tables = tableService.getTableDescByProject(project, withExt);
+        } catch (IOException e) {
+            logger.error("Failed to get Hive Tables", e);
             throw new InternalErrorException(e.getLocalizedMessage());
         }
-
-        if (withExt) {
-            tables = cloneTableDesc(tables);
-        }
-        long end = System.currentTimeMillis();
-        logger.info("Return all table metadata in " + (end - start) + " seconds");
-
         return tables;
     }
 
@@ -118,26 +84,19 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/{tableName:.+}", method = { RequestMethod.GET })
     @ResponseBody
-    public TableDesc getHiveTable(@PathVariable String tableName) {
-        return cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-    }
-
-    @RequestMapping(value = "/reload", method = { RequestMethod.PUT })
-    @ResponseBody
-    public String reloadSourceTable() {
-        cubeMgmtService.getMetadataManager().reload();
-        return "ok";
+    public TableDesc getTableDesc(@PathVariable String tableName) {
+        return tableService.getTableDescByName(tableName);
     }
 
     @RequestMapping(value = "/{tables}/{project}", method = { RequestMethod.POST })
     @ResponseBody
-    public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
+    public Map<String, String[]> loadHiveTables(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
-        String[] loaded = cubeMgmtService.reloadHiveTable(tables);
+        String[] tableNames = tables.split(",");
+        String[] loaded = tableService.loadHiveTablesToProject(tableNames, project);
         if (request.isCalculate()) {
-            cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
+            tableService.calculateCardinalityIfNotPresent(loaded, submitter);
         }
-        cubeMgmtService.syncTableToProject(loaded, project);
         Map<String, String[]> result = new HashMap<String, String[]>();
         result.put("result.loaded", loaded);
         result.put("result.unloaded", new String[] {});
@@ -151,7 +110,7 @@ public class TableController extends BasicController {
         Set<String> unLoadFail = Sets.newHashSet();
         Map<String, String[]> result = new HashMap<String, String[]>();
         for (String tableName : tables.split(",")) {
-            if (unLoadHiveTable(tableName, project)) {
+            if (tableService.unLoadHiveTable(tableName, project)) {
                 unLoadSuccess.add(tableName);
             } else {
                 unLoadFail.add(tableName);
@@ -162,73 +121,13 @@ public class TableController extends BasicController {
         return result;
     }
 
-    /**
-     * table may referenced by several projects, and kylin only keep one copy of meta for each table,
-     * that's why we have two if statement here.
-     * @param tableName
-     * @param project
-     * @return
-     */
-    private boolean unLoadHiveTable(String tableName, String project) {
-        boolean rtn = false;
-        int tableType = 0;
-
-        //remove streaming info
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc desc = cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-        if (desc == null)
-            return false;
-        tableType = desc.getSourceType();
-
-        try {
-            if (!modelService.isTableInModel(tableName, project)) {
-                cubeMgmtService.removeTableFromProject(tableName, project);
-                rtn = true;
-            } else {
-                List<String> models = modelService.getModelsUsingTable(tableName, project);
-                throw new InternalErrorException("Table is already in use by models " + models);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            try {
-                cubeMgmtService.unLoadHiveTable(tableName);
-                rtn = true;
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-                rtn = false;
-            }
-        }
-
-        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            StreamingConfig config = null;
-            KafkaConfig kafkaConfig = null;
-            try {
-                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
-                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
-                streamingService.dropStreamingConfig(config);
-                kafkaConfigService.dropKafkaConfig(kafkaConfig);
-                rtn = true;
-            } catch (Exception e) {
-                rtn = false;
-                logger.error(e.getLocalizedMessage(), e);
-            }
-        }
-        return rtn;
-    }
-
     @RequestMapping(value = "/addStreamingSrc", method = { RequestMethod.POST })
     @ResponseBody
     public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
         Map<String, String> result = new HashMap<String, String>();
         String project = request.getProject();
         TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
-        desc.setUuid(UUID.randomUUID().toString());
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        metaMgr.saveSourceTable(desc);
-        cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
+        tableService.addStreamingTable(desc, project);
         result.put("success", "true");
         return result;
     }
@@ -245,59 +144,12 @@ public class TableController extends BasicController {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         String[] tables = tableNames.split(",");
         for (String table : tables) {
-            cubeMgmtService.calculateCardinality(table.trim().toUpperCase(), submitter);
+            tableService.calculateCardinality(table.trim().toUpperCase(), submitter);
         }
         return request;
     }
 
     /**
-     * @param tables
-     * @return
-     */
-    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
-        if (null == tables) {
-            return Collections.emptyList();
-        }
-
-        List<TableDesc> descs = new ArrayList<TableDesc>();
-        Iterator<TableDesc> it = tables.iterator();
-        while (it.hasNext()) {
-            TableDesc table = it.next();
-            TableExtDesc tableExtDesc = cubeMgmtService.getMetadataManager().getTableExt(table.getIdentity());
-
-            // Clone TableDesc
-            TableDescResponse rtableDesc = new TableDescResponse(table);
-            Map<String, Long> cardinality = new HashMap<String, Long>();
-            Map<String, String> dataSourceProp = new HashMap<>();
-            String scard = tableExtDesc.getCardinality();
-            if (!StringUtils.isEmpty(scard)) {
-                String[] cards = StringUtils.split(scard, ",");
-                ColumnDesc[] cdescs = rtableDesc.getColumns();
-                for (int i = 0; i < cdescs.length; i++) {
-                    ColumnDesc columnDesc = cdescs[i];
-                    if (cards.length > i) {
-                        cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
-                    } else {
-                        logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length);
-                        break;
-                    }
-                }
-                rtableDesc.setCardinality(cardinality);
-            }
-            dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
-            dataSourceProp.put("location", tableExtDesc.getStorageLocation());
-            dataSourceProp.put("owner", tableExtDesc.getOwner());
-            dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
-            dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
-            dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
-            rtableDesc.setDescExd(dataSourceProp);
-            descs.add(rtableDesc);
-        }
-
-        return descs;
-    }
-
-    /**
      * Show all databases in Hive
      *
      * @return Hive databases list
@@ -305,14 +157,11 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveDatabases() throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+    private List<String> showHiveDatabases() throws IOException {
         List<String> results = null;
-
         try {
-            results = hiveClient.getHiveDbNames();
+            results = tableService.getHiveDbNames();
         } catch (Exception e) {
-            e.printStackTrace();
             throw new IOException(e);
         }
         return results;
@@ -326,21 +175,15 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveTables(@PathVariable String database) throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+    private List<String> showHiveTables(@PathVariable String database) throws IOException {
         List<String> results = null;
 
         try {
-            results = hiveClient.getHiveTableNames(database);
+            results = tableService.getHiveTableNames(database);
         } catch (Exception e) {
-            e.printStackTrace();
             throw new IOException(e);
         }
         return results;
     }
 
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
-    }
-
 }

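One subtle behavior change in addStreamingTable above: the old inline code synced the table by desc.getName(), whereas TableService.addStreamingTable (in the new file below) syncs by desc.getIdentity(), the DATABASE.TABLE form. For illustration, assuming a streaming TableDesc in database DEFAULT:

    desc.getName();       // "MY_STREAMING_TABLE"
    desc.getIdentity();   // "DEFAULT.MY_STREAMING_TABLE"
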
http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 85c9284..23aa5a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.WeakHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -41,17 +40,10 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -63,9 +55,6 @@ import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
-import org.apache.kylin.source.hive.HiveSourceTableLoader;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.slf4j.Logger;
@@ -438,53 +427,6 @@ public class CubeService extends BasicService {
         return hr;
     }
 
-    /**
-     * Generate cardinality for table This will trigger a hadoop job
-     * The result will be merged into table exd info
-     *
-     * @param tableName
-     */
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinality(String tableName, String submitter) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc table = getMetadataManager().getTableDesc(tableName);
-        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
-        if (table == null) {
-            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
-            logger.error("Cannot find table descirptor " + tableName, e);
-            throw e;
-        }
-
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        //make sure the job could be scheduled when the DistributedScheduler is enable.
-        job.setParam("segmentId", tableName);
-        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
-        job.setSubmitter(submitter);
-
-        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
-        String param = "-table " + tableName + " -output " + outPath;
-
-        MapReduceExecutable step1 = new MapReduceExecutable();
-
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
-        step1.setMapReduceParams(param);
-        step1.setParam("segmentId", tableName);
-
-        job.addTask(step1);
-
-        HadoopShellExecutable step2 = new HadoopShellExecutable();
-
-        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
-        step2.setJobParams(param);
-        step2.setParam("segmentId", tableName);
-        job.addTask(step2);
-        tableExt.setJodID(job.getId());
-        getMetadataManager().saveTableExt(tableExt);
-
-        getExecutableManager().addJob(job);
-    }
-
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION')  or hasPermission(#cube, 'MANAGEMENT')")
     public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) throws IOException {
         CubeDesc desc = cube.getDescriptor();
@@ -546,44 +488,6 @@ public class CubeService extends BasicService {
         CubeManager.getInstance(getConfig()).updateCube(update);
     }
 
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public String[] reloadHiveTable(String tables) throws IOException {
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
-        return (String[]) loaded.toArray(new String[loaded.size()]);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void unLoadHiveTable(String tableName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void syncTableToProject(String[] tables, String project) throws IOException {
-        getProjectManager().addTableDescToProject(tables, project);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void removeTableFromProject(String tableName, String projectName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        getProjectManager().removeTableDescFromProject(tableName, projectName);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
-        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
-        for (String table : tables) {
-            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
-            String jobID = tableExtDesc.getJodID();
-            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
-                calculateCardinality(table, submitter);
-            }
-        }
-    }
-
     public void updateOnNewSegmentReady(String cubeName) {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         String serverMode = kylinConfig.getServerMode();

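With these removals, CubeService keeps only cube-centric logic; callers of the removed table methods migrate to TableService. A before/after sketch (names from this diff; tables is a comma-separated string as before):

    // Before: two calls on CubeService.
    String[] loaded = cubeMgmtService.reloadHiveTable(tables);
    cubeMgmtService.syncTableToProject(loaded, project);

    // After: one call; TableService syncs to the project internally.
    String[] loaded = tableService.loadHiveTablesToProject(tables.split(","), project);
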
http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
new file mode 100644
index 0000000..6accfdc
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.response.TableDescResponse;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.source.hive.IHiveClient;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.stereotype.Component;
+
+@Component("tableService")
+public class TableService extends BasicService {
+
+    private static final Logger logger = LoggerFactory.getLogger(TableService.class);
+
+    @Autowired
+    private ModelService modelService;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private StreamingService streamingService;
+
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
+        List<TableDesc> tables = getProjectManager().listDefinedTables(project);
+        if (null == tables) {
+            return Collections.emptyList();
+        }
+        if (withExt) {
+            tables = cloneTableDesc(tables);
+        }
+        return tables;
+    }
+
+    public TableDesc getTableDescByName(String tableName) {
+        return getMetadataManager().getTableDesc(tableName);
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException {
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(tables, getConfig());
+        String[] result = (String[]) loaded.toArray(new String[loaded.size()]);
+        syncTableToProject(result, project);
+        return result;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void unLoadHiveTable(String tableName) throws IOException {
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        tableName = dbTableName[0] + "." + dbTableName[1];
+        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void syncTableToProject(String[] tables, String project) throws IOException {
+        getProjectManager().addTableDescToProject(tables, project);
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void removeTableFromProject(String tableName, String projectName) throws IOException {
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        tableName = dbTableName[0] + "." + dbTableName[1];
+        getProjectManager().removeTableDescFromProject(tableName, projectName);
+    }
+
+    /**
+     * A table may be referenced by several projects, and Kylin keeps only one
+     * copy of metadata for each table; that's why there are two if statements here.
+     * @param tableName
+     * @param project
+     * @return
+     */
+    public boolean unLoadHiveTable(String tableName, String project) {
+        boolean rtn = false;
+        int tableType = 0;
+
+        //remove streaming info
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        tableName = dbTableName[0] + "." + dbTableName[1];
+        TableDesc desc = getTableDescByName(tableName);
+        if (desc == null)
+            return false;
+        tableType = desc.getSourceType();
+
+        try {
+            if (!modelService.isTableInModel(tableName, project)) {
+                removeTableFromProject(tableName, project);
+                rtn = true;
+            } else {
+                List<String> models = modelService.getModelsUsingTable(tableName, project);
+                throw new InternalErrorException("Table is already in use by models " + models);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            try {
+                unLoadHiveTable(tableName);
+                rtn = true;
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                rtn = false;
+            }
+        }
+
+        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            StreamingConfig config = null;
+            KafkaConfig kafkaConfig = null;
+            try {
+                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
+                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
+                streamingService.dropStreamingConfig(config);
+                kafkaConfigService.dropKafkaConfig(kafkaConfig);
+                rtn = true;
+            } catch (Exception e) {
+                rtn = false;
+                logger.error(e.getLocalizedMessage(), e);
+            }
+        }
+        return rtn;
+    }
+
+    /**
+     * Save a streaming table desc, then sync it into the given project.
+     *
+     * @param desc
+     * @param project
+     * @throws IOException
+     */
+    public void addStreamingTable(TableDesc desc, String project) throws IOException {
+        desc.setUuid(UUID.randomUUID().toString());
+        getMetadataManager().saveSourceTable(desc);
+        syncTableToProject(new String[] { desc.getIdentity() }, project);
+    }
+
+    /**
+     * List all database names from the Hive metastore.
+     *
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveDbNames() throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveDbNames();
+        return results;
+    }
+
+    /**
+     * List all table names in the given Hive database.
+     *
+     * @param database
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveTableNames(String database) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveTableNames(database);
+        return results;
+    }
+
+    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
+        List<TableDesc> descs = new ArrayList<TableDesc>();
+        Iterator<TableDesc> it = tables.iterator();
+        while (it.hasNext()) {
+            TableDesc table = it.next();
+            TableExtDesc tableExtDesc = getMetadataManager().getTableExt(table.getIdentity());
+
+            // Clone TableDesc
+            TableDescResponse rtableDesc = new TableDescResponse(table);
+            Map<String, Long> cardinality = new HashMap<String, Long>();
+            Map<String, String> dataSourceProp = new HashMap<>();
+            String scard = tableExtDesc.getCardinality();
+            if (!StringUtils.isEmpty(scard)) {
+                String[] cards = StringUtils.split(scard, ",");
+                ColumnDesc[] cdescs = rtableDesc.getColumns();
+                for (int i = 0; i < cdescs.length; i++) {
+                    ColumnDesc columnDesc = cdescs[i];
+                    if (cards.length > i) {
+                        cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
+                    } else {
+                        logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+                        break;
+                    }
+                }
+                rtableDesc.setCardinality(cardinality);
+            }
+            dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
+            dataSourceProp.put("location", tableExtDesc.getStorageLocation());
+            dataSourceProp.put("owner", tableExtDesc.getOwner());
+            dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
+            dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
+            dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
+            rtableDesc.setDescExd(dataSourceProp);
+            descs.add(rtableDesc);
+        }
+
+        return descs;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+        MetadataManager metaMgr = getMetadataManager();
+        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
+        for (String table : tables) {
+            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
+            String jobID = tableExtDesc.getJodID();
+            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
+                calculateCardinality(table, submitter);
+            }
+        }
+    }
+
+    /**
+     * Generate cardinality for a table. This triggers a Hadoop job;
+     * the result will be merged into the table's ext info.
+     *
+     * @param tableName
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinality(String tableName, String submitter) throws IOException {
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        tableName = dbTableName[0] + "." + dbTableName[1];
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
+        if (table == null) {
+            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descriptor " + tableName);
+            logger.error("Cannot find table descriptor " + tableName, e);
+            throw e;
+        }
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        // make sure the job can be scheduled when the DistributedScheduler is enabled
+        job.setParam("segmentId", tableName);
+        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
+        job.setSubmitter(submitter);
+
+        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
+        String param = "-table " + tableName + " -output " + outPath;
+
+        MapReduceExecutable step1 = new MapReduceExecutable();
+
+        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
+
+        job.addTask(step1);
+
+        HadoopShellExecutable step2 = new HadoopShellExecutable();
+
+        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
+        job.addTask(step2);
+        tableExt.setJodID(job.getId());
+        getMetadataManager().saveTableExt(tableExt);
+
+        getExecutableManager().addJob(job);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/0b66ef1d/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 77e1084..b56009a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -51,7 +51,7 @@ public class HiveSourceTableLoader {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+    public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
 
         SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
         for (String fullTableName : hiveTables) {
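
For context on the hunk above: the loader groups fully qualified DB.TABLE names by database before loading. A toy illustration of the Guava multimap it builds (values match the test fixtures earlier in this commit):

    SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
    db2tables.put("DEFAULT", "TEST_KYLIN_FACT");
    db2tables.put("EDW", "TEST_CAL_DT");
    Set<String> defaultTables = db2tables.get("DEFAULT");   // [TEST_KYLIN_FACT]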