You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:07 UTC
[20/50] [abbrv] incubator-kylin git commit: migration tool for job
engine
migration tool for job engine
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/504ebb5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/504ebb5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/504ebb5e
Branch: refs/heads/inverted-index
Commit: 504ebb5ed47561a571fadad407af9910b81d8141
Parents: 56572c5
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jan 22 17:35:49 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jan 22 17:35:49 2015 +0800
----------------------------------------------------------------------
.../cube/model/CubeMetadataUpgrade.java | 297 -----------
.../com/kylinolap/job/CubeMetadataUpgrade.java | 498 +++++++++++++++++++
.../job/impl/threadpool/AbstractExecutable.java | 4 +
.../threadpool/DefaultChainedExecutable.java | 1 +
.../job/service/ExecutableManager.java | 55 +-
5 files changed, 518 insertions(+), 337 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java b/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
deleted file mode 100644
index bf77a08..0000000
--- a/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
+++ /dev/null
@@ -1,297 +0,0 @@
-package com.kylinolap.cube.model;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.JsonSerializer;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.util.JsonUtil;
-import com.kylinolap.cube.CubeDescManager;
-import com.kylinolap.cube.CubeDescUpgrader;
-import com.kylinolap.metadata.MetadataConstances;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.TableDesc;
-import com.kylinolap.metadata.project.ProjectInstance;
-import com.kylinolap.metadata.project.ProjectManager;
-import com.kylinolap.metadata.project.RealizationEntry;
-import com.kylinolap.metadata.realization.RealizationType;
-
-/**
- * This is the utility class to migrate the Kylin metadata format from v1 to v2;
- *
- * @author shaoshi
- *
- */
-public class CubeMetadataUpgrade {
-
- private KylinConfig config = null;
- private ResourceStore store;
-
- private List<String> updatedResources = Lists.newArrayList();
- private List<String> errorMsgs = Lists.newArrayList();
-
- private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
-
- public CubeMetadataUpgrade(String newMetadataUrl) {
- KylinConfig.destoryInstance();
- System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
- KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
-
-
- config = KylinConfig.getInstanceFromEnv();
- store = getStore();
- }
-
- public void upgrade() {
-
- upgradeTableDesc();
- upgradeTableDesceExd();
- upgradeCubeDesc();
- upgradeProjectInstance();
-
- }
-
- private List<String> listResourceStore(String pathRoot) {
- List<String> paths = null;
- try {
- paths = store.collectResourceRecursively(pathRoot, MetadataConstances.FILE_SURFIX);
- } catch (IOException e1) {
- e1.printStackTrace();
- errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT);
- }
-
- return paths;
- }
-
- private void upgradeCubeDesc() {
- logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
-
- List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
- for (String path : paths) {
-
- try {
- CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
- CubeDesc ndesc = upgrade.upgrade();
- ndesc.setSignature(ndesc.calculateSignature());
-
- getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
- getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
- updatedResources.add(ndesc.getResourcePath());
- } catch (IOException e) {
- e.printStackTrace();
- errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage());
- }
- }
-
- }
-
- private void upgradeTableDesc() {
- List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
- for (String path : paths) {
- TableDesc t;
- try {
- t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
- t.init();
-
- // if it only has 1 "." in the path, delete the old resource if it exists
- if (path.substring(path.indexOf(".")).length() == MetadataConstances.FILE_SURFIX.length()) {
- getStore().deleteResource(path);
- // the new source will be new;
- t.setLastModified(0);
- getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
- updatedResources.add(t.getResourcePath());
- }
- } catch (IOException e) {
- e.printStackTrace();
- errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage());
- }
-
- }
-
- }
-
- private void upgradeTableDesceExd() {
-
- List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
- for (String path : paths) {
- Map<String, String> attrs = Maps.newHashMap();
-
- InputStream is = null;
- try {
- is = store.getResource(path);
- if (is == null) {
- continue;
- }
- try {
- attrs.putAll(JsonUtil.readValue(is, HashMap.class));
- } finally {
- if (is != null)
- is.close();
- }
- } catch (IOException e) {
- e.printStackTrace();
- errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
- }
-
- // parse table identity from file name
- String file = path;
- if (file.indexOf("/") > -1) {
- file = file.substring(file.lastIndexOf("/") + 1);
- }
- String tableIdentity = file.substring(0, file.length() - MetadataConstances.FILE_SURFIX.length()).toUpperCase();
-
- // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
- if (tableIdentity.indexOf(".") < 0) {
- tableIdentity = appendDBName(tableIdentity);
- try {
- getMetadataManager().saveTableExd(tableIdentity, attrs);
- //delete old resoruce if it exists;
- getStore().deleteResource(path);
- updatedResources.add(path);
- } catch (IOException e) {
- e.printStackTrace();
- errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
- }
-
- }
-
- }
-
- }
-
- public String appendDBName(String table) {
-
- if (table.indexOf(".") > 0)
- return table;
-
- Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
-
- int count = 0;
- String result = null;
- for (TableDesc t : map.values()) {
- if (t.getName().equalsIgnoreCase(table)) {
- result = t.getIdentity();
- count++;
- }
- }
-
- if (count == 1)
- return result;
-
- if (count > 1) {
- errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; The program couldn't determine, randomly pick '" + result + "'");
- }
- return result;
- }
-
- private void upgradeProjectInstance() {
- List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
- for (String path : paths) {
- try {
- com.kylinolap.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, com.kylinolap.cube.model.v1.ProjectInstance.class, new JsonSerializer<com.kylinolap.cube.model.v1.ProjectInstance>(com.kylinolap.cube.model.v1.ProjectInstance.class));
-
- ProjectInstance newPrj = new ProjectInstance();
- newPrj.setUuid(oldPrj.getUuid());
- newPrj.setName(oldPrj.getName());
- newPrj.setOwner(oldPrj.getOwner());
- newPrj.setDescription(oldPrj.getDescription());
- newPrj.setLastModified(oldPrj.getLastModified());
- newPrj.setLastUpdateTime(oldPrj.getLastUpdateTime());
- newPrj.setCreateTime(oldPrj.getCreateTime());
- newPrj.setStatus(oldPrj.getStatus());
- List<RealizationEntry> realizationEntries = Lists.newArrayList();
- for (String cube : oldPrj.getCubes()) {
- RealizationEntry entry = new RealizationEntry();
- entry.setType(RealizationType.CUBE);
- entry.setRealization(cube);
- realizationEntries.add(entry);
- }
- newPrj.setRealizationEntries(realizationEntries);
- newPrj.getCreateTimeUTC();
-
- Set<String> tables = Sets.newHashSet();
- for (String table : oldPrj.getTables()) {
- tables.add(this.appendDBName(table));
- }
- newPrj.setTables(tables);
-
- store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
- updatedResources.add(path);
- } catch (IOException e) {
- e.printStackTrace();
- errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage());
- }
- }
-
- }
-
- private MetadataManager getMetadataManager() {
- return MetadataManager.getInstance(config);
- }
-
- private ResourceStore getStore() {
- return ResourceStore.getStore(config);
- }
-
- public static void main(String[] args) {
-
- if (!(args != null && args.length == 1)) {
- System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder>; e.g, /export/kylin/meta");
- return;
- }
-
- String exportFolder = args[0];
-
- File oldMetaFolder = new File(exportFolder);
- if(!oldMetaFolder.exists()) {
- System.out.println("Provided folder doesn't exist: '" + exportFolder + "'");
- return;
- }
-
- if(!oldMetaFolder.isDirectory()) {
- System.out.println("Provided folder is not a directory: '" + exportFolder + "'");
- return;
- }
-
-
- String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
- try {
- FileUtils.deleteDirectory(new File(newMetadataUrl));
- FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- CubeMetadataUpgrade instance = new CubeMetadataUpgrade(newMetadataUrl);
-
- instance.upgrade();
-
- logger.info("Run CubeMetadataUpgrade completed, check the following messages.");
- logger.info("The following resources have been successfully updated in : " + newMetadataUrl);
- for (String s : instance.updatedResources) {
- logger.info(s);
- }
-
- if (instance.errorMsgs.size() > 0) {
- logger.info("Here are the error/warning messages, you may need check:");
- for (String s : instance.errorMsgs) {
- logger.warn(s);
- }
- } else {
- logger.info("No error or warning messages; Looks all good.");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java b/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
new file mode 100644
index 0000000..1a71df0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
@@ -0,0 +1,498 @@
+package com.kylinolap.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.kylinolap.cube.model.CubeDesc;
+import com.kylinolap.job.common.HadoopShellExecutable;
+import com.kylinolap.job.common.MapReduceExecutable;
+import com.kylinolap.job.common.ShellExecutable;
+import com.kylinolap.job.constant.ExecutableConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.cube.CubingJob;
+import com.kylinolap.job.execution.ExecutableState;
+import com.kylinolap.job.hadoop.cube.*;
+import com.kylinolap.job.hadoop.dict.CreateDictionaryJob;
+import com.kylinolap.job.hadoop.hbase.BulkLoadJob;
+import com.kylinolap.job.hadoop.hbase.CreateHTableJob;
+import com.kylinolap.job.impl.threadpool.AbstractExecutable;
+import com.kylinolap.job.service.ExecutableManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.JsonSerializer;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.JsonUtil;
+import com.kylinolap.cube.CubeDescManager;
+import com.kylinolap.cube.CubeDescUpgrader;
+import com.kylinolap.metadata.MetadataConstances;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.TableDesc;
+import com.kylinolap.metadata.project.ProjectInstance;
+import com.kylinolap.metadata.project.ProjectManager;
+import com.kylinolap.metadata.project.RealizationEntry;
+import com.kylinolap.metadata.realization.RealizationType;
+
+/**
+ * This is the utility class to migrate the Kylin metadata format from v1 to v2;
+ *
+ * @author shaoshi
+ *
+ */
+public class CubeMetadataUpgrade {
+
+ private KylinConfig config = null;
+ private ResourceStore store;
+
+ private List<String> updatedResources = Lists.newArrayList();
+ private List<String> errorMsgs = Lists.newArrayList();
+
+ private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
+
+ public CubeMetadataUpgrade(String newMetadataUrl) {
+ KylinConfig.destoryInstance();
+ System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
+ KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
+
+
+ config = KylinConfig.getInstanceFromEnv();
+ store = getStore();
+ }
+
+ public void upgrade() {
+
+// upgradeTableDesc();
+// upgradeTableDesceExd();
+// upgradeCubeDesc();
+// upgradeProjectInstance();
+ upgradeJobInstance();
+
+ }
+
+ private List<String> listResourceStore(String pathRoot) {
+ List<String> paths = null;
+ try {
+ paths = store.collectResourceRecursively(pathRoot, MetadataConstances.FILE_SURFIX);
+ } catch (IOException e1) {
+ e1.printStackTrace();
+ errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT);
+ }
+
+ return paths;
+ }
+
+ private void upgradeCubeDesc() {
+ logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
+
+ List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
+ for (String path : paths) {
+
+ try {
+ CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
+ CubeDesc ndesc = upgrade.upgrade();
+ ndesc.setSignature(ndesc.calculateSignature());
+
+ getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
+ getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
+ updatedResources.add(ndesc.getResourcePath());
+ } catch (IOException e) {
+ e.printStackTrace();
+ errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage());
+ }
+ }
+
+ }
+
+ private void upgradeTableDesc() {
+ List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
+ for (String path : paths) {
+ TableDesc t;
+ try {
+ t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
+ t.init();
+
+ // if it only has 1 "." in the path, delete the old resource if it exists
+ if (path.substring(path.indexOf(".")).length() == MetadataConstances.FILE_SURFIX.length()) {
+ getStore().deleteResource(path);
+ // reset timestamp so the migrated resource is treated as newly created;
+ t.setLastModified(0);
+ getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
+ updatedResources.add(t.getResourcePath());
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage());
+ }
+
+ }
+
+ }
+
+ private void upgradeTableDesceExd() {
+
+ List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
+ for (String path : paths) {
+ Map<String, String> attrs = Maps.newHashMap();
+
+ InputStream is = null;
+ try {
+ is = store.getResource(path);
+ if (is == null) {
+ continue;
+ }
+ try {
+ attrs.putAll(JsonUtil.readValue(is, HashMap.class));
+ } finally {
+ if (is != null)
+ is.close();
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
+ }
+
+ // parse table identity from file name
+ String file = path;
+ if (file.indexOf("/") > -1) {
+ file = file.substring(file.lastIndexOf("/") + 1);
+ }
+ String tableIdentity = file.substring(0, file.length() - MetadataConstances.FILE_SURFIX.length()).toUpperCase();
+
+ // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
+ if (tableIdentity.indexOf(".") < 0) {
+ tableIdentity = appendDBName(tableIdentity);
+ try {
+ getMetadataManager().saveTableExd(tableIdentity, attrs);
+ //delete old resource if it exists;
+ getStore().deleteResource(path);
+ updatedResources.add(path);
+ } catch (IOException e) {
+ e.printStackTrace();
+ errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
+ }
+
+ }
+
+ }
+
+ }
+
+ public String appendDBName(String table) {
+
+ if (table.indexOf(".") > 0)
+ return table;
+
+ Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
+
+ int count = 0;
+ String result = null;
+ for (TableDesc t : map.values()) {
+ if (t.getName().equalsIgnoreCase(table)) {
+ result = t.getIdentity();
+ count++;
+ }
+ }
+
+ if (count == 1)
+ return result;
+
+ if (count > 1) {
+ errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; The program couldn't determine, randomly pick '" + result + "'");
+ }
+ return result;
+ }
+
+ private void upgradeProjectInstance() {
+ List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
+ for (String path : paths) {
+ try {
+ com.kylinolap.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, com.kylinolap.cube.model.v1.ProjectInstance.class, new JsonSerializer<com.kylinolap.cube.model.v1.ProjectInstance>(com.kylinolap.cube.model.v1.ProjectInstance.class));
+
+ ProjectInstance newPrj = new ProjectInstance();
+ newPrj.setUuid(oldPrj.getUuid());
+ newPrj.setName(oldPrj.getName());
+ newPrj.setOwner(oldPrj.getOwner());
+ newPrj.setDescription(oldPrj.getDescription());
+ newPrj.setLastModified(oldPrj.getLastModified());
+ newPrj.setLastUpdateTime(oldPrj.getLastUpdateTime());
+ newPrj.setCreateTime(oldPrj.getCreateTime());
+ newPrj.setStatus(oldPrj.getStatus());
+ List<RealizationEntry> realizationEntries = Lists.newArrayList();
+ for (String cube : oldPrj.getCubes()) {
+ RealizationEntry entry = new RealizationEntry();
+ entry.setType(RealizationType.CUBE);
+ entry.setRealization(cube);
+ realizationEntries.add(entry);
+ }
+ newPrj.setRealizationEntries(realizationEntries);
+ newPrj.getCreateTimeUTC();
+
+ Set<String> tables = Sets.newHashSet();
+ for (String table : oldPrj.getTables()) {
+ tables.add(this.appendDBName(table));
+ }
+ newPrj.setTables(tables);
+
+ store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
+ updatedResources.add(path);
+ } catch (IOException e) {
+ e.printStackTrace();
+ errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage());
+ }
+ }
+
+ }
+
+ private MetadataManager getMetadataManager() {
+ return MetadataManager.getInstance(config);
+ }
+
+ private ResourceStore getStore() {
+ return ResourceStore.getStore(config);
+ }
+
+ private ExecutableManager getExecutableManager() {
+ return ExecutableManager.getInstance(config);
+ }
+
+ private void upgradeJobInstance() {
+ try {
+ List<String> paths = getStore().collectResourceRecursively(ResourceStore.JOB_PATH_ROOT, "");
+ for (String path: paths) {
+ upgradeJobInstance(path);
+ }
+ } catch (IOException ex) {
+ errorMsgs.add("upgrade job failed" + ex.getLocalizedMessage());
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private ExecutableState parseState(JobStatusEnum state) {
+ switch (state) {
+ case NEW:
+ case PENDING:
+ return ExecutableState.READY;
+ case RUNNING:
+ return ExecutableState.RUNNING;
+ case FINISHED:
+ return ExecutableState.SUCCEED;
+ case ERROR:
+ return ExecutableState.ERROR;
+ case DISCARDED:
+ return ExecutableState.DISCARDED;
+ default:
+ return ExecutableState.DISCARDED;
+ }
+ }
+
+ private ExecutableState parseState(JobStepStatusEnum state) {
+ switch (state) {
+ case NEW:
+ case PENDING:
+ case WAITING:
+ return ExecutableState.READY;
+ case RUNNING:
+ return ExecutableState.RUNNING;
+ case FINISHED:
+ return ExecutableState.SUCCEED;
+ case ERROR:
+ return ExecutableState.ERROR;
+ case DISCARDED:
+ return ExecutableState.DISCARDED;
+ default:
+ return ExecutableState.DISCARDED;
+ }
+
+ }
+
+ private void upgradeJobInstance(String path) throws IOException {
+ JobInstance job = getStore().getResource(path, JobInstance.class, new JsonSerializer<JobInstance>(JobInstance.class));
+ CubingJob cubingJob = new CubingJob();
+ cubingJob.setId(job.getId());
+ cubingJob.setName(job.getName());
+ cubingJob.setSubmitter(job.getSubmitter());
+ for (JobInstance.JobStep step: job.getSteps()) {
+ final AbstractExecutable executable = parseToExecutable(step);
+ cubingJob.addTask(executable);
+ }
+ getExecutableManager().addJob(cubingJob);
+
+ cubingJob.setStartTime(job.getExecStartTime());
+ cubingJob.setEndTime(job.getExecEndTime());
+ cubingJob.setMapReduceWaitTime(job.getMrWaiting());
+ getExecutableManager().resetJobOutput(cubingJob.getId(), parseState(job.getStatus()), job.getStatus().toString());
+
+ for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
+ final JobInstance.JobStep jobStep = job.getSteps().get(i);
+ final InputStream inputStream = getStore().getResource(ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i);
+ String output = null;
+ if (inputStream != null) {
+ JsonElement json = new JsonParser().parse(new InputStreamReader(inputStream));
+ if (json instanceof JsonObject) {
+ final JsonElement element = ((JsonObject) json).get("output");
+ if (element != null) {
+ output = element.getAsString();
+ } else {
+ output = json.getAsString();
+ }
+ } else {
+ output = json.getAsString();
+ }
+ }
+ updateJobStepOutput(jobStep, output, cubingJob.getTasks().get(i));
+ }
+ }
+
+ private void updateJobStepOutput(JobInstance.JobStep step, String output, AbstractExecutable task) {
+ task.setStartTime(step.getExecStartTime());
+ task.setEndTime(step.getExecEndTime());
+ if (task instanceof MapReduceExecutable) {
+ ((MapReduceExecutable) task).setMapReduceWaitTime(step.getExecWaitTime() * 1000);
+ }
+ getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output);
+ }
+
+ private AbstractExecutable parseToExecutable(JobInstance.JobStep step) {
+ AbstractExecutable result;
+ switch (step.getCmdType()) {
+ case SHELL_CMD_HADOOP: {
+ ShellExecutable executable = new ShellExecutable();
+ executable.setCmd(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_FACTDISTINCT: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(FactDistinctColumnsJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_BASECUBOID: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(BaseCuboidJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_NDCUBOID: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(NDCuboidJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(RangeKeyDistributionJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_CONVERTHFILE: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(CubeHFileJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_MERGECUBOID: {
+ MapReduceExecutable executable = new MapReduceExecutable();
+ executable.setMapReduceJobClass(MergeCuboidJob.class);
+ executable.setMapReduceParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_NO_MR_DICTIONARY: {
+ HadoopShellExecutable executable = new HadoopShellExecutable();
+ executable.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+ executable.setJobClass(CreateDictionaryJob.class);
+ executable.setJobParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE: {
+ HadoopShellExecutable executable = new HadoopShellExecutable();
+ executable.setJobClass(CreateHTableJob.class);
+ executable.setJobParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ case JAVA_CMD_HADOOP_NO_MR_BULKLOAD: {
+ HadoopShellExecutable executable = new HadoopShellExecutable();
+ executable.setJobClass(BulkLoadJob.class);
+ executable.setJobParams(step.getExecCmd());
+ result = executable;
+ break;
+ }
+ default:
+ throw new RuntimeException("invalid step type:" + step.getCmdType());
+ }
+ result.setName(step.getName());
+ return result;
+ }
+
+ public static void main(String[] args) {
+
+ if (!(args != null && args.length == 1)) {
+ System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder>; e.g, /export/kylin/meta");
+ return;
+ }
+
+ String exportFolder = args[0];
+
+ File oldMetaFolder = new File(exportFolder);
+ if(!oldMetaFolder.exists()) {
+ System.out.println("Provided folder doesn't exist: '" + exportFolder + "'");
+ return;
+ }
+
+ if(!oldMetaFolder.isDirectory()) {
+ System.out.println("Provided folder is not a directory: '" + exportFolder + "'");
+ return;
+ }
+
+
+ String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
+ try {
+ FileUtils.deleteDirectory(new File(newMetadataUrl));
+ FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+
+ CubeMetadataUpgrade instance = new CubeMetadataUpgrade(newMetadataUrl);
+
+ instance.upgrade();
+
+ logger.info("Run CubeMetadataUpgrade completed, check the following messages.");
+ logger.info("The following resources have been successfully updated in : " + newMetadataUrl);
+ for (String s : instance.updatedResources) {
+ logger.info(s);
+ }
+
+ if (instance.errorMsgs.size() > 0) {
+ logger.info("Here are the error/warning messages, you may need check:");
+ for (String s : instance.errorMsgs) {
+ logger.warn(s);
+ }
+ } else {
+ logger.info("No error or warning messages; Looks all good.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java b/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
index fe38ca3..48d5c3c 100644
--- a/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
+++ b/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
@@ -115,6 +115,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
return job.getId();
}
+ public final void setId(String id) {
+ this.job.setUuid(id);
+ }
+
@Override
public final ExecutableState getStatus() {
return jobService.getOutput(this.getId()).getState();
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java b/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
index b7bb840..30683aa 100644
--- a/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
+++ b/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
@@ -104,6 +104,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
}
public void addTask(AbstractExecutable executable) {
+ executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
this.subTasks.add(executable);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java b/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
index 26563d4..4e51056 100644
--- a/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
+++ b/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -204,46 +205,20 @@ public class ExecutableManager {
}
}
-// public boolean updateJobStatus(String jobId, ExecutableState newStatus) {
-// try {
-// final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
-// ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-// if (oldStatus == newStatus) {
-// return true;
-// }
-// if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-// throw new RuntimeException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-// }
-// jobOutput.setStatus(newStatus.toString());
-// jobDao.updateJobOutput(jobOutput);
-// logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-// return true;
-// } catch (PersistentException e) {
-// logger.error("error change job:" + jobId + " to " + newStatus.toString());
-// throw new RuntimeException(e);
-// }
-// }
-//
-// public boolean updateJobStatus(String jobId, ExecutableState newStatus, String output) {
-// try {
-// final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
-// ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-// if (oldStatus == newStatus) {
-// return true;
-// }
-// if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-// throw new RuntimeException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-// }
-// jobOutput.setStatus(newStatus.toString());
-// jobOutput.setContent(output);
-// jobDao.updateJobOutput(jobOutput);
-// logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-// return true;
-// } catch (PersistentException e) {
-// logger.error("error change job:" + jobId + " to " + newStatus.toString());
-// throw new RuntimeException(e);
-// }
-// }
+ //for migration only
+ //TODO delete when migration finished
+ public void resetJobOutput(String jobId, ExecutableState state, String output) {
+ try {
+ final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
+ jobOutput.setStatus(state.toString());
+ if (output != null) {
+ jobOutput.setContent(output);
+ }
+ jobDao.updateJobOutput(jobOutput);
+ } catch (PersistentException e) {
+ throw new RuntimeException(e);
+ }
+ }
public void addJobInfo(String id, Map<String, String> info) {
if (info == null) {