You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/05/21 00:27:37 UTC
[kylin] 05/05: KYLIN-3377 Some improvements for lookup table -
snapshot management
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch KYLIN-3221
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3826a3de898af0c7f5bdb20052e0c7d3bed8193c
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Thu May 10 13:13:15 2018 +0800
KYLIN-3377 Some improvements for lookup table - snapshot management
Signed-off-by: shaofengshi <sh...@apache.org>
---
.../kylin/engine/mr/LookupSnapshotBuildJob.java | 97 +++++++++++++++
.../kylin/engine/mr/LookupSnapshotJobBuilder.java | 86 +++++++++++++
.../mr/steps/lookup/LookupExecutableUtil.java | 134 +++++++++++++++++++++
.../lookup/LookupSnapshotToMetaStoreStep.java | 83 +++++++++++++
.../steps/lookup/UpdateCubeAfterSnapshotStep.java | 78 ++++++++++++
.../kylin/rest/controller/CubeController.java | 20 +++
.../kylin/rest/controller/TableController.java | 28 +++++
.../rest/request/LookupSnapshotBuildRequest.java | 51 ++++++++
.../kylin/rest/response/TableSnapshotResponse.java | 98 +++++++++++++++
.../org/apache/kylin/rest/service/JobService.java | 32 +++++
.../apache/kylin/rest/service/TableService.java | 116 ++++++++++++++++++
.../kylin/rest/service/TableServiceTest.java | 53 ++++++++
12 files changed, 876 insertions(+)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
new file mode 100644
index 0000000..6865ce3
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotBuildJob.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.project.ProjectInstance;
+import org.apache.kylin.metadata.project.ProjectManager;
+
+public class LookupSnapshotBuildJob extends DefaultChainedExecutable {
+
+ public static final Integer DEFAULT_PRIORITY = 30;
+
+ private static final String DEPLOY_ENV_NAME = "envName";
+ private static final String PROJECT_INSTANCE_NAME = "projectName";
+ private static final String CUBE_NAME = "cubeName";
+
+ private static final String JOB_TYPE = "Lookup ";
+
+ public static LookupSnapshotBuildJob createJob(CubeInstance cube, String tableName, String submitter,
+ KylinConfig kylinConfig) {
+ return initJob(cube, tableName, submitter, kylinConfig);
+ }
+
+ private static LookupSnapshotBuildJob initJob(CubeInstance cube, String tableName, String submitter,
+ KylinConfig kylinConfig) {
+ List<ProjectInstance> projList = ProjectManager.getInstance(kylinConfig).findProjects(cube.getType(),
+ cube.getName());
+ if (projList == null || projList.size() == 0) {
+ throw new RuntimeException("Cannot find the project containing the cube " + cube.getName() + "!!!");
+ } else if (projList.size() >= 2) {
+ String msg = "Find more than one project containing the cube " + cube.getName()
+ + ". It does't meet the uniqueness requirement!!! ";
+ throw new RuntimeException(msg);
+ }
+
+ LookupSnapshotBuildJob result = new LookupSnapshotBuildJob();
+ SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
+ format.setTimeZone(TimeZone.getTimeZone(kylinConfig.getTimeZone()));
+ result.setDeployEnvName(kylinConfig.getDeployEnv());
+ result.setProjectName(projList.get(0).getName());
+ CubingExecutableUtil.setCubeName(cube.getName(), result.getParams());
+ result.setName(JOB_TYPE + " CUBE - " + cube.getName() + " - " + " TABLE - " + tableName + " - "
+ + format.format(new Date(System.currentTimeMillis())));
+ result.setSubmitter(submitter);
+ result.setNotifyList(cube.getDescriptor().getNotifyList());
+ return result;
+ }
+
+ protected void setDeployEnvName(String name) {
+ setParam(DEPLOY_ENV_NAME, name);
+ }
+
+ public String getDeployEnvName() {
+ return getParam(DEPLOY_ENV_NAME);
+ }
+
+ public String getProjectName() {
+ return getParam(PROJECT_INSTANCE_NAME);
+ }
+
+ public void setProjectName(String name) {
+ setParam(PROJECT_INSTANCE_NAME, name);
+ }
+
+ public String getCubeName() {
+ return getParam(CUBE_NAME);
+ }
+
+ @Override
+ public int getDefaultPriority() {
+ return DEFAULT_PRIORITY;
+ }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
new file mode 100644
index 0000000..e7888a5
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/LookupSnapshotJobBuilder.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr;
+
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.SnapshotTableDesc;
+import org.apache.kylin.engine.mr.steps.lookup.LookupExecutableUtil;
+import org.apache.kylin.engine.mr.steps.lookup.LookupSnapshotToMetaStoreStep;
+import org.apache.kylin.engine.mr.steps.lookup.UpdateCubeAfterSnapshotStep;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LookupSnapshotJobBuilder {
+ private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotJobBuilder.class);
+ private CubeInstance cube;
+ private String lookupTable;
+ private List<String> segments;
+ private String submitter;
+ private KylinConfig kylinConfig;
+
+ public LookupSnapshotJobBuilder(CubeInstance cube, String lookupTable, List<String> segments, String submitter) {
+ this.cube = cube;
+ this.lookupTable = lookupTable;
+ this.segments = segments;
+ this.submitter = submitter;
+ this.kylinConfig = cube.getConfig();
+ }
+
+ public LookupSnapshotBuildJob build() {
+ logger.info("new job to build lookup snapshot:{} for cube:{}", lookupTable, cube.getName());
+ LookupSnapshotBuildJob result = LookupSnapshotBuildJob.createJob(cube, lookupTable, submitter, kylinConfig);
+ CubeDesc cubeDesc = cube.getDescriptor();
+ SnapshotTableDesc snapshotTableDesc = cubeDesc.getSnapshotTableDesc(lookupTable);
+ if (snapshotTableDesc != null && snapshotTableDesc.isExtSnapshotTable()) {
+ addExtMaterializeLookupTableSteps(result, snapshotTableDesc);
+ } else {
+ addInMetaStoreMaterializeLookupTableSteps(result);
+ }
+ return result;
+ }
+
+ private void addExtMaterializeLookupTableSteps(final LookupSnapshotBuildJob result,
+ SnapshotTableDesc snapshotTableDesc) {
+ ILookupMaterializer materializer = MRUtil.getExtLookupMaterializer(snapshotTableDesc.getStorageType());
+ materializer.materializeLookupTable(result, cube, lookupTable);
+
+ UpdateCubeAfterSnapshotStep afterSnapshotStep = new UpdateCubeAfterSnapshotStep();
+ afterSnapshotStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_UPDATE_CUBE);
+ LookupExecutableUtil.setCubeName(cube.getName(), afterSnapshotStep.getParams());
+ LookupExecutableUtil.setLookupTableName(lookupTable, afterSnapshotStep.getParams());
+ LookupExecutableUtil.setSegments(segments, afterSnapshotStep.getParams());
+ LookupExecutableUtil.setJobID(result.getId(), afterSnapshotStep.getParams());
+ result.addTask(afterSnapshotStep);
+ }
+
+ private void addInMetaStoreMaterializeLookupTableSteps(final LookupSnapshotBuildJob result) {
+ LookupSnapshotToMetaStoreStep lookupSnapshotToMetaStoreStep = new LookupSnapshotToMetaStoreStep();
+ lookupSnapshotToMetaStoreStep.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_LOOKUP_TABLE_META_STORE);
+ LookupExecutableUtil.setCubeName(cube.getName(), lookupSnapshotToMetaStoreStep.getParams());
+ LookupExecutableUtil.setLookupTableName(lookupTable, lookupSnapshotToMetaStoreStep.getParams());
+ LookupExecutableUtil.setSegments(segments, lookupSnapshotToMetaStoreStep.getParams());
+ result.addTask(lookupSnapshotToMetaStoreStep);
+ }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
new file mode 100644
index 0000000..bc2aa1d
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupExecutableUtil.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+
+import com.google.common.collect.Lists;
+
+public class LookupExecutableUtil {
+
+ public static final String CUBE_NAME = "cubeName";
+ public static final String LOOKUP_TABLE_NAME = "lookupTableName";
+ public static final String PROJECT_NAME = "projectName";
+ public static final String LOOKUP_SNAPSHOT_ID = "snapshotID";
+ public static final String SEGMENT_IDS = "segments";
+ public static final String JOB_ID = "jobID";
+
+
+ public static void setCubeName(String cubeName, Map<String, String> params) {
+ params.put(CUBE_NAME, cubeName);
+ }
+
+ public static String getCubeName(Map<String, String> params) {
+ return params.get(CUBE_NAME);
+ }
+
+ public static void setLookupTableName(String lookupTableName, Map<String, String> params) {
+ params.put(LOOKUP_TABLE_NAME, lookupTableName);
+ }
+
+ public static String getLookupTableName(Map<String, String> params) {
+ return params.get(LOOKUP_TABLE_NAME);
+ }
+
+ public static void setProjectName(String projectName, Map<String, String> params) {
+ params.put(PROJECT_NAME, projectName);
+ }
+
+ public static String getProjectName(Map<String, String> params) {
+ return params.get(PROJECT_NAME);
+ }
+
+ public static void setLookupSnapshotID(String snapshotID, Map<String, String> params) {
+ params.put(LOOKUP_SNAPSHOT_ID, snapshotID);
+ }
+
+ public static String getLookupSnapshotID(Map<String, String> params) {
+ return params.get(LOOKUP_SNAPSHOT_ID);
+ }
+
+ public static List<String> getSegments(Map<String, String> params) {
+ final String ids = params.get(SEGMENT_IDS);
+ if (ids != null) {
+ final String[] splitted = StringUtils.split(ids, ",");
+ ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
+ for (String id : splitted) {
+ result.add(id);
+ }
+ return result;
+ } else {
+ return Collections.emptyList();
+ }
+ }
+
+ public static void setSegments(List<String> segments, Map<String, String> params) {
+ params.put(SEGMENT_IDS, StringUtils.join(segments, ","));
+ }
+
+
+ public static String getJobID(Map<String, String> params) {
+ return params.get(JOB_ID);
+ }
+
+ public static void setJobID(String jobID, Map<String, String> params) {
+ params.put(JOB_ID, jobID);
+ }
+
+ public static void updateSnapshotPathToCube(CubeManager cubeManager, CubeInstance cube, String lookupTableName,
+ String snapshotPath) throws IOException {
+ cubeManager.updateCubeLookupSnapshot(cube, lookupTableName, snapshotPath);
+ cube.putSnapshotResPath(lookupTableName, snapshotPath);
+ }
+
+ public static void updateSnapshotPathToSegments(CubeManager cubeManager, CubeInstance cube, List<String> segmentIDs, String lookupTableName, String snapshotPath) throws IOException {
+ CubeInstance cubeCopy = cube.latestCopyForWrite();
+ if (segmentIDs.size() > 0) {
+ CubeSegment[] segments = new CubeSegment[segmentIDs.size()];
+ for (int i = 0; i < segments.length; i++) {
+ CubeSegment segment = cubeCopy.getSegmentById(segmentIDs.get(i));
+ if (segment == null) {
+ throw new IllegalStateException("the segment not exist in cube:" + segmentIDs.get(i));
+ }
+ segment.putSnapshotResPath(lookupTableName, snapshotPath);
+ segments[i] = segment;
+ }
+ CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
+ cubeUpdate.setToUpdateSegs(segments);
+ cubeManager.updateCube(cubeUpdate);
+
+ // Update the input cubeSeg after the resource store updated
+ for (int i = 0; i < segments.length; i++) {
+ CubeSegment segment = cube.getSegmentById(segmentIDs.get(i));
+ segment.putSnapshotResPath(lookupTableName, snapshotPath);
+ }
+ }
+ }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
new file mode 100644
index 0000000..783ded0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/LookupSnapshotToMetaStoreStep.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.SourceManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Save lookup snapshot information to cube metadata
+ */
+public class LookupSnapshotToMetaStoreStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(LookupSnapshotToMetaStoreStep.class);
+
+ public LookupSnapshotToMetaStoreStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig kylinConfig = context.getConfig();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+ TableMetadataManager metaMgr = TableMetadataManager.getInstance(kylinConfig);
+ SnapshotManager snapshotMgr = SnapshotManager.getInstance(kylinConfig);
+ CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+ List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+ String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+ CubeDesc cubeDesc = cube.getDescriptor();
+ try {
+ TableDesc tableDesc = metaMgr.getTableDesc(lookupTableName, cube.getProject());
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ logger.info("take snapshot for table:" + lookupTableName);
+ SnapshotTable snapshot = snapshotMgr.buildSnapshot(hiveTable, tableDesc);
+
+ logger.info("update snapshot path to cube metadata");
+ if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+ LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+ snapshot.getResourcePath());
+ } else {
+ LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+ snapshot.getResourcePath());
+ }
+ return new ExecuteResult();
+ } catch (IOException e) {
+ logger.error("fail to build snapshot for:" + lookupTableName, e);
+ return ExecuteResult.createError(e);
+ }
+ }
+
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
new file mode 100644
index 0000000..463e3b9
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/lookup/UpdateCubeAfterSnapshotStep.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.lookup;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Save lookup snapshot information to cube metadata
+ */
+public class UpdateCubeAfterSnapshotStep extends AbstractExecutable {
+
+ private static final Logger logger = LoggerFactory.getLogger(UpdateCubeAfterSnapshotStep.class);
+
+ public UpdateCubeAfterSnapshotStep() {
+ super();
+ }
+
+ @Override
+ protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+ KylinConfig kylinConfig = context.getConfig();
+ CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
+
+ CubeInstance cube = cubeManager.getCube(LookupExecutableUtil.getCubeName(this.getParams()));
+ List<String> segmentIDs = LookupExecutableUtil.getSegments(this.getParams());
+ String lookupTableName = LookupExecutableUtil.getLookupTableName(this.getParams());
+ DefaultChainedExecutable job = (DefaultChainedExecutable) getManager().getJob(LookupExecutableUtil.getJobID(this.getParams()));
+
+ String contextKey = BatchConstants.LOOKUP_EXT_SNAPSHOT_CONTEXT_PFX + lookupTableName;
+ String snapshotResPath = job.getExtraInfo(contextKey);
+
+ CubeDesc cubeDesc = cube.getDescriptor();
+ try {
+ logger.info("update snapshot path to cube metadata");
+ if (cubeDesc.isGlobalSnapshotTable(lookupTableName)) {
+ LookupExecutableUtil.updateSnapshotPathToCube(cubeManager, cube, lookupTableName,
+ snapshotResPath);
+ } else {
+ LookupExecutableUtil.updateSnapshotPathToSegments(cubeManager, cube, segmentIDs, lookupTableName,
+ snapshotResPath);
+ }
+ return new ExecuteResult();
+ } catch (IOException e) {
+ logger.error("fail to save cuboid statistics", e);
+ return ExecuteResult.createError(e);
+ }
+ }
+
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
index b4ebcba..c6219a4 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/CubeController.java
@@ -64,6 +64,7 @@ import org.apache.kylin.rest.request.JobBuildRequest2;
import org.apache.kylin.rest.request.JobOptimizeRequest;
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.CubeInstanceResponse;
+import org.apache.kylin.rest.request.LookupSnapshotBuildRequest;
import org.apache.kylin.rest.response.CuboidTreeResponse;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.GeneralResponse;
@@ -299,6 +300,25 @@ public class CubeController extends BasicController {
}
/**
+ * Force rebuild a cube's lookup table snapshot
+ *
+ * @throws IOException
+ */
+ @RequestMapping(value = "/{cubeName}/refresh_lookup", method = { RequestMethod.PUT }, produces = { "application/json" })
+ @ResponseBody
+ public JobInstance reBuildLookupSnapshot(@PathVariable String cubeName, @RequestBody LookupSnapshotBuildRequest request) {
+ try {
+ final CubeManager cubeMgr = cubeService.getCubeManager();
+ final CubeInstance cube = cubeMgr.getCube(cubeName);
+ String submitter = SecurityContextHolder.getContext().getAuthentication().getName();
+ return jobService.submitLookupSnapshotJob(cube, request.getLookupTableName(), request.getSegmentIDs(), submitter);
+ } catch (IOException e) {
+ logger.error(e.getLocalizedMessage(), e);
+ throw new InternalErrorException(e.getLocalizedMessage());
+ }
+ }
+
+ /**
* Delete a cube segment
*
* @throws IOException
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
index 7ada8cc..66621c7 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/TableController.java
@@ -32,6 +32,7 @@ import org.apache.kylin.rest.exception.NotFoundException;
import org.apache.kylin.rest.request.CardinalityRequest;
import org.apache.kylin.rest.request.HiveTableRequest;
import org.apache.kylin.rest.service.TableACLService;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
import org.apache.kylin.rest.service.TableService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -208,4 +209,31 @@ public class TableController extends BasicController {
}
}
+ @RequestMapping(value = "/{project}/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.PUT })
+ @ResponseBody
+ public void updateSnapshotLocalCache(@PathVariable final String project, @PathVariable final String tableName, @PathVariable final String snapshotID) {
+ tableService.updateSnapshotLocalCache(project, tableName, snapshotID);
+ }
+
+ @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache/state", method = { RequestMethod.GET })
+ @ResponseBody
+ public String getSnapshotLocalCacheState(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+ return tableService.getSnapshotLocalCacheState(tableName, snapshotID);
+ }
+
+ @RequestMapping(value = "/{tableName}/{snapshotID}/snapshotLocalCache", method = { RequestMethod.DELETE })
+ @ResponseBody
+ public void removeSnapshotLocalCache(@PathVariable final String tableName, @PathVariable final String snapshotID) {
+ tableService.removeSnapshotLocalCache(tableName, snapshotID);
+ }
+
+ @RequestMapping(value = "/{project}/{tableName}/snapshots", method = { RequestMethod.GET })
+ @ResponseBody
+ public List<TableSnapshotResponse> getTableSnapshots(@PathVariable final String project, @PathVariable final String tableName) throws IOException {
+ return tableService.getLookupTableSnapshots(project, tableName);
+ }
+
+ public void setTableService(TableService tableService) {
+ this.tableService = tableService;
+ }
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
new file mode 100644
index 0000000..06adf8a
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/LookupSnapshotBuildRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.request;
+
+import java.util.List;
+
+public class LookupSnapshotBuildRequest {
+ private String cubeName;
+ private String lookupTableName;
+ private List<String> segmentIDs;
+
+ public String getCubeName() {
+ return cubeName;
+ }
+
+ public void setCubeName(String cubeName) {
+ this.cubeName = cubeName;
+ }
+
+ public String getLookupTableName() {
+ return lookupTableName;
+ }
+
+ public void setLookupTableName(String lookupTableName) {
+ this.lookupTableName = lookupTableName;
+ }
+
+ public List<String> getSegmentIDs() {
+ return segmentIDs;
+ }
+
+ public void setSegmentIDs(List<String> segmentIDs) {
+ this.segmentIDs = segmentIDs;
+ }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
new file mode 100644
index 0000000..0574764
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/response/TableSnapshotResponse.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.response;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class TableSnapshotResponse implements Serializable {
+ private static final long serialVersionUID = -8707176301793624704L;
+ public static final String TYPE_EXT = "ext";
+ public static final String TYPE_INNER = "inner";
+
+ private String snapshotID;
+
+ private String snapshotType; // can be ext or inner
+
+ private String storageType;
+
+ private long lastBuildTime;
+
+ private long sourceTableSize;
+
+ private long sourceTableLastModifyTime;
+
+ private List<String> cubesAndSegmentsUsage;
+
+ public String getSnapshotID() {
+ return snapshotID;
+ }
+
+ public void setSnapshotID(String snapshotID) {
+ this.snapshotID = snapshotID;
+ }
+
+ public String getSnapshotType() {
+ return snapshotType;
+ }
+
+ public void setSnapshotType(String snapshotType) {
+ this.snapshotType = snapshotType;
+ }
+
+ public String getStorageType() {
+ return storageType;
+ }
+
+ public void setStorageType(String storageType) {
+ this.storageType = storageType;
+ }
+
+ public long getLastBuildTime() {
+ return lastBuildTime;
+ }
+
+ public void setLastBuildTime(long lastBuildTime) {
+ this.lastBuildTime = lastBuildTime;
+ }
+
+ public long getSourceTableSize() {
+ return sourceTableSize;
+ }
+
+ public void setSourceTableSize(long sourceTableSize) {
+ this.sourceTableSize = sourceTableSize;
+ }
+
+ public long getSourceTableLastModifyTime() {
+ return sourceTableLastModifyTime;
+ }
+
+ public void setSourceTableLastModifyTime(long sourceTableLastModifyTime) {
+ this.sourceTableLastModifyTime = sourceTableLastModifyTime;
+ }
+
+ public List<String> getCubesAndSegmentsUsage() {
+ return cubesAndSegmentsUsage;
+ }
+
+ public void setCubesAndSegmentsUsage(List<String> cubesAndSegmentsUsage) {
+ this.cubesAndSegmentsUsage = cubesAndSegmentsUsage;
+ }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 4317ed5..c0d9e56 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -36,6 +36,8 @@ import org.apache.kylin.cube.model.CubeBuildTypeEnum;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.BatchOptimizeJobCheckpointBuilder;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.LookupSnapshotBuildJob;
+import org.apache.kylin.engine.mr.LookupSnapshotJobBuilder;
import org.apache.kylin.engine.mr.common.JobInfoConverter;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JobInstance;
@@ -385,6 +387,14 @@ public class JobService extends BasicService implements InitializingBean {
return optimizeJobInstance;
}
+ public JobInstance submitLookupSnapshotJob(CubeInstance cube, String lookupTable, List<String> segmentIDs, String submitter) throws IOException {
+ LookupSnapshotBuildJob job = new LookupSnapshotJobBuilder(cube, lookupTable, segmentIDs, submitter).build();
+ getExecutableManager().addJob(job);
+
+ JobInstance jobInstance = getLookupSnapshotBuildJobInstance(job);
+ return jobInstance;
+ }
+
private void checkCubeDescSignature(CubeInstance cube) {
Message msg = MsgPicker.getMsg();
@@ -480,6 +490,28 @@ public class JobService extends BasicService implements InitializingBean {
return result;
}
+ protected JobInstance getLookupSnapshotBuildJobInstance(LookupSnapshotBuildJob job) {
+ if (job == null) {
+ return null;
+ }
+
+ final JobInstance result = new JobInstance();
+ result.setName(job.getName());
+ result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams()));
+ result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams()));
+ result.setLastModified(job.getLastModified());
+ result.setSubmitter(job.getSubmitter());
+ result.setUuid(job.getId());
+ result.setType(CubeBuildTypeEnum.BUILD);
+ result.setStatus(JobInfoConverter.parseToJobStatus(job.getStatus()));
+ result.setDuration(job.getDuration() / 1000);
+ for (int i = 0; i < job.getTasks().size(); ++i) {
+ AbstractExecutable task = job.getTasks().get(i);
+ result.addStep(JobInfoConverter.parseToJobStep(task, i, getExecutableManager().getOutput(task.getId())));
+ }
+ return result;
+ }
+
protected JobInstance getCheckpointJobInstance(AbstractExecutable job) {
Message msg = MsgPicker.getMsg();
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index ace1686..786daa6 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@ -20,6 +20,7 @@ package org.apache.kylin.rest.service;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,10 +30,19 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfo;
+import org.apache.kylin.dict.lookup.ExtTableSnapshotInfoManager;
+import org.apache.kylin.dict.lookup.IExtLookupTableCache.CacheState;
+import org.apache.kylin.dict.lookup.LookupProviderFactory;
+import org.apache.kylin.dict.lookup.SnapshotManager;
+import org.apache.kylin.dict.lookup.SnapshotTable;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -49,6 +59,9 @@ import org.apache.kylin.rest.msg.Message;
import org.apache.kylin.rest.msg.MsgPicker;
import org.apache.kylin.rest.response.TableDescResponse;
import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable;
+import org.apache.kylin.source.IReadableTable.TableSignature;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourceManager;
import org.apache.kylin.source.hive.cardinality.HiveColumnCardinalityJob;
@@ -387,6 +400,109 @@ public class TableService extends BasicService {
}
}
+ public void updateSnapshotLocalCache(String project, String tableName, String snapshotID) {
+ ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+ ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+ TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+ if (extTableSnapshotInfo == null) {
+ throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+ }
+ LookupProviderFactory.rebuildLocalCache(tableDesc, extTableSnapshotInfo);
+ }
+
+ public void removeSnapshotLocalCache(String tableName, String snapshotID) {
+ ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+ ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+ if (extTableSnapshotInfo == null) {
+ throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+ }
+ LookupProviderFactory.removeLocalCache(extTableSnapshotInfo);
+ }
+
+ public String getSnapshotLocalCacheState(String tableName, String snapshotID) {
+ ExtTableSnapshotInfoManager snapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+ ExtTableSnapshotInfo extTableSnapshotInfo = snapshotInfoManager.getSnapshot(tableName, snapshotID);
+ if (extTableSnapshotInfo == null) {
+ throw new IllegalArgumentException("cannot find ext snapshot info for table:" + tableName + " snapshot:" + snapshotID);
+ }
+ CacheState cacheState = LookupProviderFactory.getCacheState(extTableSnapshotInfo);
+ return cacheState.name();
+ }
+
+ public List<TableSnapshotResponse> getLookupTableSnapshots(String project, String tableName) throws IOException {
+ TableDesc tableDesc = getTableManager().getTableDesc(tableName, project);
+ IReadableTable hiveTable = SourceManager.createReadableTable(tableDesc);
+ TableSignature signature = hiveTable.getSignature();
+ return internalGetLookupTableSnapshots(tableName, signature);
+ }
+
+ List<TableSnapshotResponse> internalGetLookupTableSnapshots(String tableName, TableSignature signature) throws IOException {
+ ExtTableSnapshotInfoManager extSnapshotInfoManager = ExtTableSnapshotInfoManager.getInstance(getConfig());
+ SnapshotManager snapshotManager = SnapshotManager.getInstance(getConfig());
+ List<ExtTableSnapshotInfo> extTableSnapshots = extSnapshotInfoManager.getSnapshots(tableName);
+ List<SnapshotTable> metaStoreTableSnapshots = snapshotManager.getSnapshots(tableName, signature);
+
+ Map<String, List<String>> snapshotUsageMap = getSnapshotUsages();
+
+ List<TableSnapshotResponse> result = Lists.newArrayList();
+ for (ExtTableSnapshotInfo extTableSnapshot : extTableSnapshots) {
+ TableSnapshotResponse response = new TableSnapshotResponse();
+ response.setSnapshotID(extTableSnapshot.getId());
+ response.setSnapshotType(TableSnapshotResponse.TYPE_EXT);
+ response.setLastBuildTime(extTableSnapshot.getLastBuildTime());
+ response.setStorageType(extTableSnapshot.getStorageType());
+ response.setSourceTableSize(extTableSnapshot.getSignature().getSize());
+ response.setSourceTableLastModifyTime(extTableSnapshot.getSignature().getLastModifiedTime());
+ response.setCubesAndSegmentsUsage(snapshotUsageMap.get(extTableSnapshot.getResourcePath()));
+ result.add(response);
+ }
+
+ for (SnapshotTable metaStoreTableSnapshot : metaStoreTableSnapshots) {
+ TableSnapshotResponse response = new TableSnapshotResponse();
+ response.setSnapshotID(metaStoreTableSnapshot.getId());
+ response.setSnapshotType(TableSnapshotResponse.TYPE_INNER);
+ response.setLastBuildTime(metaStoreTableSnapshot.getLastBuildTime());
+ response.setStorageType(SnapshotTable.STORAGE_TYPE_METASTORE);
+ response.setSourceTableSize(metaStoreTableSnapshot.getSignature().getSize());
+ response.setSourceTableLastModifyTime(metaStoreTableSnapshot.getSignature().getLastModifiedTime());
+ response.setCubesAndSegmentsUsage(snapshotUsageMap.get(metaStoreTableSnapshot.getResourcePath()));
+ result.add(response);
+ }
+
+ return result;
+ }
+
+ /**
+ * @return Map of SnapshotID, CubeName or SegmentName list
+ */
+ private Map<String, List<String>> getSnapshotUsages() {
+ CubeManager cubeManager = CubeManager.getInstance(getConfig());
+ Map<String, List<String>> snapshotCubeSegmentMap = Maps.newHashMap();
+ for (CubeInstance cube : cubeManager.listAllCubes()) {
+ Collection<String> cubeSnapshots = cube.getSnapshots().values();
+ for (String cubeSnapshot : cubeSnapshots) {
+ List<String> usages = snapshotCubeSegmentMap.get(cubeSnapshot);
+ if (usages == null) {
+ usages = Lists.newArrayList();
+ snapshotCubeSegmentMap.put(cubeSnapshot, usages);
+ }
+ usages.add(cube.getName());
+ }
+ for (CubeSegment segment : cube.getSegments()) {
+ Collection<String> segmentSnapshots = segment.getSnapshotPaths();
+ for (String segmentSnapshot : segmentSnapshots) {
+ List<String> usages = snapshotCubeSegmentMap.get(segmentSnapshot);
+ if (usages == null) {
+ usages = Lists.newArrayList();
+ snapshotCubeSegmentMap.put(segmentSnapshot, usages);
+ }
+ usages.add(cube.getName() + ":" + segment.getName());
+ }
+ }
+ }
+ return snapshotCubeSegmentMap;
+ }
+
/**
* Generate cardinality for table This will trigger a hadoop job
* The result will be merged into table exd info
diff --git a/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
new file mode 100644
index 0000000..86d34ac
--- /dev/null
+++ b/server-base/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.service;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.rest.response.TableSnapshotResponse;
+import org.apache.kylin.source.IReadableTable.TableSignature;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+public class TableServiceTest extends LocalFileMetadataTestCase {
+ private TableService tableService;
+
+ @Before
+ public void setUp() {
+ this.createTestMetadata();
+ tableService = new TableService();
+ }
+
+ @After
+ public void after() throws Exception {
+ this.cleanupTestMetadata();
+ }
+
+ @Test
+ public void testGetTableSnapshots() throws IOException {
+ TableSignature tableSignature = new TableSignature("TEST_CAL_DT.csv", 100, System.currentTimeMillis());
+ List<TableSnapshotResponse> snapshotResponseList = tableService.internalGetLookupTableSnapshots("EDW.TEST_CAL_DT", tableSignature);
+ Assert.assertEquals(8, snapshotResponseList.size());
+ }
+}
--
To stop receiving notification emails like this one, please contact
shaofengshi@apache.org.