Posted to commits@kylin.apache.org by li...@apache.org on 2016/12/30 08:57:28 UTC

[39/50] [abbrv] kylin git commit: KYLIN-2323 Refactor table load/unload

KYLIN-2323 Refactor table load/unload


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e2e2a81c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e2e2a81c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e2e2a81c

Branch: refs/heads/master-cdh5.7
Commit: e2e2a81c1f1d7e3f9af5c81bb4f1ad14d2d1b859
Parents: 6ea03b8
Author: Billy Liu <bi...@apache.org>
Authored: Thu Dec 29 18:48:38 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Thu Dec 29 18:48:38 2016 +0800

----------------------------------------------------------------------
 .../hive/ITHiveSourceTableLoaderTest.java       |   2 +-
 .../rest/controller/StreamingController.java    |  22 +-
 .../kylin/rest/controller/TableController.java  | 268 ++++------------
 .../apache/kylin/rest/service/CubeService.java  |  96 ------
 .../apache/kylin/rest/service/TableService.java | 318 +++++++++++++++++++
 .../source/hive/HiveSourceTableLoader.java      |   2 +-
 webapp/app/js/controllers/sourceMeta.js         |  16 +-
 webapp/app/js/services/tables.js                |   1 -
 8 files changed, 391 insertions(+), 334 deletions(-)
----------------------------------------------------------------------

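For orientation before the per-file diffs: the commit collapses the table load/unload
logic that was spread across TableController, StreamingController and CubeService into
one new TableService. A sketch of the resulting call path, with method names taken from
the diffs below (the arrows are annotation only):

    // REST layer                          // new service layer
    TableController.loadHiveTables()   ->  TableService.loadHiveTablesToProject()
                                             -> HiveSourceTableLoader.loadHiveTables()  // renamed from reloadHiveTables()
                                             -> ProjectManager.addTableDescToProject()  // sync into the project
    StreamingController (save path)    ->  TableService.addStreamingTable()
                                             -> MetadataManager.saveSourceTable()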

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..7aff3ba 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
     public void test() throws IOException {
         KylinConfig config = getTestConfig();
         String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
 
         assertTrue(loaded.size() == toLoad.length);
         for (String str : toLoad)

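The rename from reloadHiveTables() to loadHiveTables() makes the loader's contract match
what callers actually do with it. A minimal caller sketch mirroring the test above,
assuming a configured Kylin environment and a reachable Hive metastore:

    import java.io.IOException;
    import java.util.Set;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.source.hive.HiveSourceTableLoader;

    public class LoadTablesExample {
        public static void main(String[] args) throws IOException {
            // Assumes kylin.properties is resolvable from the environment.
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            String[] toLoad = { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
            // Renamed from reloadHiveTables() in this commit.
            Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
            System.out.println("Loaded: " + loaded);
        }
    }
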
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e04ebc8..0ced9ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -23,10 +23,8 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.BadRequestException;
@@ -34,9 +32,9 @@ import org.apache.kylin.rest.exception.ForbiddenException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.service.CubeService;
 import org.apache.kylin.rest.service.KafkaConfigService;
 import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.rest.service.TableService;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,7 +67,7 @@ public class StreamingController extends BasicController {
     @Autowired
     private KafkaConfigService kafkaConfigService;
     @Autowired
-    private CubeService cubeMgmtService;
+    private TableService tableService;
 
     @RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
     @ResponseBody
@@ -113,10 +111,7 @@ public class StreamingController extends BasicController {
         boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
 
         try {
-            tableDesc.setUuid(UUID.randomUUID().toString());
-            MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-            metaMgr.saveSourceTable(tableDesc);
-            cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);
+            tableService.addStreamingTable(tableDesc, project);
         } catch (IOException e) {
             throw new BadRequestException("Failed to add streaming table.");
         }
@@ -289,15 +284,4 @@ public class StreamingController extends BasicController {
         request.setMessage(message);
     }
 
-    public void setStreamingService(StreamingService streamingService) {
-        this.streamingService = streamingService;
-    }
-
-    public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
-        this.kafkaConfigService = kafkaConfigService;
-    }
-
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
-    }
 }

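The tail of this hunk deletes the explicit setters, so StreamingController is now wired
only through its @Autowired fields. A hedged sketch of how a test could still inject a
stand-in after this change, using Spring's ReflectionTestUtils (the test itself is
hypothetical; this commit does not modify any tests):

    import org.apache.kylin.rest.controller.StreamingController;
    import org.apache.kylin.rest.service.TableService;
    import org.springframework.test.util.ReflectionTestUtils;

    public class StreamingControllerWiringSketch {
        public static void main(String[] args) {
            StreamingController controller = new StreamingController();
            TableService stub = new TableService(); // stand-in; a mock would be used in practice
            // Field injection replaces the deleted setCubeService()/setStreamingService()/setKafkaConfigService().
            ReflectionTestUtils.setField(controller, "tableService", stub);
        }
    }
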
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 74d1b28..eed5413 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -19,37 +19,18 @@
 package org.apache.kylin.rest.controller;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
 import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
 import org.apache.kylin.rest.request.CardinalityRequest;
 import org.apache.kylin.rest.request.HiveTableRequest;
-import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.response.TableDescResponse;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.ModelService;
-import org.apache.kylin.rest.service.ProjectService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.IHiveClient;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.rest.service.TableService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -70,44 +51,27 @@ import com.google.common.collect.Sets;
 @Controller
 @RequestMapping(value = "/tables")
 public class TableController extends BasicController {
+
     private static final Logger logger = LoggerFactory.getLogger(TableController.class);
 
     @Autowired
-    private CubeService cubeMgmtService;
-    @Autowired
-    private ProjectService projectService;
-    @Autowired
-    private StreamingService streamingService;
-    @Autowired
-    private KafkaConfigService kafkaConfigService;
-    @Autowired
-    private ModelService modelService;
+    private TableService tableService;
 
     /**
-     * Get available table list of the input database
+     * Get available table list of the project
      *
      * @return Table metadata array
      * @throws IOException
      */
     @RequestMapping(value = "", method = { RequestMethod.GET })
     @ResponseBody
-    public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
-        long start = System.currentTimeMillis();
-        List<TableDesc> tables = null;
+    public List<TableDesc> getTableDesc(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
         try {
-            tables = cubeMgmtService.getProjectManager().listDefinedTables(project);
-        } catch (Exception e) {
-            logger.error("Failed to deal with the request.", e);
+            return tableService.getTableDescByProject(project, withExt);
+        } catch (IOException e) {
+            logger.error("Failed to get Hive Tables", e);
             throw new InternalErrorException(e.getLocalizedMessage());
         }
-
-        if (withExt) {
-            tables = cloneTableDesc(tables);
-        }
-        long end = System.currentTimeMillis();
-        logger.info("Return all table metadata in " + (end - start) + " seconds");
-
-        return tables;
     }
 
     /**
@@ -118,29 +82,39 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/{tableName:.+}", method = { RequestMethod.GET })
     @ResponseBody
-    public TableDesc getHiveTable(@PathVariable String tableName) {
-        return cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-    }
-
-    @RequestMapping(value = "/reload", method = { RequestMethod.PUT })
-    @ResponseBody
-    public String reloadSourceTable() {
-        cubeMgmtService.getMetadataManager().reload();
-        return "ok";
+    public TableDesc getTableDesc(@PathVariable String tableName) {
+        TableDesc table = tableService.getTableDescByName(tableName, false);
+        if (table == null)
+            throw new NotFoundException("Could not find Hive table: " + tableName);
+        return table;
     }
 
     @RequestMapping(value = "/{tables}/{project}", method = { RequestMethod.POST })
     @ResponseBody
-    public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
+    public Map<String, String[]> loadHiveTables(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
-        String[] loaded = cubeMgmtService.reloadHiveTable(tables);
-        if (request.isCalculate()) {
-            cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
-        }
-        cubeMgmtService.syncTableToProject(loaded, project);
         Map<String, String[]> result = new HashMap<String, String[]>();
-        result.put("result.loaded", loaded);
-        result.put("result.unloaded", new String[] {});
+        String[] tableNames = tables.split(",");
+        try {
+            String[] loaded = tableService.loadHiveTablesToProject(tableNames, project);
+            result.put("result.loaded", loaded);
+            Set<String> allTables = new HashSet<String>();
+            for (String tableName : tableNames) {
+                allTables.add(tableService.normalizeHiveTableName(tableName));
+            }
+            for (String loadedTableName : loaded) {
+                allTables.remove(loadedTableName);
+            }
+            String[] unloaded = new String[allTables.size()];
+            allTables.toArray(unloaded);
+            result.put("result.unloaded", unloaded);
+            if (request.isCalculate()) {
+                tableService.calculateCardinalityIfNotPresent(loaded, submitter);
+            }
+        } catch (Exception e) {
+            logger.error("Failed to load Hive Table", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
+        }
         return result;
     }
 
@@ -150,12 +124,17 @@ public class TableController extends BasicController {
         Set<String> unLoadSuccess = Sets.newHashSet();
         Set<String> unLoadFail = Sets.newHashSet();
         Map<String, String[]> result = new HashMap<String, String[]>();
-        for (String tableName : tables.split(",")) {
-            if (unLoadHiveTable(tableName, project)) {
-                unLoadSuccess.add(tableName);
-            } else {
-                unLoadFail.add(tableName);
+        try {
+            for (String tableName : tables.split(",")) {
+                if (tableService.unLoadHiveTable(tableName, project)) {
+                    unLoadSuccess.add(tableName);
+                } else {
+                    unLoadFail.add(tableName);
+                }
             }
+        } catch (Exception e) {
+            logger.error("Failed to unload Hive Table", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
         result.put("result.unload.success", (String[]) unLoadSuccess.toArray(new String[unLoadSuccess.size()]));
         result.put("result.unload.fail", (String[]) unLoadFail.toArray(new String[unLoadFail.size()]));
@@ -163,77 +142,6 @@ public class TableController extends BasicController {
     }
 
     /**
-     * table may referenced by several projects, and kylin only keep one copy of meta for each table,
-     * that's why we have two if statement here.
-     * @param tableName
-     * @param project
-     * @return
-     */
-    private boolean unLoadHiveTable(String tableName, String project) {
-        boolean rtn = false;
-        int tableType = 0;
-
-        //remove streaming info
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc desc = cubeMgmtService.getMetadataManager().getTableDesc(tableName);
-        if (desc == null)
-            return false;
-        tableType = desc.getSourceType();
-
-        try {
-            if (!modelService.isTableInModel(tableName, project)) {
-                cubeMgmtService.removeTableFromProject(tableName, project);
-                rtn = true;
-            } else {
-                List<String> models = modelService.getModelsUsingTable(tableName, project);
-                throw new InternalErrorException("Table is already in use by models " + models);
-            }
-        } catch (IOException e) {
-            logger.error(e.getMessage(), e);
-        }
-        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            try {
-                cubeMgmtService.unLoadHiveTable(tableName);
-                rtn = true;
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-                rtn = false;
-            }
-        }
-
-        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
-            StreamingConfig config = null;
-            KafkaConfig kafkaConfig = null;
-            try {
-                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
-                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
-                streamingService.dropStreamingConfig(config);
-                kafkaConfigService.dropKafkaConfig(kafkaConfig);
-                rtn = true;
-            } catch (Exception e) {
-                rtn = false;
-                logger.error(e.getLocalizedMessage(), e);
-            }
-        }
-        return rtn;
-    }
-
-    @RequestMapping(value = "/addStreamingSrc", method = { RequestMethod.POST })
-    @ResponseBody
-    public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
-        Map<String, String> result = new HashMap<String, String>();
-        String project = request.getProject();
-        TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
-        desc.setUuid(UUID.randomUUID().toString());
-        MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
-        metaMgr.saveSourceTable(desc);
-        cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
-        result.put("success", "true");
-        return result;
-    }
-
-    /**
      * Regenerate table cardinality
      *
      * @return Table metadata array
@@ -244,57 +152,15 @@ public class TableController extends BasicController {
     public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException {
         String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
         String[] tables = tableNames.split(",");
-        for (String table : tables) {
-            cubeMgmtService.calculateCardinality(table.trim().toUpperCase(), submitter);
-        }
-        return request;
-    }
-
-    /**
-     * @param tables
-     * @return
-     */
-    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
-        if (null == tables) {
-            return Collections.emptyList();
-        }
-
-        List<TableDesc> descs = new ArrayList<TableDesc>();
-        Iterator<TableDesc> it = tables.iterator();
-        while (it.hasNext()) {
-            TableDesc table = it.next();
-            TableExtDesc tableExtDesc = cubeMgmtService.getMetadataManager().getTableExt(table.getIdentity());
-
-            // Clone TableDesc
-            TableDescResponse rtableDesc = new TableDescResponse(table);
-            Map<String, Long> cardinality = new HashMap<String, Long>();
-            Map<String, String> dataSourceProp = new HashMap<>();
-            String scard = tableExtDesc.getCardinality();
-            if (!StringUtils.isEmpty(scard)) {
-                String[] cards = StringUtils.split(scard, ",");
-                ColumnDesc[] cdescs = rtableDesc.getColumns();
-                for (int i = 0; i < cdescs.length; i++) {
-                    ColumnDesc columnDesc = cdescs[i];
-                    if (cards.length > i) {
-                        cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
-                    } else {
-                        logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length);
-                        break;
-                    }
-                }
-                rtableDesc.setCardinality(cardinality);
+        try {
+            for (String table : tables) {
+                tableService.calculateCardinality(table.trim().toUpperCase(), submitter);
             }
-            dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
-            dataSourceProp.put("location", tableExtDesc.getStorageLocation());
-            dataSourceProp.put("owner", tableExtDesc.getOwner());
-            dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
-            dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
-            dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
-            rtableDesc.setDescExd(dataSourceProp);
-            descs.add(rtableDesc);
+        } catch (IOException e) {
+            logger.error("Failed to calculate cardinality", e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
-
-        return descs;
+        return request;
     }
 
     /**
@@ -305,17 +171,12 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveDatabases() throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = null;
-
+    private List<String> showHiveDatabases() throws IOException {
         try {
-            results = hiveClient.getHiveDbNames();
+            return tableService.getHiveDbNames();
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
-        return results;
     }
 
     /**
@@ -326,21 +187,12 @@ public class TableController extends BasicController {
      */
     @RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
     @ResponseBody
-    private static List<String> showHiveTables(@PathVariable String database) throws IOException {
-        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-        List<String> results = null;
-
+    private List<String> showHiveTables(@PathVariable String database) throws IOException {
         try {
-            results = hiveClient.getHiveTableNames(database);
+            return tableService.getHiveTableNames(database);
         } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException(e);
+            throw new InternalErrorException(e.getLocalizedMessage());
         }
-        return results;
-    }
-
-    public void setCubeService(CubeService cubeService) {
-        this.cubeMgmtService = cubeService;
     }
 
 }

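Two behavioral changes in the controller are worth spelling out: the load endpoint is now
plural (loadHiveTables), and it reports failures instead of always returning an empty
result.unloaded. The bookkeeping is plain set subtraction; a self-contained restatement,
where normalize() is a stand-in for tableService.normalizeHiveTableName() and the DEFAULT
fallback is an assumption not visible in this diff:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class UnloadedBookkeepingSketch {

        // Stand-in for tableService.normalizeHiveTableName(); assumes an
        // unqualified name falls back to the DEFAULT database.
        static String normalize(String name) {
            int dot = name.indexOf('.');
            return dot < 0 ? "DEFAULT." + name : name;
        }

        // Requested names minus the names the loader confirmed = failures.
        static String[] unloaded(String[] requested, String[] loaded) {
            Set<String> remaining = new HashSet<String>();
            for (String name : requested) {
                remaining.add(normalize(name));
            }
            remaining.removeAll(Arrays.asList(loaded));
            return remaining.toArray(new String[remaining.size()]);
        }
    }

Clients reach this as before: POST /tables/{tables}/{project}, with tables given as a
comma-separated list.
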
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 85c9284..23aa5a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -25,7 +25,6 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.WeakHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -41,17 +40,10 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
@@ -63,9 +55,6 @@ import org.apache.kylin.rest.request.MetricsRequest;
 import org.apache.kylin.rest.response.HBaseResponse;
 import org.apache.kylin.rest.response.MetricsResponse;
 import org.apache.kylin.rest.security.AclPermission;
-import org.apache.kylin.source.hive.HiveSourceTableLoader;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
 import org.slf4j.Logger;
@@ -438,53 +427,6 @@ public class CubeService extends BasicService {
         return hr;
     }
 
-    /**
-     * Generate cardinality for table This will trigger a hadoop job
-     * The result will be merged into table exd info
-     *
-     * @param tableName
-     */
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinality(String tableName, String submitter) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        TableDesc table = getMetadataManager().getTableDesc(tableName);
-        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
-        if (table == null) {
-            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
-            logger.error("Cannot find table descirptor " + tableName, e);
-            throw e;
-        }
-
-        DefaultChainedExecutable job = new DefaultChainedExecutable();
-        //make sure the job could be scheduled when the DistributedScheduler is enable.
-        job.setParam("segmentId", tableName);
-        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
-        job.setSubmitter(submitter);
-
-        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
-        String param = "-table " + tableName + " -output " + outPath;
-
-        MapReduceExecutable step1 = new MapReduceExecutable();
-
-        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
-        step1.setMapReduceParams(param);
-        step1.setParam("segmentId", tableName);
-
-        job.addTask(step1);
-
-        HadoopShellExecutable step2 = new HadoopShellExecutable();
-
-        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
-        step2.setJobParams(param);
-        step2.setParam("segmentId", tableName);
-        job.addTask(step2);
-        tableExt.setJodID(job.getId());
-        getMetadataManager().saveTableExt(tableExt);
-
-        getExecutableManager().addJob(job);
-    }
-
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION')  or hasPermission(#cube, 'MANAGEMENT')")
     public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) throws IOException {
         CubeDesc desc = cube.getDescriptor();
@@ -546,44 +488,6 @@ public class CubeService extends BasicService {
         CubeManager.getInstance(getConfig()).updateCube(update);
     }
 
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public String[] reloadHiveTable(String tables) throws IOException {
-        Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
-        return (String[]) loaded.toArray(new String[loaded.size()]);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void unLoadHiveTable(String tableName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void syncTableToProject(String[] tables, String project) throws IOException {
-        getProjectManager().addTableDescToProject(tables, project);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void removeTableFromProject(String tableName, String projectName) throws IOException {
-        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
-        tableName = dbTableName[0] + "." + dbTableName[1];
-        getProjectManager().removeTableDescFromProject(tableName, projectName);
-    }
-
-    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
-    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
-        MetadataManager metaMgr = getMetadataManager();
-        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
-        for (String table : tables) {
-            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
-            String jobID = tableExtDesc.getJodID();
-            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
-                calculateCardinality(table, submitter);
-            }
-        }
-    }
-
     public void updateOnNewSegmentReady(String cubeName) {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         String serverMode = kylinConfig.getServerMode();

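calculateCardinality() and its helpers leave CubeService unchanged in behavior and
reappear verbatim in TableService below. At its core it builds a two-step chained job;
a condensed sketch of the scaffolding, using only calls shown in this diff (the
-table/-output parameter string is passed in pre-built for brevity):

    import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
    import org.apache.kylin.engine.mr.common.MapReduceExecutable;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
    import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;

    // Builds the same two-step chain as the moved calculateCardinality().
    static DefaultChainedExecutable cardinalityJob(String tableName, String submitter, String param) {
        DefaultChainedExecutable job = new DefaultChainedExecutable();
        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
        job.setSubmitter(submitter);

        MapReduceExecutable step1 = new MapReduceExecutable();      // step 1: count column cardinalities
        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
        step1.setMapReduceParams(param);
        job.addTask(step1);

        HadoopShellExecutable step2 = new HadoopShellExecutable();  // step 2: merge results into TableExtDesc
        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
        step2.setJobParams(param);
        job.addTask(step2);

        return job;                 // caller records job.getId() and submits via ExecutableManager
    }
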
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
new file mode 100644
index 0000000..461800e
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.response.TableDescResponse;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.source.hive.IHiveClient;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.stereotype.Component;
+
+@Component("tableService")
+public class TableService extends BasicService {
+
+    private static final Logger logger = LoggerFactory.getLogger(TableService.class);
+
+    @Autowired
+    private ModelService modelService;
+
+    @Autowired
+    private ProjectService projectService;
+
+    @Autowired
+    private StreamingService streamingService;
+
+    @Autowired
+    private KafkaConfigService kafkaConfigService;
+
+    public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
+        List<TableDesc> tables = getProjectManager().listDefinedTables(project);
+        if (null == tables) {
+            return Collections.emptyList();
+        }
+        if (withExt) {
+            tables = cloneTableDesc(tables);
+        }
+        return tables;
+    }
+
+    public TableDesc getTableDescByName(String tableName, boolean withExt) {
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        if (withExt) {
+            table = cloneTableDesc(table);
+        }
+        return table;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException {
+        Set<String> loaded = HiveSourceTableLoader.loadHiveTables(tables, getConfig());
+        String[] result = (String[]) loaded.toArray(new String[loaded.size()]);
+        syncTableToProject(result, project);
+        return result;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void unLoadHiveTable(String tableName) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void syncTableToProject(String[] tables, String project) throws IOException {
+        getProjectManager().addTableDescToProject(tables, project);
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+    private void removeTableFromProject(String tableName, String projectName) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        getProjectManager().removeTableDescFromProject(tableName, projectName);
+    }
+
+    /**
+     * A table may be referenced by several projects, and Kylin keeps only one copy
+     * of the metadata per table; that's why there are two separate if statements here.
+     * @param tableName
+     * @param project
+     * @return
+     */
+    public boolean unLoadHiveTable(String tableName, String project) {
+        boolean rtn = false;
+        int tableType = 0;
+
+        //remove streaming info
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc desc = getMetadataManager().getTableDesc(tableName);
+        if (desc == null)
+            return false;
+        tableType = desc.getSourceType();
+
+        try {
+            if (!modelService.isTableInModel(tableName, project)) {
+                removeTableFromProject(tableName, project);
+                rtn = true;
+            } else {
+                List<String> models = modelService.getModelsUsingTable(tableName, project);
+                throw new InternalErrorException("Table is already in use by models " + models);
+            }
+        } catch (IOException e) {
+            logger.error(e.getMessage(), e);
+        }
+        if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            try {
+                unLoadHiveTable(tableName);
+                rtn = true;
+            } catch (IOException e) {
+                logger.error(e.getMessage(), e);
+                rtn = false;
+            }
+        }
+
+        if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
+            StreamingConfig config = null;
+            KafkaConfig kafkaConfig = null;
+            try {
+                config = streamingService.getStreamingManager().getStreamingConfig(tableName);
+                kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
+                streamingService.dropStreamingConfig(config);
+                kafkaConfigService.dropKafkaConfig(kafkaConfig);
+                rtn = true;
+            } catch (Exception e) {
+                rtn = false;
+                logger.error(e.getLocalizedMessage(), e);
+            }
+        }
+        return rtn;
+    }
+
+    /**
+     * Save the given table desc as a streaming source table and sync it into the project.
+     * @param desc
+     * @param project
+     * @throws IOException
+     */
+    public void addStreamingTable(TableDesc desc, String project) throws IOException {
+        desc.setUuid(UUID.randomUUID().toString());
+        getMetadataManager().saveSourceTable(desc);
+        syncTableToProject(new String[] { desc.getIdentity() }, project);
+    }
+
+    /**
+     * List the database names visible through the configured Hive client.
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveDbNames() throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveDbNames();
+        return results;
+    }
+
+    /**
+     * List the table names in the given Hive database.
+     * @param database
+     * @return
+     * @throws Exception
+     */
+    public List<String> getHiveTableNames(String database) throws Exception {
+        IHiveClient hiveClient = HiveClientFactory.getHiveClient();
+        List<String> results = hiveClient.getHiveTableNames(database);
+        return results;
+    }
+
+    private TableDescResponse cloneTableDesc(TableDesc table) {
+        TableExtDesc tableExtDesc = getMetadataManager().getTableExt(table.getIdentity());
+
+        // Clone TableDesc
+        TableDescResponse rtableDesc = new TableDescResponse(table);
+        Map<String, Long> cardinality = new HashMap<String, Long>();
+        Map<String, String> dataSourceProp = new HashMap<>();
+        String scard = tableExtDesc.getCardinality();
+        if (!StringUtils.isEmpty(scard)) {
+            String[] cards = StringUtils.split(scard, ",");
+            ColumnDesc[] cdescs = rtableDesc.getColumns();
+            for (int i = 0; i < cdescs.length; i++) {
+                ColumnDesc columnDesc = cdescs[i];
+                if (cards.length > i) {
+                    cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
+                } else {
+                    logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+                    break;
+                }
+            }
+            rtableDesc.setCardinality(cardinality);
+        }
+        dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
+        dataSourceProp.put("location", tableExtDesc.getStorageLocation());
+        dataSourceProp.put("owner", tableExtDesc.getOwner());
+        dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
+        dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
+        dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
+        rtableDesc.setDescExd(dataSourceProp);
+        return rtableDesc;
+    }
+
+
+    private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
+        List<TableDesc> descs = new ArrayList<TableDesc>();
+        Iterator<TableDesc> it = tables.iterator();
+        while (it.hasNext()) {
+            TableDesc table = it.next();
+            TableDescResponse rtableDesc = cloneTableDesc(table);
+            descs.add(rtableDesc);
+        }
+
+        return descs;
+    }
+
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+        MetadataManager metaMgr = getMetadataManager();
+        ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
+        for (String table : tables) {
+            TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
+            String jobID = tableExtDesc.getJodID();
+            if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
+                calculateCardinality(table, submitter);
+            }
+        }
+    }
+
+    /**
+     * Generate cardinality for table This will trigger a hadoop job
+     * The result will be merged into table exd info
+     *
+     * @param tableName
+     */
+    @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+    public void calculateCardinality(String tableName, String submitter) throws IOException {
+        tableName = normalizeHiveTableName(tableName);
+        TableDesc table = getMetadataManager().getTableDesc(tableName);
+        final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
+        if (table == null) {
+            IllegalArgumentException e = new IllegalArgumentException("Cannot find table descriptor " + tableName);
+            logger.error("Cannot find table descriptor " + tableName, e);
+            throw e;
+        }
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        // make sure the job can be scheduled when the DistributedScheduler is enabled.
+        job.setParam("segmentId", tableName);
+        job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
+        job.setSubmitter(submitter);
+
+        String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
+        String param = "-table " + tableName + " -output " + outPath;
+
+        MapReduceExecutable step1 = new MapReduceExecutable();
+
+        step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+        step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
+
+        job.addTask(step1);
+
+        HadoopShellExecutable step2 = new HadoopShellExecutable();
+
+        step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+        step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
+        job.addTask(step2);
+        tableExt.setJodID(job.getId());
+        getMetadataManager().saveTableExt(tableExt);
+
+        getExecutableManager().addJob(job);
+    }
+
+    public String normalizeHiveTableName(String tableName) {
+        String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
+        return dbTableName[0] + "." + dbTableName[1];
+    }
+}

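normalizeHiveTableName() is the one genuinely new public helper in the service; every
path that touches metadata now funnels names through it first. Expected behavior, assuming
HadoopUtil.parseHiveTableName() supplies a default database for unqualified names (the
fallback value itself is not visible in this diff):

    tableService.normalizeHiveTableName("EDW.TEST_CAL_DT");   // "EDW.TEST_CAL_DT" (already qualified)
    tableService.normalizeHiveTableName("test_kylin_fact");   // "<default-db>.test_kylin_fact" (assumed fallback)

One thing to watch in review: unLoadHiveTable(String) and syncTableToProject() are now
private yet still carry @PreAuthorize. Spring applies method security through proxies, and
annotations on privately self-invoked methods are generally not enforced, so those checks
may effectively move to the public callers.
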
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 77e1084..b56009a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -51,7 +51,7 @@ public class HiveSourceTableLoader {
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
 
-    public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+    public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
 
         SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
         for (String fullTableName : hiveTables) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index bbb9915..a53a35f 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -330,7 +330,7 @@ KylinApp
         }
 
         if ($scope.tableNames.trim() === "") {
-          SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+          SweetAlert.swal('', 'Please input table(s) you want to load.', 'info');
           return;
         }
 
@@ -352,13 +352,13 @@ KylinApp
           })
 
           if (result['result.unloaded'].length != 0 && result['result.loaded'].length == 0) {
-            SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unloadedTableInfo, 'error');
+            SweetAlert.swal('Failed!', 'Failed to load following table(s): ' + unloadedTableInfo, 'error');
           }
           if (result['result.loaded'].length != 0 && result['result.unloaded'].length == 0) {
-            SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo, 'success');
+            SweetAlert.swal('Success!', 'The following table(s) have been successfully loaded: ' + loadTableInfo, 'success');
           }
           if (result['result.loaded'].length != 0 && result['result.unloaded'].length != 0) {
-            SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo + "\n\n Failed to synchronize following table(s):" + unloadedTableInfo, 'warning');
+            SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully loaded: ' + loadTableInfo + "\n\n Failed to load following table(s):" + unloadedTableInfo, 'warning');
           }
           loadingRequest.hide();
           scope.aceSrcTbLoaded(true);
@@ -378,7 +378,7 @@ KylinApp
 
     $scope.remove = function () {
         if ($scope.tableNames.trim() === "") {
-          SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+          SweetAlert.swal('', 'Please input table(s) you want to unload.', 'info');
           return;
         }
 
@@ -400,13 +400,13 @@ KylinApp
           })
 
           if (result['result.unload.fail'].length != 0 && result['result.unload.success'].length == 0) {
-            SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unRemovedTableInfo, 'error');
+            SweetAlert.swal('Failed!', 'Failed to unload following table(s): ' + unRemovedTableInfo, 'error');
           }
           if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length == 0) {
-            SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo, 'success');
+            SweetAlert.swal('Success!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo, 'success');
           }
           if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length != 0) {
-            SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo + "\n\n Failed to synchronize following table(s):" + unRemovedTableInfo, 'warning');
+            SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo + "\n\n Failed to unload following table(s):" + unRemovedTableInfo, 'warning');
           }
           loadingRequest.hide();
           scope.aceSrcTbLoaded(true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 4199d6c..4e7a7c4 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -24,7 +24,6 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
     reload: {method: 'PUT', params: {action: 'reload'}, isArray: false},
     loadHiveTable: {method: 'POST', params: {}, isArray: false},
     unLoadHiveTable: {method: 'DELETE', params: {}, isArray: false},
-    addStreamingSrc: {method: 'POST', params: {action:'addStreamingSrc'}, isArray: false},
     genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false},
     showHiveDatabases: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
     showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true}