Posted to commits@kylin.apache.org by li...@apache.org on 2015/01/23 09:32:08 UTC

[21/50] [abbrv] incubator-kylin git commit: Merge branch 'inverted-index' of https://github.com/KylinOLAP/Kylin into inverted-index

Merge branch 'inverted-index' of https://github.com/KylinOLAP/Kylin into inverted-index


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0c5234fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0c5234fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0c5234fe

Branch: refs/heads/inverted-index
Commit: 0c5234fe380777db76e7bd1da9e7b9eb9ee17c96
Parents: 504ebb5 6faee1d
Author: qianhao.zhou <qi...@ebay.com>
Authored: Thu Jan 22 17:36:20 2015 +0800
Committer: qianhao.zhou <qi...@ebay.com>
Committed: Thu Jan 22 17:36:20 2015 +0800

----------------------------------------------------------------------
 .../com/kylinolap/cube/CubeDescUpgrader.java    | 17 ++++-
 .../com/kylinolap/cube/model/RowKeyColDesc.java |  4 ++
 .../kylinolap/metadata/MetadataUpgradeTest.java |  2 +
 ...t_kylin_cube_without_slr_left_join_desc.json |  6 +-
 .../com/kylinolap/job/CubeMetadataUpgrade.java  | 16 +++--
 .../job/constant/ExecutableConstants.java       |  1 +
 .../kylinolap/job/cube/CubingJobBuilder.java    |  6 +-
 .../kylinolap/job/cube/MergeDictionaryStep.java |  3 -
 .../job/hadoop/cube/MergeCuboidMapper.java      |  9 ++-
 .../cube/RangeKeyDistributionReducer.java       | 68 +++++++++++---------
 .../job/hadoop/hbase/CreateHTableJob.java       | 26 +++++---
 .../kylinolap/job/BuildCubeWithEngineTest.java  |  2 +
 .../kylinolap/job/BuildIIWithEngineTest.java    |  1 +
 .../job/hadoop/cube/BaseCuboidMapperTest.java   |  9 ++-
 .../job/hadoop/cube/CubeHFileMapper2Test.java   |  6 +-
 .../job/hadoop/cube/CubeReducerTest.java        |  4 +-
 .../job/hadoop/cube/MergeCuboidMapperTest.java  | 18 +++---
 .../job/hadoop/cube/NDCuboidMapperTest.java     |  2 +-
 .../cube/RangeKeyDistributionReducerTest.java   | 60 +----------------
 19 files changed, 123 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
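
The CubeMetadataUpgrade tool in this commit is driven from the command line
with the old metadata export folder as its only argument; a minimal
invocation (assuming the Kylin job classes and their dependencies are on the
classpath) looks like:

    java com.kylinolap.job.CubeMetadataUpgrade /export/kylin/meta

As main() below shows, the tool first copies the folder to
/export/kylin/meta_v2 and upgrades the copy, so the original export stays
untouched.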


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0c5234fe/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --cc job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
index 1a71df0,0000000..9779058
mode 100644,000000..100644
--- a/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/com/kylinolap/job/CubeMetadataUpgrade.java
@@@ -1,498 -1,0 +1,506 @@@
 +package com.kylinolap.job;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.io.InputStream;
 +import java.io.InputStreamReader;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import com.google.gson.JsonElement;
 +import com.google.gson.JsonObject;
 +import com.google.gson.JsonParser;
 +import com.kylinolap.cube.model.CubeDesc;
 +import com.kylinolap.job.common.HadoopShellExecutable;
 +import com.kylinolap.job.common.MapReduceExecutable;
 +import com.kylinolap.job.common.ShellExecutable;
 +import com.kylinolap.job.constant.ExecutableConstants;
 +import com.kylinolap.job.constant.JobStatusEnum;
 +import com.kylinolap.job.constant.JobStepStatusEnum;
 +import com.kylinolap.job.cube.CubingJob;
 +import com.kylinolap.job.execution.ExecutableState;
 +import com.kylinolap.job.hadoop.cube.*;
 +import com.kylinolap.job.hadoop.dict.CreateDictionaryJob;
 +import com.kylinolap.job.hadoop.hbase.BulkLoadJob;
 +import com.kylinolap.job.hadoop.hbase.CreateHTableJob;
 +import com.kylinolap.job.impl.threadpool.AbstractExecutable;
 +import com.kylinolap.job.service.ExecutableManager;
 +import org.apache.commons.io.FileUtils;
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +
 +import com.google.common.collect.Lists;
 +import com.google.common.collect.Maps;
 +import com.google.common.collect.Sets;
 +import com.kylinolap.common.KylinConfig;
 +import com.kylinolap.common.persistence.JsonSerializer;
 +import com.kylinolap.common.persistence.ResourceStore;
 +import com.kylinolap.common.util.JsonUtil;
 +import com.kylinolap.cube.CubeDescManager;
 +import com.kylinolap.cube.CubeDescUpgrader;
 +import com.kylinolap.metadata.MetadataConstances;
 +import com.kylinolap.metadata.MetadataManager;
 +import com.kylinolap.metadata.model.TableDesc;
 +import com.kylinolap.metadata.project.ProjectInstance;
 +import com.kylinolap.metadata.project.ProjectManager;
 +import com.kylinolap.metadata.project.RealizationEntry;
 +import com.kylinolap.metadata.realization.RealizationType;
 +
 +/**
 + * Utility class to migrate the Kylin metadata format from v1 to v2.
 + * 
 + * @author shaoshi
 + *
 + */
 +public class CubeMetadataUpgrade {
 +
 +    private KylinConfig config = null;
 +    private ResourceStore store;
 +
 +    private List<String> updatedResources = Lists.newArrayList();
 +    private List<String> errorMsgs = Lists.newArrayList();
 +
 +    private static final Log logger = LogFactory.getLog(CubeMetadataUpgrade.class);
 +
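 +    // Re-initialize the KylinConfig singleton so it points at the new metadata
 +    // URL before any of the manager caches are built against it.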
 +    public CubeMetadataUpgrade(String newMetadataUrl) {
 +        KylinConfig.destoryInstance();
 +        System.setProperty(KylinConfig.KYLIN_CONF, newMetadataUrl);
 +        KylinConfig.getInstanceFromEnv().setMetadataUrl(newMetadataUrl);
 +
 +        
 +        config = KylinConfig.getInstanceFromEnv();
 +        store = getStore();
 +    }
 +    
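 +    // Only the JobInstance upgrade is active below; the other upgrade passes
 +    // are currently commented out.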
 +    public void upgrade() {
 +
 +//        upgradeTableDesc();
 +//        upgradeTableDesceExd();
 +//        upgradeCubeDesc();
 +//        upgradeProjectInstance();
 +        upgradeJobInstance();
 +        
++        verify();
++        
++    }
++    
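++    // Sanity check: force a metadata reload and rebuild the CubeDesc cache so
++    // any malformed upgraded resource fails fast.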
++    private void verify() {
++        MetadataManager.getInstance(config).reload();
++        CubeDescManager.clearCache();
++        CubeDescManager.getInstance(config);
 +    }
 +
 +    private List<String> listResourceStore(String pathRoot) {
 +        List<String> paths = null;
 +        try {
 +            paths = store.collectResourceRecursively(pathRoot, MetadataConstances.FILE_SURFIX);
 +        } catch (IOException e1) {
 +            e1.printStackTrace();
 +            errorMsgs.add("Get IOException when scan resource store at: " + ResourceStore.CUBE_DESC_RESOURCE_ROOT);
 +        }
 +
 +        return paths;
 +    }
 +
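 +    // Upgrade every v1 CubeDesc under CUBE_DESC_RESOURCE_ROOT: run it through
 +    // CubeDescUpgrader, refresh its signature, then persist both the extracted
 +    // data model and the new cube descriptor.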
 +    private void upgradeCubeDesc() {
 +        logger.info("Reloading Cube Metadata from folder " + store.getReadableResourcePath(ResourceStore.CUBE_DESC_RESOURCE_ROOT));
 +
 +        List<String> paths = listResourceStore(ResourceStore.CUBE_DESC_RESOURCE_ROOT);
 +        for (String path : paths) {
 +
 +            try {
 +                CubeDescUpgrader upgrade = new CubeDescUpgrader(path);
 +                CubeDesc ndesc = upgrade.upgrade();
 +                ndesc.setSignature(ndesc.calculateSignature());
 +                
 +                getStore().putResource(ndesc.getModel().getResourcePath(), ndesc.getModel(), MetadataManager.MODELDESC_SERIALIZER);
 +                getStore().putResource(ndesc.getResourcePath(), ndesc, CubeDescManager.CUBE_DESC_SERIALIZER);
 +                updatedResources.add(ndesc.getResourcePath());
 +            } catch (IOException e) {
 +                e.printStackTrace();
 +                errorMsgs.add("Upgrade CubeDesc at '" + path + "' failed: " + e.getLocalizedMessage());
 +            }
 +        }
 +
 +    }
 +
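 +    // Re-save table descriptors under their new resource path: entries whose
 +    // file name carries no DB prefix are deleted and written back at
 +    // t.getResourcePath() with a reset timestamp.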
 +    private void upgradeTableDesc() {
 +        List<String> paths = listResourceStore(ResourceStore.TABLE_RESOURCE_ROOT);
 +        for (String path : paths) {
 +            TableDesc t;
 +            try {
 +                t = store.getResource(path, TableDesc.class, MetadataManager.TABLE_SERIALIZER);
 +                t.init();
 +
 +                // if the file name has no DB prefix (its only "." starts the suffix), move the resource to its new path
 +                if (path.substring(path.indexOf(".")).length() == MetadataConstances.FILE_SURFIX.length()) {
 +                    getStore().deleteResource(path);
 +                    // reset the timestamp so the migrated resource is treated as new
 +                    t.setLastModified(0);
 +                    getStore().putResource(t.getResourcePath(), t, MetadataManager.TABLE_SERIALIZER);
 +                    updatedResources.add(t.getResourcePath());
 +                }
 +            } catch (IOException e) {
 +                e.printStackTrace();
 +                errorMsgs.add("Upgrade TableDesc at '" + path + "' failed: " + e.getLocalizedMessage());
 +            }
 +
 +        }
 +
 +    }
 +
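 +    // Migrate the extended table descriptors: the table identity is parsed
 +    // from the file name and, where the DB qualifier is missing, resolved via
 +    // appendDBName() before the attributes are re-saved.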
 +    private void upgradeTableDesceExd() {
 +
 +        List<String> paths = listResourceStore(ResourceStore.TABLE_EXD_RESOURCE_ROOT);
 +        for (String path : paths) {
 +            Map<String, String> attrs = Maps.newHashMap();
 +
 +            InputStream is = null;
 +            try {
 +                is = store.getResource(path);
 +                if (is == null) {
 +                    continue;
 +                }
 +                try {
 +                    attrs.putAll(JsonUtil.readValue(is, HashMap.class));
 +                } finally {
 +                    if (is != null)
 +                        is.close();
 +                }
 +            } catch (IOException e) {
 +                e.printStackTrace();
 +                errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
 +            }
 +
 +            // parse table identity from file name
 +            String file = path;
 +            if (file.indexOf("/") > -1) {
 +                file = file.substring(file.lastIndexOf("/") + 1);
 +            }
 +            String tableIdentity = file.substring(0, file.length() - MetadataConstances.FILE_SURFIX.length()).toUpperCase();
 +
 +            // for metadata upgrade, convert resource path to new pattern (<DB>.<TABLE>.json)
 +            if (tableIdentity.indexOf(".") < 0) {
 +                tableIdentity = appendDBName(tableIdentity);
 +                try {
 +                    getMetadataManager().saveTableExd(tableIdentity, attrs);
 +                    // delete the old resource if it exists
 +                    getStore().deleteResource(path);
 +                    updatedResources.add(path);
 +                } catch (IOException e) {
 +                    e.printStackTrace();
 +                    errorMsgs.add("Upgrade TableDescExd at '" + path + "' failed: " + e.getLocalizedMessage());
 +                }
 +
 +            }
 +
 +        }
 +
 +    }
 +
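 +    // Resolve a bare table name to its qualified <DB>.<TABLE> identity by
 +    // scanning all known tables; an ambiguous name is reported as a warning.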
 +    public String appendDBName(String table) {
 +
 +        if (table.indexOf(".") > 0)
 +            return table;
 +
 +        Map<String, TableDesc> map = this.getMetadataManager().getAllTablesMap();
 +
 +        int count = 0;
 +        String result = null;
 +        for (TableDesc t : map.values()) {
 +            if (t.getName().equalsIgnoreCase(table)) {
 +                result = t.getIdentity();
 +                count++;
 +            }
 +        }
 +
 +        if (count == 1)
 +            return result;
 +
 +        if (count > 1) {
 +            errorMsgs.add("There are more than 1 table named with '" + table + "' in different database; The program couldn't determine, randomly pick '" + result + "'");
 +        }
 +        return result;
 +    }
 +
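 +    // Convert each v1 ProjectInstance: the old flat cube list becomes a list of
 +    // RealizationEntry objects of type CUBE, and table names gain DB prefixes.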
 +    private void upgradeProjectInstance() {
 +        List<String> paths = listResourceStore(ResourceStore.PROJECT_RESOURCE_ROOT);
 +        for (String path : paths) {
 +            try {
 +                com.kylinolap.cube.model.v1.ProjectInstance oldPrj = store.getResource(path, com.kylinolap.cube.model.v1.ProjectInstance.class, new JsonSerializer<com.kylinolap.cube.model.v1.ProjectInstance>(com.kylinolap.cube.model.v1.ProjectInstance.class));
 +
 +                ProjectInstance newPrj = new ProjectInstance();
 +                newPrj.setUuid(oldPrj.getUuid());
 +                newPrj.setName(oldPrj.getName());
 +                newPrj.setOwner(oldPrj.getOwner());
 +                newPrj.setDescription(oldPrj.getDescription());
 +                newPrj.setLastModified(oldPrj.getLastModified());
 +                newPrj.setLastUpdateTime(oldPrj.getLastUpdateTime());
 +                newPrj.setCreateTime(oldPrj.getCreateTime());
 +                newPrj.setStatus(oldPrj.getStatus());
 +                List<RealizationEntry> realizationEntries = Lists.newArrayList();
 +                for (String cube : oldPrj.getCubes()) {
 +                    RealizationEntry entry = new RealizationEntry();
 +                    entry.setType(RealizationType.CUBE);
 +                    entry.setRealization(cube);
 +                    realizationEntries.add(entry);
 +                }
 +                newPrj.setRealizationEntries(realizationEntries);
 +                newPrj.getCreateTimeUTC();
 +
 +                Set<String> tables = Sets.newHashSet();
 +                for (String table : oldPrj.getTables()) {
 +                    tables.add(this.appendDBName(table));
 +                }
 +                newPrj.setTables(tables);
 +
 +                store.putResource(newPrj.getResourcePath(), newPrj, ProjectManager.PROJECT_SERIALIZER);
 +                updatedResources.add(path);
 +            } catch (IOException e) {
 +                e.printStackTrace();
 +                errorMsgs.add("Upgrade Project at '" + path + "' failed: " + e.getLocalizedMessage());
 +            }
 +        }
 +
 +    }
 +
 +    private MetadataManager getMetadataManager() {
 +        return MetadataManager.getInstance(config);
 +    }
 +
 +    private ResourceStore getStore() {
 +        return ResourceStore.getStore(config);
 +    }
 +
 +    private ExecutableManager getExecutableManager() {
 +        return ExecutableManager.getInstance(config);
 +    }
 +
 +    private void upgradeJobInstance() {
 +        try {
 +            List<String> paths = getStore().collectResourceRecursively(ResourceStore.JOB_PATH_ROOT, "");
 +            for (String path: paths) {
 +                upgradeJobInstance(path);
 +            }
 +        } catch (IOException ex) {
 +            errorMsgs.add("upgrade job failed" + ex.getLocalizedMessage());
 +            throw new RuntimeException(ex);
 +        }
 +    }
 +
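 +    // Map the legacy job status onto the v2 ExecutableState; anything
 +    // unrecognized falls back to DISCARDED.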
 +    private ExecutableState parseState(JobStatusEnum state) {
 +        switch (state) {
 +            case NEW:
 +            case PENDING:
 +                return ExecutableState.READY;
 +            case RUNNING:
 +                return ExecutableState.RUNNING;
 +            case FINISHED:
 +                return ExecutableState.SUCCEED;
 +            case ERROR:
 +                return ExecutableState.ERROR;
 +            case DISCARDED:
 +                return ExecutableState.DISCARDED;
 +            default:
 +                return ExecutableState.DISCARDED;
 +        }
 +    }
 +
 +    private ExecutableState parseState(JobStepStatusEnum state) {
 +        switch (state) {
 +            case NEW:
 +            case PENDING:
 +            case WAITING:
 +                return ExecutableState.READY;
 +            case RUNNING:
 +                return ExecutableState.RUNNING;
 +            case FINISHED:
 +                return ExecutableState.SUCCEED;
 +            case ERROR:
 +                return ExecutableState.ERROR;
 +            case DISCARDED:
 +                return ExecutableState.DISCARDED;
 +            default:
 +                return ExecutableState.DISCARDED;
 +        }
 +
 +    }
 +
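 +    // Rebuild one legacy JobInstance as a v2 CubingJob: re-create each step as
 +    // an executable, replay the timing/status fields, and re-attach the step
 +    // output stored under JOB_OUTPUT_PATH_ROOT/<jobId>.<stepIndex>.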
 +    private void upgradeJobInstance(String path) throws IOException {
 +        JobInstance job = getStore().getResource(path, JobInstance.class, new JsonSerializer<JobInstance>(JobInstance.class));
 +        CubingJob cubingJob = new CubingJob();
 +        cubingJob.setId(job.getId());
 +        cubingJob.setName(job.getName());
 +        cubingJob.setSubmitter(job.getSubmitter());
 +        for (JobInstance.JobStep step: job.getSteps()) {
 +            final AbstractExecutable executable = parseToExecutable(step);
 +            cubingJob.addTask(executable);
 +        }
 +        getExecutableManager().addJob(cubingJob);
 +
 +        cubingJob.setStartTime(job.getExecStartTime());
 +        cubingJob.setEndTime(job.getExecEndTime());
 +        cubingJob.setMapReduceWaitTime(job.getMrWaiting());
 +        getExecutableManager().resetJobOutput(cubingJob.getId(), parseState(job.getStatus()), job.getStatus().toString());
 +
 +        for (int i = 0, size = job.getSteps().size(); i < size; ++i) {
 +            final JobInstance.JobStep jobStep = job.getSteps().get(i);
 +            final InputStream inputStream = getStore().getResource(ResourceStore.JOB_OUTPUT_PATH_ROOT + "/" + job.getId() + "." + i);
 +            String output = null;
 +            if (inputStream != null) {
 +                JsonElement json = new JsonParser().parse(new InputStreamReader(inputStream));
 +                if (json instanceof JsonObject) {
 +                    final JsonElement element = ((JsonObject) json).get("output");
 +                    if (element != null) {
 +                        output = element.getAsString();
 +                    } else {
 +                        output = json.getAsString();
 +                    }
 +                } else {
 +                    output = json.getAsString();
 +                }
 +            }
 +            updateJobStepOutput(jobStep, output, cubingJob.getTasks().get(i));
 +        }
 +    }
 +
 +    private void updateJobStepOutput(JobInstance.JobStep step, String output, AbstractExecutable task) {
 +        task.setStartTime(step.getExecStartTime());
 +        task.setEndTime(step.getExecEndTime());
 +        if (task instanceof MapReduceExecutable) {
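 +            // the legacy wait time is multiplied by 1000 here, presumably
 +            // converting seconds to the milliseconds MapReduceExecutable expects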
 +            ((MapReduceExecutable) task).setMapReduceWaitTime(step.getExecWaitTime() * 1000);
 +        }
 +        getExecutableManager().resetJobOutput(task.getId(), parseState(step.getStatus()), output);
 +    }
 +
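 +    // Translate a legacy step into its v2 executable: MR step types map to a
 +    // MapReduceExecutable with the matching job class, the remaining types to
 +    // Shell/HadoopShellExecutable variants.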
 +    private AbstractExecutable parseToExecutable(JobInstance.JobStep step) {
 +        AbstractExecutable result;
 +        switch (step.getCmdType()) {
 +            case SHELL_CMD_HADOOP: {
 +                ShellExecutable executable = new ShellExecutable();
 +                executable.setCmd(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_FACTDISTINCT: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(FactDistinctColumnsJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_BASECUBOID: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(BaseCuboidJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_NDCUBOID: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(NDCuboidJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(RangeKeyDistributionJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_CONVERTHFILE: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(CubeHFileJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_MERGECUBOID: {
 +                MapReduceExecutable executable = new MapReduceExecutable();
 +                executable.setMapReduceJobClass(MergeCuboidJob.class);
 +                executable.setMapReduceParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_NO_MR_DICTIONARY: {
 +                HadoopShellExecutable executable = new HadoopShellExecutable();
 +                executable.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
 +                executable.setJobClass(CreateDictionaryJob.class);
 +                executable.setJobParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE: {
 +                HadoopShellExecutable executable = new HadoopShellExecutable();
 +                executable.setJobClass(CreateHTableJob.class);
 +                executable.setJobParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            case JAVA_CMD_HADOOP_NO_MR_BULKLOAD: {
 +                HadoopShellExecutable executable = new HadoopShellExecutable();
 +                executable.setJobClass(BulkLoadJob.class);
 +                executable.setJobParams(step.getExecCmd());
 +                result = executable;
 +                break;
 +            }
 +            default:
 +                throw new RuntimeException("invalid step type:" + step.getCmdType());
 +        }
 +        result.setName(step.getName());
 +        return result;
 +    }
 +
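 +    // Entry point: copies <metadata_export_folder> to <metadata_export_folder>_v2
 +    // and upgrades the copy, leaving the original export untouched.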
 +    public static void main(String[] args) {
 +
 +        if (args == null || args.length != 1) {
 +            System.out.println("Usage: java CubeMetadataUpgrade <metadata_export_folder>; e.g., /export/kylin/meta");
 +            return;
 +        }
 +
 +        String exportFolder = args[0];
 +        
 +        File oldMetaFolder = new File(exportFolder);
 +        if(!oldMetaFolder.exists()) {
 +            System.out.println("Provided folder doesn't exist: '" + exportFolder + "'");
 +            return;
 +        }
 +        
 +        if(!oldMetaFolder.isDirectory()) {
 +            System.out.println("Provided folder is not a directory: '" + exportFolder + "'");
 +            return;
 +        }
 +
 +        
 +        String newMetadataUrl = oldMetaFolder.getAbsolutePath() + "_v2";
 +        try {
 +            FileUtils.deleteDirectory(new File(newMetadataUrl));
 +            FileUtils.copyDirectory(oldMetaFolder, new File(newMetadataUrl));
 +        } catch (IOException e) {
 +            e.printStackTrace();
 +        }
 +
 +        CubeMetadataUpgrade instance = new CubeMetadataUpgrade(newMetadataUrl);
 +
 +        instance.upgrade();
-         
-         logger.info("Run CubeMetadataUpgrade completed, check the following messages.");
-         logger.info("The following resources have been successfully updated in : " + newMetadataUrl);
++        logger.info("=================================================================");
++        logger.info("Run CubeMetadataUpgrade completed; The following resources have been successfully updated in : " + newMetadataUrl);
 +        for (String s : instance.updatedResources) {
 +            logger.info(s);
 +        }
 +
++        logger.info("=================================================================");
 +        if (instance.errorMsgs.size() > 0) {
 +            logger.info("Here are the error/warning messages, you may need check:");
 +            for (String s : instance.errorMsgs) {
 +                logger.warn(s);
 +            }
 +        } else {
-             logger.info("No error or warning messages; Looks all good.");
++            logger.info("No error or warning messages; The migration is success.");
 +        }
 +    }
 +}