You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by bi...@apache.org on 2016/12/29 10:48:50 UTC
kylin git commit: KYLIN-2323 Refactor table load/unload
Repository: kylin
Updated Branches:
refs/heads/master 6ea03b863 -> e2e2a81c1
KYLIN-2323 Refactor table load/unload
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e2e2a81c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e2e2a81c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e2e2a81c
Branch: refs/heads/master
Commit: e2e2a81c1f1d7e3f9af5c81bb4f1ad14d2d1b859
Parents: 6ea03b8
Author: Billy Liu <bi...@apache.org>
Authored: Thu Dec 29 18:48:38 2016 +0800
Committer: Billy Liu <bi...@apache.org>
Committed: Thu Dec 29 18:48:38 2016 +0800
----------------------------------------------------------------------
.../hive/ITHiveSourceTableLoaderTest.java | 2 +-
.../rest/controller/StreamingController.java | 22 +-
.../kylin/rest/controller/TableController.java | 268 ++++------------
.../apache/kylin/rest/service/CubeService.java | 96 ------
.../apache/kylin/rest/service/TableService.java | 318 +++++++++++++++++++
.../source/hive/HiveSourceTableLoader.java | 2 +-
webapp/app/js/controllers/sourceMeta.js | 16 +-
webapp/app/js/services/tables.js | 1 -
8 files changed, 391 insertions(+), 334 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
index c4f0777..7aff3ba 100644
--- a/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/source/hive/ITHiveSourceTableLoaderTest.java
@@ -45,7 +45,7 @@ public class ITHiveSourceTableLoaderTest extends HBaseMetadataTestCase {
public void test() throws IOException {
KylinConfig config = getTestConfig();
String[] toLoad = new String[] { "DEFAULT.TEST_KYLIN_FACT", "EDW.TEST_CAL_DT" };
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(toLoad, config);
+ Set<String> loaded = HiveSourceTableLoader.loadHiveTables(toLoad, config);
assertTrue(loaded.size() == toLoad.length);
for (String str : toLoad)
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
index e04ebc8..0ced9ad 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingController.java
@@ -23,10 +23,8 @@ import java.util.List;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.BadRequestException;
@@ -34,9 +32,9 @@ import org.apache.kylin.rest.exception.ForbiddenException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.service.CubeService;
import org.apache.kylin.rest.service.KafkaConfigService;
import org.apache.kylin.rest.service.StreamingService;
+import org.apache.kylin.rest.service.TableService;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +67,7 @@ public class StreamingController extends BasicController {
@Autowired
private KafkaConfigService kafkaConfigService;
@Autowired
- private CubeService cubeMgmtService;
+ private TableService tableService;
@RequestMapping(value = "/getConfig", method = { RequestMethod.GET })
@ResponseBody
@@ -113,10 +111,7 @@ public class StreamingController extends BasicController {
boolean saveStreamingSuccess = false, saveKafkaSuccess = false;
try {
- tableDesc.setUuid(UUID.randomUUID().toString());
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- metaMgr.saveSourceTable(tableDesc);
- cubeMgmtService.syncTableToProject(new String[] { tableDesc.getIdentity() }, project);
+ tableService.addStreamingTable(tableDesc, project);
} catch (IOException e) {
throw new BadRequestException("Failed to add streaming table.");
}
@@ -289,15 +284,4 @@ public class StreamingController extends BasicController {
request.setMessage(message);
}
- public void setStreamingService(StreamingService streamingService) {
- this.streamingService = streamingService;
- }
-
- public void setKafkaConfigService(KafkaConfigService kafkaConfigService) {
- this.kafkaConfigService = kafkaConfigService;
- }
-
- public void setCubeService(CubeService cubeService) {
- this.cubeMgmtService = cubeService;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 74d1b28..eed5413 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -19,37 +19,18 @@
package org.apache.kylin.rest.controller;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.CardinalityRequest;
import org.apache.kylin.rest.request.HiveTableRequest;
-import org.apache.kylin.rest.request.StreamingRequest;
-import org.apache.kylin.rest.response.TableDescResponse;
-import org.apache.kylin.rest.service.CubeService;
-import org.apache.kylin.rest.service.KafkaConfigService;
-import org.apache.kylin.rest.service.ModelService;
-import org.apache.kylin.rest.service.ProjectService;
-import org.apache.kylin.rest.service.StreamingService;
-import org.apache.kylin.source.hive.HiveClientFactory;
-import org.apache.kylin.source.hive.IHiveClient;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.rest.service.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -70,44 +51,27 @@ import com.google.common.collect.Sets;
@Controller
@RequestMapping(value = "/tables")
public class TableController extends BasicController {
+
private static final Logger logger = LoggerFactory.getLogger(TableController.class);
@Autowired
- private CubeService cubeMgmtService;
- @Autowired
- private ProjectService projectService;
- @Autowired
- private StreamingService streamingService;
- @Autowired
- private KafkaConfigService kafkaConfigService;
- @Autowired
- private ModelService modelService;
+ private TableService tableService;
/**
- * Get available table list of the input database
+ * Get available table list of the project
*
* @return Table metadata array
* @throws IOException
*/
@RequestMapping(value = "", method = { RequestMethod.GET })
@ResponseBody
- public List<TableDesc> getHiveTables(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
- long start = System.currentTimeMillis();
- List<TableDesc> tables = null;
+ public List<TableDesc> getTableDesc(@RequestParam(value = "ext", required = false) boolean withExt, @RequestParam(value = "project", required = true) String project) throws IOException {
try {
- tables = cubeMgmtService.getProjectManager().listDefinedTables(project);
- } catch (Exception e) {
- logger.error("Failed to deal with the request.", e);
+ return tableService.getTableDescByProject(project, withExt);
+ } catch (IOException e) {
+ logger.error("Failed to get Hive Tables", e);
throw new InternalErrorException(e.getLocalizedMessage());
}
-
- if (withExt) {
- tables = cloneTableDesc(tables);
- }
- long end = System.currentTimeMillis();
- logger.info("Return all table metadata in " + (end - start) + " seconds");
-
- return tables;
}
/**
@@ -118,29 +82,39 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/{tableName:.+}", method = { RequestMethod.GET })
@ResponseBody
- public TableDesc getHiveTable(@PathVariable String tableName) {
- return cubeMgmtService.getMetadataManager().getTableDesc(tableName);
- }
-
- @RequestMapping(value = "/reload", method = { RequestMethod.PUT })
- @ResponseBody
- public String reloadSourceTable() {
- cubeMgmtService.getMetadataManager().reload();
- return "ok";
+ public TableDesc getTableDesc(@PathVariable String tableName) {
+ TableDesc table = tableService.getTableDescByName(tableName, false);
+ if (table == null)
+ throw new NotFoundException("Could not find Hive table: " + tableName);
+ return table;
}
@RequestMapping(value = "/{tables}/{project}", method = { RequestMethod.POST })
@ResponseBody
- public Map<String, String[]> loadHiveTable(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
+ public Map<String, String[]> loadHiveTables(@PathVariable String tables, @PathVariable String project, @RequestBody HiveTableRequest request) throws IOException {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
- String[] loaded = cubeMgmtService.reloadHiveTable(tables);
- if (request.isCalculate()) {
- cubeMgmtService.calculateCardinalityIfNotPresent(loaded, submitter);
- }
- cubeMgmtService.syncTableToProject(loaded, project);
Map<String, String[]> result = new HashMap<String, String[]>();
- result.put("result.loaded", loaded);
- result.put("result.unloaded", new String[] {});
+ String[] tableNames = tables.split(",");
+ try {
+ String[] loaded = tableService.loadHiveTablesToProject(tableNames, project);
+ result.put("result.loaded", loaded);
+ Set<String> allTables = new HashSet<String>();
+ for (String tableName : tableNames) {
+ allTables.add(tableService.normalizeHiveTableName(tableName));
+ }
+ for (String loadedTableName : loaded) {
+ allTables.remove(loadedTableName);
+ }
+ String[] unloaded = new String[allTables.size()];
+ allTables.toArray(unloaded);
+ result.put("result.unloaded", unloaded);
+ if (request.isCalculate()) {
+ tableService.calculateCardinalityIfNotPresent(loaded, submitter);
+ }
+ } catch (Exception e) {
+ logger.error("Failed to load Hive Table", e);
+ throw new InternalErrorException(e.getLocalizedMessage());
+ }
return result;
}
@@ -150,12 +124,17 @@ public class TableController extends BasicController {
Set<String> unLoadSuccess = Sets.newHashSet();
Set<String> unLoadFail = Sets.newHashSet();
Map<String, String[]> result = new HashMap<String, String[]>();
- for (String tableName : tables.split(",")) {
- if (unLoadHiveTable(tableName, project)) {
- unLoadSuccess.add(tableName);
- } else {
- unLoadFail.add(tableName);
+ try {
+ for (String tableName : tables.split(",")) {
+ if (tableService.unLoadHiveTable(tableName, project)) {
+ unLoadSuccess.add(tableName);
+ } else {
+ unLoadFail.add(tableName);
+ }
}
+ } catch (Exception e) {
+ logger.error("Failed to unload Hive Table", e);
+ throw new InternalErrorException(e.getLocalizedMessage());
}
result.put("result.unload.success", (String[]) unLoadSuccess.toArray(new String[unLoadSuccess.size()]));
result.put("result.unload.fail", (String[]) unLoadFail.toArray(new String[unLoadFail.size()]));
@@ -163,77 +142,6 @@ public class TableController extends BasicController {
}
/**
- * table may referenced by several projects, and kylin only keep one copy of meta for each table,
- * that's why we have two if statement here.
- * @param tableName
- * @param project
- * @return
- */
- private boolean unLoadHiveTable(String tableName, String project) {
- boolean rtn = false;
- int tableType = 0;
-
- //remove streaming info
- String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
- tableName = dbTableName[0] + "." + dbTableName[1];
- TableDesc desc = cubeMgmtService.getMetadataManager().getTableDesc(tableName);
- if (desc == null)
- return false;
- tableType = desc.getSourceType();
-
- try {
- if (!modelService.isTableInModel(tableName, project)) {
- cubeMgmtService.removeTableFromProject(tableName, project);
- rtn = true;
- } else {
- List<String> models = modelService.getModelsUsingTable(tableName, project);
- throw new InternalErrorException("Table is already in use by models " + models);
- }
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- }
- if (!projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
- try {
- cubeMgmtService.unLoadHiveTable(tableName);
- rtn = true;
- } catch (IOException e) {
- logger.error(e.getMessage(), e);
- rtn = false;
- }
- }
-
- if (tableType == 1 && !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName)) {
- StreamingConfig config = null;
- KafkaConfig kafkaConfig = null;
- try {
- config = streamingService.getStreamingManager().getStreamingConfig(tableName);
- kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
- streamingService.dropStreamingConfig(config);
- kafkaConfigService.dropKafkaConfig(kafkaConfig);
- rtn = true;
- } catch (Exception e) {
- rtn = false;
- logger.error(e.getLocalizedMessage(), e);
- }
- }
- return rtn;
- }
-
- @RequestMapping(value = "/addStreamingSrc", method = { RequestMethod.POST })
- @ResponseBody
- public Map<String, String> addStreamingTable(@RequestBody StreamingRequest request) throws IOException {
- Map<String, String> result = new HashMap<String, String>();
- String project = request.getProject();
- TableDesc desc = JsonUtil.readValue(request.getTableData(), TableDesc.class);
- desc.setUuid(UUID.randomUUID().toString());
- MetadataManager metaMgr = MetadataManager.getInstance(KylinConfig.getInstanceFromEnv());
- metaMgr.saveSourceTable(desc);
- cubeMgmtService.syncTableToProject(new String[] { desc.getName() }, project);
- result.put("success", "true");
- return result;
- }
-
- /**
* Regenerate table cardinality
*
* @return Table metadata array
@@ -244,57 +152,15 @@ public class TableController extends BasicController {
public CardinalityRequest generateCardinality(@PathVariable String tableNames, @RequestBody CardinalityRequest request) throws IOException {
String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
String[] tables = tableNames.split(",");
- for (String table : tables) {
- cubeMgmtService.calculateCardinality(table.trim().toUpperCase(), submitter);
- }
- return request;
- }
-
- /**
- * @param tables
- * @return
- */
- private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
- if (null == tables) {
- return Collections.emptyList();
- }
-
- List<TableDesc> descs = new ArrayList<TableDesc>();
- Iterator<TableDesc> it = tables.iterator();
- while (it.hasNext()) {
- TableDesc table = it.next();
- TableExtDesc tableExtDesc = cubeMgmtService.getMetadataManager().getTableExt(table.getIdentity());
-
- // Clone TableDesc
- TableDescResponse rtableDesc = new TableDescResponse(table);
- Map<String, Long> cardinality = new HashMap<String, Long>();
- Map<String, String> dataSourceProp = new HashMap<>();
- String scard = tableExtDesc.getCardinality();
- if (!StringUtils.isEmpty(scard)) {
- String[] cards = StringUtils.split(scard, ",");
- ColumnDesc[] cdescs = rtableDesc.getColumns();
- for (int i = 0; i < cdescs.length; i++) {
- ColumnDesc columnDesc = cdescs[i];
- if (cards.length > i) {
- cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
- } else {
- logger.error("The result cardinality is not identical with hive table metadata, cardinaly : " + scard + " column array length: " + cdescs.length);
- break;
- }
- }
- rtableDesc.setCardinality(cardinality);
+ try {
+ for (String table : tables) {
+ tableService.calculateCardinality(table.trim().toUpperCase(), submitter);
}
- dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
- dataSourceProp.put("location", tableExtDesc.getStorageLocation());
- dataSourceProp.put("owner", tableExtDesc.getOwner());
- dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
- dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
- dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
- rtableDesc.setDescExd(dataSourceProp);
- descs.add(rtableDesc);
+ } catch (IOException e) {
+ logger.error("Failed to calculate cardinality", e);
+ throw new InternalErrorException(e.getLocalizedMessage());
}
-
- return descs;
+ return request;
}
/**
@@ -305,17 +171,12 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive", method = { RequestMethod.GET })
@ResponseBody
- private static List<String> showHiveDatabases() throws IOException {
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
- List<String> results = null;
-
+ private List<String> showHiveDatabases() throws IOException {
try {
- results = hiveClient.getHiveDbNames();
+ return tableService.getHiveDbNames();
} catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
+ throw new InternalErrorException(e.getLocalizedMessage());
}
- return results;
}
/**
@@ -326,21 +187,12 @@ public class TableController extends BasicController {
*/
@RequestMapping(value = "/hive/{database}", method = { RequestMethod.GET })
@ResponseBody
- private static List<String> showHiveTables(@PathVariable String database) throws IOException {
- IHiveClient hiveClient = HiveClientFactory.getHiveClient();
- List<String> results = null;
-
+ private List<String> showHiveTables(@PathVariable String database) throws IOException {
try {
- results = hiveClient.getHiveTableNames(database);
+ return tableService.getHiveTableNames(database);
} catch (Exception e) {
- e.printStackTrace();
- throw new IOException(e);
+ throw new InternalErrorException(e.getLocalizedMessage());
}
- return results;
- }
-
- public void setCubeService(CubeService cubeService) {
- this.cubeMgmtService = cubeService;
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 85c9284..23aa5a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -25,7 +25,6 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.WeakHashMap;
import org.apache.commons.io.IOUtils;
@@ -41,17 +40,10 @@ import org.apache.kylin.cube.cuboid.CuboidCLI;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
-import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.project.ProjectInstance;
import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
@@ -63,9 +55,6 @@ import org.apache.kylin.rest.request.MetricsRequest;
import org.apache.kylin.rest.response.HBaseResponse;
import org.apache.kylin.rest.response.MetricsResponse;
import org.apache.kylin.rest.security.AclPermission;
-import org.apache.kylin.source.hive.HiveSourceTableLoader;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
-import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.slf4j.Logger;
@@ -438,53 +427,6 @@ public class CubeService extends BasicService {
return hr;
}
- /**
- * Generate cardinality for table This will trigger a hadoop job
- * The result will be merged into table exd info
- *
- * @param tableName
- */
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
- public void calculateCardinality(String tableName, String submitter) throws IOException {
- String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
- tableName = dbTableName[0] + "." + dbTableName[1];
- TableDesc table = getMetadataManager().getTableDesc(tableName);
- final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
- if (table == null) {
- IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
- logger.error("Cannot find table descirptor " + tableName, e);
- throw e;
- }
-
- DefaultChainedExecutable job = new DefaultChainedExecutable();
- //make sure the job could be scheduled when the DistributedScheduler is enable.
- job.setParam("segmentId", tableName);
- job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
- job.setSubmitter(submitter);
-
- String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
- String param = "-table " + tableName + " -output " + outPath;
-
- MapReduceExecutable step1 = new MapReduceExecutable();
-
- step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
- step1.setMapReduceParams(param);
- step1.setParam("segmentId", tableName);
-
- job.addTask(step1);
-
- HadoopShellExecutable step2 = new HadoopShellExecutable();
-
- step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
- step2.setJobParams(param);
- step2.setParam("segmentId", tableName);
- job.addTask(step2);
- tableExt.setJodID(job.getId());
- getMetadataManager().saveTableExt(tableExt);
-
- getExecutableManager().addJob(job);
- }
-
@PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
public void updateCubeNotifyList(CubeInstance cube, List<String> notifyList) throws IOException {
CubeDesc desc = cube.getDescriptor();
@@ -546,44 +488,6 @@ public class CubeService extends BasicService {
CubeManager.getInstance(getConfig()).updateCube(update);
}
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
- public String[] reloadHiveTable(String tables) throws IOException {
- Set<String> loaded = HiveSourceTableLoader.reloadHiveTables(tables.split(","), getConfig());
- return (String[]) loaded.toArray(new String[loaded.size()]);
- }
-
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
- public void unLoadHiveTable(String tableName) throws IOException {
- String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
- tableName = dbTableName[0] + "." + dbTableName[1];
- HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
- }
-
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
- public void syncTableToProject(String[] tables, String project) throws IOException {
- getProjectManager().addTableDescToProject(tables, project);
- }
-
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
- public void removeTableFromProject(String tableName, String projectName) throws IOException {
- String[] dbTableName = HadoopUtil.parseHiveTableName(tableName);
- tableName = dbTableName[0] + "." + dbTableName[1];
- getProjectManager().removeTableDescFromProject(tableName, projectName);
- }
-
- @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
- public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
- MetadataManager metaMgr = getMetadataManager();
- ExecutableManager exeMgt = ExecutableManager.getInstance(getConfig());
- for (String table : tables) {
- TableExtDesc tableExtDesc = metaMgr.getTableExt(table);
- String jobID = tableExtDesc.getJodID();
- if (null == jobID || ExecutableState.RUNNING != exeMgt.getOutput(jobID).getState()) {
- calculateCardinality(table, submitter);
- }
- }
- }
-
public void updateOnNewSegmentReady(String cubeName) {
final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
String serverMode = kylinConfig.getServerMode();
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
new file mode 100644
index 0000000..461800e
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.streaming.StreamingConfig;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.exception.InternalErrorException;
+import org.apache.kylin.rest.response.TableDescResponse;
+import org.apache.kylin.source.hive.HiveClientFactory;
+import org.apache.kylin.source.hive.HiveSourceTableLoader;
+import org.apache.kylin.source.hive.IHiveClient;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
+import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityUpdateJob;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.stereotype.Component;
+
+@Component("tableService")
+public class TableService extends BasicService {
+
+ private static final Logger logger = LoggerFactory.getLogger(TableService.class);
+
+ @Autowired
+ private ModelService modelService;
+
+ @Autowired
+ private ProjectService projectService;
+
+ @Autowired
+ private StreamingService streamingService;
+
+ @Autowired
+ private KafkaConfigService kafkaConfigService;
+
+ /**
+ * Lists the tables defined in a project.
+ *
+ * @param project name of the project whose tables are listed
+ * @param withExt when true, every table is returned as a copy that also carries its
+ * cardinality and data source properties
+ * @return the tables of the project, or an empty list when the project manager returns null
+ * @throws IOException declared by the list-cloning helper used when {@code withExt} is true
+ */
+ public List<TableDesc> getTableDescByProject(String project, boolean withExt) throws IOException {
+ final List<TableDesc> defined = getProjectManager().listDefinedTables(project);
+ if (defined == null) {
+ return Collections.emptyList();
+ }
+ return withExt ? cloneTableDesc(defined) : defined;
+ }
+
+ /**
+ * Looks up a single table descriptor by name.
+ *
+ * @param tableName name handed to the metadata manager lookup
+ * @param withExt when true, a found table is returned as a copy that also carries its
+ * cardinality and data source properties
+ * @return the table descriptor, or null when the metadata manager does not know the table
+ */
+ public TableDesc getTableDescByName(String tableName, boolean withExt) {
+ TableDesc table = getMetadataManager().getTableDesc(tableName);
+ // An unknown table must not reach cloneTableDesc, which dereferences its argument.
+ if (table != null && withExt) {
+ table = cloneTableDesc(table);
+ }
+ return table;
+ }
+
+ /**
+ * Loads the given Hive tables and adds the ones that were loaded to a project.
+ *
+ * @param tables Hive table names to load
+ * @param project project the loaded tables are added to
+ * @return the names reported as loaded by {@code HiveSourceTableLoader.loadHiveTables}
+ * @throws IOException if loading the tables or updating the project fails
+ */
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+ public String[] loadHiveTablesToProject(String[] tables, String project) throws IOException {
+ final Set<String> loadedNames = HiveSourceTableLoader.loadHiveTables(tables, getConfig());
+ final String[] loadedArray = loadedNames.toArray(new String[loadedNames.size()]);
+ syncTableToProject(loadedArray, project);
+ return loadedArray;
+ }
+
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+ private void unLoadHiveTable(String tableName) throws IOException {
+ tableName = normalizeHiveTableName(tableName);
+ HiveSourceTableLoader.unLoadHiveTable(tableName.toUpperCase());
+ }
+
+ /**
+ * Adds the given tables to a project by delegating to the project manager.
+ * NOTE(review): this method is private and only called from within this class; confirm
+ * that the security annotation is actually enforced for such calls.
+ *
+ * @param tables table names, passed through unchanged
+ * @param project name of the project the tables are added to
+ * @throws IOException if the project manager call fails
+ */
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+ private void syncTableToProject(String[] tables, String project) throws IOException {
+ getProjectManager().addTableDescToProject(tables, project);
+ }
+
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
+ private void removeTableFromProject(String tableName, String projectName) throws IOException {
+ tableName = normalizeHiveTableName(tableName);
+ getProjectManager().removeTableDescFromProject(tableName, projectName);
+ }
+
+ /**
+ * Removes a table from a project and, once nothing references it any more, drops its metadata.
+ * <p>
+ * A table may be referenced by several projects while Kylin keeps only one copy of its
+ * metadata. Removal therefore happens in stages: the table is first detached from the given
+ * project; only when no project and no model uses it any longer is the table itself unloaded,
+ * and, for tables whose source type is 1, the streaming and Kafka configurations are dropped
+ * as well.
+ *
+ * @param tableName table name in any form accepted by {@link #normalizeHiveTableName(String)}
+ * @param project project the table is removed from
+ * @return true when the table was removed from the project and every follow-up step that was
+ * attempted succeeded; false when the table is unknown or any step failed
+ * @throws InternalErrorException when a model of the project still uses the table
+ */
+ public boolean unLoadHiveTable(String tableName, String project) {
+ tableName = normalizeHiveTableName(tableName);
+ TableDesc desc = getMetadataManager().getTableDesc(tableName);
+ if (desc == null) {
+ return false;
+ }
+ // Read the source type up front; the table may be unloaded further down.
+ final int tableType = desc.getSourceType();
+
+ // Stage 1: detach the table from this project, unless one of its models still needs it.
+ boolean succeeded = false;
+ try {
+ if (modelService.isTableInModel(tableName, project)) {
+ List<String> models = modelService.getModelsUsingTable(tableName, project);
+ throw new InternalErrorException("Table is already in use by models " + models);
+ }
+ removeTableFromProject(tableName, project);
+ succeeded = true;
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ }
+
+ // Stage 2: the single shared metadata copy may only go away when nothing references it.
+ final boolean unreferenced = !projectService.isTableInAnyProject(tableName) && !modelService.isTableInAnyModel(tableName);
+ if (unreferenced) {
+ try {
+ unLoadHiveTable(tableName);
+ } catch (IOException e) {
+ logger.error(e.getMessage(), e);
+ succeeded = false;
+ }
+ }
+
+ // Stage 3: remove streaming info. A failure here, like any earlier one, makes the result
+ // false; a success never turns an earlier failure back into true.
+ if (unreferenced && tableType == 1) {
+ try {
+ StreamingConfig config = streamingService.getStreamingManager().getStreamingConfig(tableName);
+ KafkaConfig kafkaConfig = kafkaConfigService.getKafkaConfig(tableName);
+ streamingService.dropStreamingConfig(config);
+ kafkaConfigService.dropKafkaConfig(kafkaConfig);
+ } catch (Exception e) {
+ logger.error(e.getLocalizedMessage(), e);
+ succeeded = false;
+ }
+ }
+ return succeeded;
+ }
+
+ /**
+ * Registers a streaming table: gives the descriptor a fresh random UUID, saves it as a
+ * source table and adds it to the project.
+ *
+ * @param desc table descriptor to register; its UUID is overwritten
+ * @param project project the table is added to
+ * @throws IOException if saving the table or updating the project fails
+ */
+ public void addStreamingTable(TableDesc desc, String project) throws IOException {
+ final String freshUuid = UUID.randomUUID().toString();
+ desc.setUuid(freshUuid);
+ getMetadataManager().saveSourceTable(desc);
+ final String[] identities = { desc.getIdentity() };
+ syncTableToProject(identities, project);
+ }
+
+ /**
+ * Lists the Hive database names reported by the Hive client.
+ *
+ * @return database names as returned by the client obtained from {@code HiveClientFactory}
+ * @throws Exception if the Hive client call fails
+ */
+ public List<String> getHiveDbNames() throws Exception {
+ final IHiveClient client = HiveClientFactory.getHiveClient();
+ return client.getHiveDbNames();
+ }
+
+ /**
+ * Lists the Hive table names of one database as reported by the Hive client.
+ *
+ * @param database database name passed to the Hive client
+ * @return table names as returned by the client obtained from {@code HiveClientFactory}
+ * @throws Exception if the Hive client call fails
+ */
+ public List<String> getHiveTableNames(String database) throws Exception {
+ final IHiveClient client = HiveClientFactory.getHiveClient();
+ return client.getHiveTableNames(database);
+ }
+
+ private TableDescResponse cloneTableDesc(TableDesc table) {
+ TableExtDesc tableExtDesc = getMetadataManager().getTableExt(table.getIdentity());
+
+ // Clone TableDesc
+ TableDescResponse rtableDesc = new TableDescResponse(table);
+ Map<String, Long> cardinality = new HashMap<String, Long>();
+ Map<String, String> dataSourceProp = new HashMap<>();
+ String scard = tableExtDesc.getCardinality();
+ if (!StringUtils.isEmpty(scard)) {
+ String[] cards = StringUtils.split(scard, ",");
+ ColumnDesc[] cdescs = rtableDesc.getColumns();
+ for (int i = 0; i < cdescs.length; i++) {
+ ColumnDesc columnDesc = cdescs[i];
+ if (cards.length > i) {
+ cardinality.put(columnDesc.getName(), Long.parseLong(cards[i]));
+ } else {
+ logger.error("The result cardinality is not identical with hive table metadata, cardinality : " + scard + " column array length: " + cdescs.length);
+ break;
+ }
+ }
+ rtableDesc.setCardinality(cardinality);
+ }
+ dataSourceProp.putAll(tableExtDesc.getDataSourceProp());
+ dataSourceProp.put("location", tableExtDesc.getStorageLocation());
+ dataSourceProp.put("owner", tableExtDesc.getOwner());
+ dataSourceProp.put("last_access_time", tableExtDesc.getLastAccessTime());
+ dataSourceProp.put("partition_column", tableExtDesc.getPartitionColumn());
+ dataSourceProp.put("total_file_size", tableExtDesc.getTotalFileSize());
+ rtableDesc.setDescExd(dataSourceProp);
+ return rtableDesc;
+ }
+
+
+ private List<TableDesc> cloneTableDesc(List<TableDesc> tables) throws IOException {
+ List<TableDesc> descs = new ArrayList<TableDesc>();
+ Iterator<TableDesc> it = tables.iterator();
+ while (it.hasNext()) {
+ TableDesc table = it.next();
+ TableDescResponse rtableDesc = cloneTableDesc(table);
+ descs.add(rtableDesc);
+ }
+
+ return descs;
+ }
+
+ /**
+ * Starts a cardinality calculation for every table that has no such job currently running.
+ * A table is skipped only when its extended metadata records a job id and that job's
+ * output state is RUNNING.
+ *
+ * @param tables names of the tables to check
+ * @param submitter user recorded as submitter of the jobs that get created
+ * @throws IOException if creating a cardinality job fails
+ */
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+ public void calculateCardinalityIfNotPresent(String[] tables, String submitter) throws IOException {
+ final MetadataManager metadata = getMetadataManager();
+ final ExecutableManager executables = ExecutableManager.getInstance(getConfig());
+ for (String table : tables) {
+ final String jobID = metadata.getTableExt(table).getJodID();
+ final boolean jobRunning = jobID != null && executables.getOutput(jobID).getState() == ExecutableState.RUNNING;
+ if (jobRunning) {
+ continue;
+ }
+ calculateCardinality(table, submitter);
+ }
+ }
+
+ /**
+ * Generate cardinality for table This will trigger a hadoop job
+ * The result will be merged into table exd info
+ *
+ * @param tableName
+ */
+ @PreAuthorize(Constant.ACCESS_HAS_ROLE_MODELER + " or " + Constant.ACCESS_HAS_ROLE_ADMIN)
+ public void calculateCardinality(String tableName, String submitter) throws IOException {
+ tableName = normalizeHiveTableName(tableName);
+ TableDesc table = getMetadataManager().getTableDesc(tableName);
+ final TableExtDesc tableExt = getMetadataManager().getTableExt(tableName);
+ if (table == null) {
+ IllegalArgumentException e = new IllegalArgumentException("Cannot find table descirptor " + tableName);
+ logger.error("Cannot find table descirptor " + tableName, e);
+ throw e;
+ }
+
+ DefaultChainedExecutable job = new DefaultChainedExecutable();
+ //make sure the job could be scheduled when the DistributedScheduler is enable.
+ job.setParam("segmentId", tableName);
+ job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
+ job.setSubmitter(submitter);
+
+ String outPath = getConfig().getHdfsWorkingDirectory() + "cardinality/" + job.getId() + "/" + tableName;
+ String param = "-table " + tableName + " -output " + outPath;
+
+ MapReduceExecutable step1 = new MapReduceExecutable();
+
+ step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
+ step1.setMapReduceParams(param);
+ step1.setParam("segmentId", tableName);
+
+ job.addTask(step1);
+
+ HadoopShellExecutable step2 = new HadoopShellExecutable();
+
+ step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
+ step2.setJobParams(param);
+ step2.setParam("segmentId", tableName);
+ job.addTask(step2);
+ tableExt.setJodID(job.getId());
+ getMetadataManager().saveTableExt(tableExt);
+
+ getExecutableManager().addJob(job);
+ }
+
+ /**
+ * Rebuilds a table name from the first two parts returned by
+ * {@code HadoopUtil.parseHiveTableName}, joined with a dot.
+ *
+ * @param tableName table name to normalize
+ * @return the first and second parsed part, separated by "."
+ */
+ public String normalizeHiveTableName(String tableName){
+ final String[] parts = HadoopUtil.parseHiveTableName(tableName);
+ final StringBuilder normalized = new StringBuilder();
+ normalized.append(parts[0]).append(".").append(parts[1]);
+ return normalized.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
index 77e1084..b56009a 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSourceTableLoader.java
@@ -51,7 +51,7 @@ public class HiveSourceTableLoader {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HiveSourceTableLoader.class);
- public static Set<String> reloadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
+ public static Set<String> loadHiveTables(String[] hiveTables, KylinConfig config) throws IOException {
SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
for (String fullTableName : hiveTables) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/controllers/sourceMeta.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/controllers/sourceMeta.js b/webapp/app/js/controllers/sourceMeta.js
index bbb9915..a53a35f 100755
--- a/webapp/app/js/controllers/sourceMeta.js
+++ b/webapp/app/js/controllers/sourceMeta.js
@@ -330,7 +330,7 @@ KylinApp
}
if ($scope.tableNames.trim() === "") {
- SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+ SweetAlert.swal('', 'Please input table(s) you want to load.', 'info');
return;
}
@@ -352,13 +352,13 @@ KylinApp
})
if (result['result.unloaded'].length != 0 && result['result.loaded'].length == 0) {
- SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unloadedTableInfo, 'error');
+ SweetAlert.swal('Failed!', 'Failed to load following table(s): ' + unloadedTableInfo, 'error');
}
if (result['result.loaded'].length != 0 && result['result.unloaded'].length == 0) {
- SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo, 'success');
+ SweetAlert.swal('Success!', 'The following table(s) have been successfully loaded: ' + loadTableInfo, 'success');
}
if (result['result.loaded'].length != 0 && result['result.unloaded'].length != 0) {
- SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully synchronized: ' + loadTableInfo + "\n\n Failed to synchronize following table(s):" + unloadedTableInfo, 'warning');
+ SweetAlert.swal('Partial loaded!', 'The following table(s) have been successfully loaded: ' + loadTableInfo + "\n\n Failed to load following table(s):" + unloadedTableInfo, 'warning');
}
loadingRequest.hide();
scope.aceSrcTbLoaded(true);
@@ -378,7 +378,7 @@ KylinApp
$scope.remove = function () {
if ($scope.tableNames.trim() === "") {
- SweetAlert.swal('', 'Please input table(s) you want to synchronize.', 'info');
+ SweetAlert.swal('', 'Please input table(s) you want to unload.', 'info');
return;
}
@@ -400,13 +400,13 @@ KylinApp
})
if (result['result.unload.fail'].length != 0 && result['result.unload.success'].length == 0) {
- SweetAlert.swal('Failed!', 'Failed to synchronize following table(s): ' + unRemovedTableInfo, 'error');
+ SweetAlert.swal('Failed!', 'Failed to unload following table(s): ' + unRemovedTableInfo, 'error');
}
if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length == 0) {
- SweetAlert.swal('Success!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo, 'success');
+ SweetAlert.swal('Success!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo, 'success');
}
if (result['result.unload.success'].length != 0 && result['result.unload.fail'].length != 0) {
- SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully synchronized: ' + removedTableInfo + "\n\n Failed to synchronize following table(s):" + unRemovedTableInfo, 'warning');
+ SweetAlert.swal('Partial unloaded!', 'The following table(s) have been successfully unloaded: ' + removedTableInfo + "\n\n Failed to unload following table(s):" + unRemovedTableInfo, 'warning');
}
loadingRequest.hide();
scope.aceSrcTbLoaded(true);
http://git-wip-us.apache.org/repos/asf/kylin/blob/e2e2a81c/webapp/app/js/services/tables.js
----------------------------------------------------------------------
diff --git a/webapp/app/js/services/tables.js b/webapp/app/js/services/tables.js
index 4199d6c..4e7a7c4 100755
--- a/webapp/app/js/services/tables.js
+++ b/webapp/app/js/services/tables.js
@@ -24,7 +24,6 @@ KylinApp.factory('TableService', ['$resource', function ($resource, config) {
reload: {method: 'PUT', params: {action: 'reload'}, isArray: false},
loadHiveTable: {method: 'POST', params: {}, isArray: false},
unLoadHiveTable: {method: 'DELETE', params: {}, isArray: false},
- addStreamingSrc: {method: 'POST', params: {action:'addStreamingSrc'}, isArray: false},
genCardinality: {method: 'PUT', params: {action: 'cardinality'}, isArray: false},
showHiveDatabases: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true},
showHiveTables: {method: 'GET', params: {action:'hive'}, cache: true, isArray: true}