Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:07 UTC

[20/50] [abbrv] incubator-kylin git commit: migration tool for job engine

migration tool for job engine
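
The tool reads a v1 metadata export folder, copies it to a sibling "<folder>_v2"
directory, and rewrites each legacy JobInstance into the new job engine format:
one CubingJob carrying one AbstractExecutable per step, with states and step
outputs carried over. A typical invocation (classpath setup omitted) follows the
usage string in main():

    java com.kylinolap.job.CubeMetadataUpgrade /export/kylin/meta

Updated resources and any error/warning messages are logged when the run completes.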


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/504ebb5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/504ebb5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/504ebb5e

Branch: refs/heads/inverted-index
Commit: 504ebb5ed47561a571fadad407af9910b81d8141
Parents: 56572c5
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jan 22 17:35:49 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jan 22 17:35:49 2015 +0800

----------------------------------------------------------------------
 .../cube/model/CubeMetadataUpgrade.java         | 297 -----------
 .../com/kylinolap/job/CubeMetadataUpgrade.java  | 498 +++++++++++++++++++
 .../job/impl/threadpool/AbstractExecutable.java |   4 +
 .../threadpool/DefaultChainedExecutable.java    |   1 +
 .../job/service/ExecutableManager.java          |  55 +-
 5 files changed, 518 insertions(+), 337 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java b/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
deleted file mode 100644
index bf77a08..0000000
--- a/cube/src/main/java/com/kylinolap/cube/model/CubeMetadataUpgrade.java
+++ /dev/null
@@ -1,297 +0,0 @@
-package com.kylinolap.cube.model;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.JsonSerializer;
-import com.kylinolap.common.persistence.ResourceStore;
-import com.kylinolap.common.util.JsonUtil;
-import com.kylinolap.cube.CubeDescManager;
-import com.kylinolap.cube.CubeDescUpgrader;
-import com.kylinolap.metadata.MetadataConstances;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.TableDesc;
-import com.kylinolap.metadata.project.ProjectInstance;
-import com.kylinolap.metadata.project.ProjectManager;
-import com.kylinolap.metadata.project.RealizationEntry;
-import com.kylinolap.metadata.realization.RealizationType;
-
-/**
- * Utility class to migrate the Kylin metadata format from v1 to v2.
- * 
- * @author shaoshi
- *
- */
-public class CubeMetadataUpgrade {
-
-    private KylinConfig config = null;
-    private ResourceStore store;
-
-    private List<String> updatedResources = Lists.newArrayList();
-    private List<String> errorMsgs = Lists.newArrayList();
-
-    private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
-
-    public CubeMetadataUpgrade(String newMetadataUrl) {
-        KylinConfig.destoryInstance();
-        System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
-        KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
-
-        
-        config = KylinConfig.getInstanceFromEnv();
-        store = getStore();
-    }
-    
-    public void upgrade() {
-
-        upgradeTableDesc();
-        upgradeTableDesceExd();
-        upgradeCubeDesc();
-        upgradeProjectInstance();
-        
-    }
-
-    private List<String> listResourceStore(String pathRoot) {
-        List<String> paths = null;
-        try {
-            paths = store.collectResourceRecursively(pathRoot, MetadataConstances.FILE_SURFIX);
-        } catch (IOException e1) {
-            e1.printStackTrace();
-            errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT);
-        }
-
-        return paths;
-    }
-
-    private void upgradeCubeDesc() {
-        logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
-
-        List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
-        for (String path : paths) {
-
-            try {
-                CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
-                CubeDesc ndesc = upgrade.upgrade();
-                ndesc.setSignature(ndesc.calculateSignature());
-                
-                getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
-                getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
-                updatedResources.add(ndesc.getResourcePath());
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage());
-            }
-        }
-
-    }
-
-    private void upgradeTableDesc() {
-        List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
-        for (String path : paths) {
-            TableDesc t;
-            try {
-                t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
-                t.init();
-
-                // if the path contains only one "." (i.e. no database prefix), re-save the resource under its new path
-                if (path.substring(path.indexOf(".")).length() == MetadataConstances.FILE_SURFIX.length()) {
-                    getStore().deleteResource(path);
-                    // reset last-modified so the resource is saved as new
-                    t.setLastModified(0);
-                    getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
-                    updatedResources.add(t.getResourcePath());
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage());
-            }
-
-        }
-
-    }
-
-    private void upgradeTableDesceExd() {
-
-        List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
-        for (String path : paths) {
-            Map<String, String> attrs = Maps.newHashMap();
-
-            InputStream is = null;
-            try {
-                is = store.getResource(path);
-                if (is == null) {
-                    continue;
-                }
-                try {
-                    attrs.putAll(JsonUtil.readValue(is, HashMap.class));
-                } finally {
-                    if (is != null)
-                        is.close();
-                }
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
-            }
-
-            // parse table identity from file name
-            String file = path;
-            if (file.indexOf("/") > -1) {
-                file = file.substring(file.lastIndexOf("/") + 1);
-            }
-            String tableIdentity = file.substring(0, file.length() - MetadataConstances.FILE_SURFIX.length()).toUpperCase();
-
-            // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
-            if (tableIdentity.indexOf(".") < 0) {
-                tableIdentity = appendDBName(tableIdentity);
-                try {
-                    getMetadataManager().saveTableExd(tableIdentity, attrs);
-                    // delete the old resource if it exists
-                    getStore().deleteResource(path);
-                    updatedResources.add(path);
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
-                }
-
-            }
-
-        }
-
-    }
-
-    public String appendDBName(String table) {
-
-        if (table.indexOf(".") > 0)
-            return table;
-
-        Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
-
-        int count = 0;
-        String result = null;
-        for (TableDesc t : map.values()) {
-            if (t.getName().equalsIgnoreCase(table)) {
-                result = t.getIdentity();
-                count++;
-            }
-        }
-
-        if (count == 1)
-            return result;
-
-        if (count > 1) {
-            errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; The program couldn't determine, randomly pick '" + result + "'");
-        }
-        return result;
-    }
-
-    private void upgradeProjectInstance() {
-        List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
-        for (String path : paths) {
-            try {
-                com.kylinolap.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, com.kylinolap.cube.model.v1.ProjectInstance.class, new JsonSerializer<com.kylinolap.cube.model.v1.ProjectInstance>(com.kylinolap.cube.model.v1.ProjectInstance.class));
-
-                ProjectInstance newPrj = new ProjectInstance();
-                newPrj.setUuid(oldPrj.getUuid());
-                newPrj.setName(oldPrj.getName());
-                newPrj.setOwner(oldPrj.getOwner());
-                newPrj.setDescription(oldPrj.getDescription());
-                newPrj.setLastModified(oldPrj.getLastModified());
-                newPrj.setLastUpdateTime(oldPrj.getLastUpdateTime());
-                newPrj.setCreateTime(oldPrj.getCreateTime());
-                newPrj.setStatus(oldPrj.getStatus());
-                List<RealizationEntry> realizationEntries = Lists.newArrayList();
-                for (String cube : oldPrj.getCubes()) {
-                    RealizationEntry entry = new RealizationEntry();
-                    entry.setType(RealizationType.CUBE);
-                    entry.setRealization(cube);
-                    realizationEntries.add(entry);
-                }
-                newPrj.setRealizationEntries(realizationEntries);
-                newPrj.getCreateTimeUTC();
-
-                Set<String> tables = Sets.newHashSet();
-                for (String table : oldPrj.getTables()) {
-                    tables.add(this.appendDBName(table));
-                }
-                newPrj.setTables(tables);
-
-                store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
-                updatedResources.add(path);
-            } catch (IOException e) {
-                e.printStackTrace();
-                errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage());
-            }
-        }
-
-    }
-
-    private MetadataManager getMetadataManager() {
-        return MetadataManager.getInstance(config);
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(config);
-    }
-
-    public static void main(String[] args) {
-
-        if (!(args != null && args.length == 1)) {
-            System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder>; e.g, /export/kylin/meta");
-            return;
-        }
-
-        String exportFolder = args[0];
-        
-        File oldMetaFolder = new File(exportFolder);
-        if(!oldMetaFolder.exists()) {
-            System.out.println("Provided folder doesn't exist: '" + exportFolder + "'");
-            return;
-        }
-        
-        if(!oldMetaFolder.isDirectory()) {
-            System.out.println("Provided folder is not a directory: '" + exportFolder + "'");
-            return;
-        }
-
-        
-        String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
-        try {
-            FileUtils.deleteDirectory(new File(newMetadataUrl));
-            FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-
-        CubeMetadataUpgrade instance = new CubeMetadataUpgrade(newMetadataUrl);
-
-        instance.upgrade();
-        
-        logger.info("Run CubeMetadataUpgrade completed, check the following messages.");
-        logger.info("The following resources have been successfully updated in : " + newMetadataUrl);
-        for (String s : instance.updatedResources) {
-            logger.info(s);
-        }
-
-        if (instance.errorMsgs.size() > 0) {
-            logger.info("Here are the error/warning messages, you may need check:");
-            for (String s : instance.errorMsgs) {
-                logger.warn(s);
-            }
-        } else {
-            logger.info("No error or warning messages; Looks all good.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java b/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
new file mode 100644
index 0000000..1a71df0
--- /dev/null
+++ b/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
@@ -0,0 +1,498 @@
+package com.kylinolap.job;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import com.kylinolap.cube.model.CubeDesc;
+import com.kylinolap.job.common.HadoopShellExecutable;
+import com.kylinolap.job.common.MapReduceExecutable;
+import com.kylinolap.job.common.ShellExecutable;
+import com.kylinolap.job.constant.ExecutableConstants;
+import com.kylinolap.job.constant.JobStatusEnum;
+import com.kylinolap.job.constant.JobStepStatusEnum;
+import com.kylinolap.job.cube.CubingJob;
+import com.kylinolap.job.execution.ExecutableState;
+import com.kylinolap.job.hadoop.cube.*;
+import com.kylinolap.job.hadoop.dict.CreateDictionaryJob;
+import com.kylinolap.job.hadoop.hbase.BulkLoadJob;
+import com.kylinolap.job.hadoop.hbase.CreateHTableJob;
+import com.kylinolap.job.impl.threadpool.AbstractExecutable;
+import com.kylinolap.job.service.ExecutableManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.kylinolap.common.KylinConfig;
+import com.kylinolap.common.persistence.JsonSerializer;
+import com.kylinolap.common.persistence.ResourceStore;
+import com.kylinolap.common.util.JsonUtil;
+import com.kylinolap.cube.CubeDescManager;
+import com.kylinolap.cube.CubeDescUpgrader;
+import com.kylinolap.metadata.MetadataConstances;
+import com.kylinolap.metadata.MetadataManager;
+import com.kylinolap.metadata.model.TableDesc;
+import com.kylinolap.metadata.project.ProjectInstance;
+import com.kylinolap.metadata.project.ProjectManager;
+import com.kylinolap.metadata.project.RealizationEntry;
+import com.kylinolap.metadata.realization.RealizationType;
+
+/**
+ * Utility class to migrate the Kylin metadata format from v1 to v2;
+ * this version focuses on migrating legacy JobInstance records to the new job engine.
+ * @author shaoshi
+ *
+ */
+public class CubeMetadataUpgrade {
+
+    private KylinConfig config = null;
+    private ResourceStore store;
+
+    private List<String> updatedResources = Lists.newArrayList();
+    private List<String> errorMsgs = Lists.newArrayList();
+
+    private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
+
+    public CubeMetadataUpgrade(String newMetadataUrl) {
+        KylinConfig.destoryInstance();
+        System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
+        KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
+
+        
+        config = KylinConfig.getInstanceFromEnv();
+        store = getStore();
+    }
+    
+    public void upgrade() {
+
+//        upgradeTableDesc();
+//        upgradeTableDesceExd();
+//        upgradeCubeDesc();
+//        upgradeProjectInstance();
+        upgradeJobInstance();
+        
+    }
+
+    private List<String> listResourceStore(String pathRoot) {
+        List<String> paths = null;
+        try {
+            paths = store.collectResourceRecursively(pathRoot, MetadataConstances.FILE_SURFIX);
+        } catch (IOException e1) {
+            e1.printStackTrace();
+            errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT);
+        }
+
+        return paths;
+    }
+
+    private void upgradeCubeDesc() {
+        logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
+
+        List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
+        for (String path : paths) {
+
+            try {
+                CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
+                CubeDesc ndesc = upgrade.upgrade();
+                ndesc.setSignature(ndesc.calculateSignature());
+                
+                getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
+                getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
+                updatedResources.add(ndesc.getResourcePath());
+            } catch (IOException e) {
+                e.printStackTrace();
+                errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage());
+            }
+        }
+
+    }
+
+    private void upgradeTableDesc() {
+        List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
+        for (String path : paths) {
+            TableDesc t;
+            try {
+                t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
+                t.init();
+
+                // if the path contains only one "." (i.e. no database prefix), re-save the resource under its new path
+                if (path.substring(path.indexOf(".")).length() == MetadataConstances.FILE_SURFIX.length()) {
+                    getStore().deleteResource(path);
+                    // reset last-modified so the resource is saved as new
+                    t.setLastModified(0);
+                    getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
+                    updatedResources.add(t.getResourcePath());
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+                errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage());
+            }
+
+        }
+
+    }
+
+    private void upgradeTableDesceExd() {
+
+        List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
+        for (String path : paths) {
+            Map<String, String> attrs = Maps.newHashMap();
+
+            InputStream is = null;
+            try {
+                is = store.getResource(path);
+                if (is == null) {
+                    continue;
+                }
+                try {
+                    attrs.putAll(JsonUtil.readValue(is, HashMap.class));
+                } finally {
+                    if (is != null)
+                        is.close();
+                }
+            } catch (IOException e) {
+                e.printStackTrace();
+                errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
+            }
+
+            // parse table identity from file name
+            String file = path;
+            if (file.indexOf("/") > -1) {
+                file = file.substring(file.lastIndexOf("/") + 1);
+            }
+            String tableIdentity = file.substring(0, file.length() - MetadataConstances.FILE_SURFIX.length()).toUpperCase();
+
+            // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
+            if (tableIdentity.indexOf(".") < 0) {
+                tableIdentity = appendDBName(tableIdentity);
+                try {
+                    getMetadataManager().saveTableExd(tableIdentity, attrs);
+                    // delete the old resource if it exists
+                    getStore().deleteResource(path);
+                    updatedResources.add(path);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
+                }
+
+            }
+
+        }
+
+    }
+
+    public String appendDBName(String table) {
+
+        if (table.indexOf(".") > 0)
+            return table;
+
+        Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
+
+        int count = 0;
+        String result = null;
+        for (TableDesc t : map.values()) {
+            if (t.getName().equalsIgnoreCase(table)) {
+                result = t.getIdentity();
+                count++;
+            }
+        }
+
+        if (count == 1)
+            return result;
+
+        if (count > 1) {
+            errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; The program couldn't determine, randomly pick '" + result + "'");
+        }
+        return result;
+    }
+
+    private void upgradeProjectInstance() {
+        List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
+        for (String path : paths) {
+            try {
+                com.kylinolap.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, com.kylinolap.cube.model.v1.ProjectInstance.class, new JsonSerializer<com.kylinolap.cube.model.v1.ProjectInstance>(com.kylinolap.cube.model.v1.ProjectInstance.class));
+
+                ProjectInstance newPrj = new ProjectInstance();
+                newPrj.setUuid(oldPrj.getUuid());
+                newPrj.setName(oldPrj.getName());
+                newPrj.setOwner(oldPrj.getOwner());
+                newPrj.setDescription(oldPrj.getDescription());
+                newPrj.setLastModified(oldPrj.getLastModified());
+                newPrj.setLastUpdateTime(oldPrj.getLastUpdateTime());
+                newPrj.setCreateTime(oldPrj.getCreateTime());
+                newPrj.setStatus(oldPrj.getStatus());
+                List<RealizationEntry> realizationEntries = Lists.newArrayList();
+                for (String cube : oldPrj.getCubes()) {
+                    RealizationEntry entry = new RealizationEntry();
+                    entry.setType(RealizationType.CUBE);
+                    entry.setRealization(cube);
+                    realizationEntries.add(entry);
+                }
+                newPrj.setRealizationEntries(realizationEntries);
+                newPrj.getCreateTimeUTC();
+
+                Set<String> tables = Sets.newHashSet();
+                for (String table : oldPrj.getTables()) {
+                    tables.add(this.appendDBName(table));
+                }
+                newPrj.setTables(tables);
+
+                store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
+                updatedResources.add(path);
+            } catch (IOException e) {
+                e.printStackTrace();
+                errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage());
+            }
+        }
+
+    }
+
+    private MetadataManager getMetadataManager() {
+        return MetadataManager.getInstance(config);
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(config);
+    }
+
+    private ExecutableManager getExecutableManager() {
+        return ExecutableManager.getInstance(config);
+    }
+
+    private void upgradeJobInstance() {
+        try {
+            List<String> paths = getStore().collectResourceRecursively(ResourceStore.JOB_PATH_ROOT, "");
+            for (String path: paths) {
+                upgradeJobInstance(path);
+            }
+        } catch (IOException ex) {
+            errorMsgs.add("upgrade job failed" + ex.getLocalizedMessage());
+            throw new RuntimeException(ex);
+        }
+    }
+
+    private ExecutableState parseState(JobStatusEnum state) {
+        switch (state) {
+            case NEW:
+            case PENDING:
+                return ExecutableState.READY;
+            case RUNNING:
+                return ExecutableState.RUNNING;
+            case FINISHED:
+                return ExecutableState.SUCCEED;
+            case ERROR:
+                return ExecutableState.ERROR;
+            case DISCARDED:
+                return ExecutableState.DISCARDED;
+            default:
+                return ExecutableState.DISCARDED;
+        }
+    }
+
+    private ExecutableState parseState(JobStepStatusEnum state) {
+        switch (state) {
+            case NEW:
+            case PENDING:
+            case WAITING:
+                return ExecutableState.READY;
+            case RUNNING:
+                return ExecutableState.RUNNING;
+            case FINISHED:
+                return ExecutableState.SUCCEED;
+            case ERROR:
+                return ExecutableState.ERROR;
+            case DISCARDED:
+                return ExecutableState.DISCARDED;
+            default:
+                return ExecutableState.DISCARDED;
+        }
+
+    }
+
+    private void upgradeJobInstance(String path) throws IOException {
+        JobInstance job = getStore().getResource(path, JobInstance.class, new JsonSerializer<JobInstance>(JobInstance.class));
+        CubingJob cubingJob = new CubingJob();
+        cubingJob.setId(job.getId());
+        cubingJob.setName(job.getName());
+        cubingJob.setSubmitter(job.getSubmitter());
+        for (JobInstance.JobStep step: job.getSteps()) {
+            final AbstractExecutable executable = parseToExecutable(step);
+            cubingJob.addTask(executable);
+        }
+        getExecutableManager().addJob(cubingJob);
+
+        cubingJob.setStartTime(job.getExecStartTime());
+        cubingJob.setEndTime(job.getExecEndTime());
+        cubingJob.setMapReduceWaitTime(job.getMrWaiting());
+        getExecutableManager().resetJobOutput(cubingJob.getId(), parseState(job.getStatus()), job.getStatus().toString());
+
+        for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
+            final JobInstance.JobStep jobStep = job.getSteps().get(i);
+            final InputStream inputStream = getStore().getResource(ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i);
+            String output = null;
+            if (inputStream != null) {
+                JsonElement json = new JsonParser().parse(new InputStreamReader(inputStream));
+                if (json instanceof JsonObject) {
+                    final JsonElement element = ((JsonObject) json).get("output");
+                    if (element != null) {
+                        output = element.getAsString();
+                    } else {
+                        output = json.toString(); // a JsonObject has no string value; keep the raw JSON
+                    }
+                } else {
+                    output = json.getAsString();
+                }
+            }
+            updateJobStepOutput(jobStep, output, cubingJob.getTasks().get(i));
+        }
+    }
+
+    private void updateJobStepOutput(JobInstance.JobStep step, String output, AbstractExecutable task) {
+        task.setStartTime(step.getExecStartTime());
+        task.setEndTime(step.getExecEndTime());
+        if (task instanceof MapReduceExecutable) {
+            ((MapReduceExecutable) task).setMapReduceWaitTime(step.getExecWaitTime() * 1000);
+        }
+        getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output);
+    }
+
+    private AbstractExecutable parseToExecutable(JobInstance.JobStep step) {
+        AbstractExecutable result;
+        switch (step.getCmdType()) {
+            case SHELL_CMD_HADOOP: {
+                ShellExecutable executable = new ShellExecutable();
+                executable.setCmd(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_FACTDISTINCT: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(FactDistinctColumnsJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_BASECUBOID: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(BaseCuboidJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_NDCUBOID: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(NDCuboidJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(RangeKeyDistributionJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_CONVERTHFILE: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(CubeHFileJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_MERGECUBOID: {
+                MapReduceExecutable executable = new MapReduceExecutable();
+                executable.setMapReduceJobClass(MergeCuboidJob.class);
+                executable.setMapReduceParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_NO_MR_DICTIONARY: {
+                HadoopShellExecutable executable = new HadoopShellExecutable();
+                executable.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
+                executable.setJobClass(CreateDictionaryJob.class);
+                executable.setJobParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE: {
+                HadoopShellExecutable executable = new HadoopShellExecutable();
+                executable.setJobClass(CreateHTableJob.class);
+                executable.setJobParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            case JAVA_CMD_HADOOP_NO_MR_BULKLOAD: {
+                HadoopShellExecutable executable = new HadoopShellExecutable();
+                executable.setJobClass(BulkLoadJob.class);
+                executable.setJobParams(step.getExecCmd());
+                result = executable;
+                break;
+            }
+            default:
+                throw new RuntimeException("invalid step type:" + step.getCmdType());
+        }
+        result.setName(step.getName());
+        return result;
+    }
+
+    public static void main(String[] args) {
+
+        if (args == null || args.length != 1) {
+            System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder>; e.g, /export/kylin/meta");
+            return;
+        }
+
+        String exportFolder = args[0];
+        
+        File oldMetaFolder = new File(exportFolder);
+        if(!oldMetaFolder.exists()) {
+            System.out.println("Provided folder doesn't exist: '" + exportFolder + "'");
+            return;
+        }
+        
+        if(!oldMetaFolder.isDirectory()) {
+            System.out.println("Provided folder is not a directory: '" + exportFolder + "'");
+            return;
+        }
+
+        
+        String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
+        try {
+            FileUtils.deleteDirectory(new File(newMetadataUrl));
+            FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to prepare the new metadata folder: " + newMetadataUrl, e);
+        }
+
+        CubeMetadataUpgrade instance = new CubeMetadataUpgrade(newMetadataUrl);
+
+        instance.upgrade();
+        
+        logger.info("Run CubeMetadataUpgrade completed, check the following messages.");
+        logger.info("The following resources have been successfully updated in : " + newMetadataUrl);
+        for (String s : instance.updatedResources) {
+            logger.info(s);
+        }
+
+        if (instance.errorMsgs.size() > 0) {
+            logger.info("Here are the error/warning messages, you may need check:");
+            for (String s : instance.errorMsgs) {
+                logger.warn(s);
+            }
+        } else {
+            logger.info("No error or warning messages; Looks all good.");
+        }
+    }
+}
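
Note on the per-step output parsing above: the migration accepts two shapes for the
legacy resource at JOB_OUTPUT_PATH_ROOT/<jobId>.<stepIndex>: either a JSON object
whose "output" field holds the step log, or a bare JSON string. A sketch of both
accepted shapes (the log text itself is hypothetical):

    {"output": "map 100% reduce 100%"}

    "map 100% reduce 100%"

If the resource is missing, the step is migrated with a null output and only its
state is reset.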

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java b/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
index fe38ca3..48d5c3c 100644
--- a/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
+++ b/job/src/main/java/com/kylinolap/job/impl/threadpool/AbstractExecutable.java
@@ -115,6 +115,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         return job.getId();
     }
 
+    public final void setId(String id) {
+        this.job.setUuid(id);
+    }
+
     @Override
     public final ExecutableState getStatus() {
         return jobService.getOutput(this.getId()).getState();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java b/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
index b7bb840..30683aa 100644
--- a/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
+++ b/job/src/main/java/com/kylinolap/job/impl/threadpool/DefaultChainedExecutable.java
@@ -104,6 +104,7 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
     }
 
     public void addTask(AbstractExecutable executable) {
+        executable.setId(getId() + "-" + String.format("%02d", subTasks.size()));
         this.subTasks.add(executable);
     }
 }
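
With the setId() hook above, addTask now assigns every sub-task a deterministic id:
the parent job id plus a zero-padded position. A minimal sketch (the job id here is
hypothetical):

    CubingJob job = new CubingJob();
    job.setId("f9a0-1234");
    job.addTask(stepA);   // stepA gets id "f9a0-1234-00"
    job.addTask(stepB);   // stepB gets id "f9a0-1234-01"

This is what lets CubeMetadataUpgrade pair each migrated step with its legacy
output resource by index.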

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/504ebb5e/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java b/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
index 26563d4..4e51056 100644
--- a/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
+++ b/job/src/main/java/com/kylinolap/job/service/ExecutableManager.java
@@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import java.lang.reflect.Constructor;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -204,46 +205,20 @@ public class ExecutableManager {
         }
     }
 
-//    public boolean updateJobStatus(String jobId, ExecutableState newStatus) {
-//        try {
-//            final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
-//            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-//            if (oldStatus == newStatus) {
-//                return true;
-//            }
-//            if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-//                throw new RuntimeException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-//            }
-//            jobOutput.setStatus(newStatus.toString());
-//            jobDao.updateJobOutput(jobOutput);
-//            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-//            return true;
-//        } catch (PersistentException e) {
-//            logger.error("error change job:" + jobId + " to " + newStatus.toString());
-//            throw new RuntimeException(e);
-//        }
-//    }
-//
-//    public boolean updateJobStatus(String jobId, ExecutableState newStatus, String output) {
-//        try {
-//            final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
-//            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-//            if (oldStatus == newStatus) {
-//                return true;
-//            }
-//            if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-//                throw new RuntimeException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-//            }
-//            jobOutput.setStatus(newStatus.toString());
-//            jobOutput.setContent(output);
-//            jobDao.updateJobOutput(jobOutput);
-//            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-//            return true;
-//        } catch (PersistentException e) {
-//            logger.error("error change job:" + jobId + " to " + newStatus.toString());
-//            throw new RuntimeException(e);
-//        }
-//    }
+    // for migration only
+    // TODO: delete when migration is finished
+    public void resetJobOutput(String jobId, ExecutableState state, String output) {
+        try {
+            final JobOutputPO jobOutput = jobDao.getJobOutput(jobId);
+            jobOutput.setStatus(state.toString());
+            if (output != null) {
+                jobOutput.setContent(output);
+            }
+            jobDao.updateJobOutput(jobOutput);
+        } catch (PersistentException e) {
+            throw new RuntimeException(e);
+        }
+    }
 
     public void addJobInfo(String id, Map<String, String> info) {
         if (info == null) {
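
Unlike the deleted updateJobStatus variants, resetJobOutput deliberately skips the
isValidStateTransfer check: a migrated job must be forced into whatever state the
legacy record carried, regardless of the normal transition rules. Its only intended
caller is the migration path in CubeMetadataUpgrade above, e.g.:

    getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output);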