You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/24 01:20:23 UTC
[15/28] incubator-kylin git commit: KYLIN-875 Split job module into
'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
deleted file mode 100644
index bc6ee1f..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public interface IMROutput {
-
- /** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg);
-
- /**
- * Participate in the batch cubing flow as the output side. Responsible for saving
- * the cuboid output to storage (Phase 3).
- *
- * - Phase 1: Create Flat Table
- * - Phase 2: Build Dictionary
- * - Phase 3: Build Cube
- * - Phase 4: Update Metadata & Cleanup
- */
- public interface IMRBatchCubingOutputSide {
-
- /**
- * Add step that saves cuboid output from HDFS to storage.
- *
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
- * dictionary encoding; Mx is measure value serialization form.
- */
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
- }
-
- /** Return a helper to participate in batch merge job flow. */
- public IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg);
-
- /**
- * Participate in the batch merge flow as the output side. Responsible for saving
- * the cuboid output to storage (Phase 2).
- *
- * - Phase 1: Merge Dictionary
- * - Phase 2: Merge Cube
- * - Phase 3: Update Metadata & Cleanup
- */
- public interface IMRBatchMergeOutputSide {
-
- /**
- * Add step that saves cuboid output from HDFS to storage.
- *
- * The cuboid output is a directory of sequence files, where key takes format "CUBOID,D1,D2,..,Dn",
- * value takes format "M1,M2,..,Mm". CUBOID is 8 bytes cuboid ID; Dx is dimension value with
- * dictionary encoding; Mx is measure value serialization form.
- */
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
deleted file mode 100644
index 974e2fc..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public interface IMROutput2 {
-
- /** Return a helper to participate in batch cubing job flow. */
- public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
-
- /**
- * Participate in the batch cubing flow as the output side.
- *
- * - Phase 1: Create Flat Table
- * - Phase 2: Build Dictionary
- * - Phase 3: Build Cube (with StorageOutputFormat)
- * - Phase 4: Update Metadata & Cleanup
- */
- public interface IMRBatchCubingOutputSide2 {
-
- public IMRStorageOutputFormat getStorageOutputFormat();
-
- /** Add step that executes after build dictionary and before build cube. */
- public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
-
- /** Add step that executes after build cube. */
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
- }
-
- public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
- public interface IMRBatchMergeInputSide2 {
- public IMRStorageInputFormat getStorageInputFormat();
- }
-
- @SuppressWarnings("rawtypes")
- public interface IMRStorageInputFormat {
-
- public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException;
-
- public CubeSegment findSourceSegment(Mapper.Context context, CubeInstance cubeInstance) throws IOException;
-
- public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
- }
-
- /** Return a helper to participate in batch merge job flow. */
- public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
-
- /**
- * Participate in the batch merge flow as the output side.
- *
- * - Phase 1: Merge Dictionary
- * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
- * - Phase 3: Update Metadata & Cleanup
- */
- public interface IMRBatchMergeOutputSide2 {
-
- public IMRStorageOutputFormat getStorageOutputFormat();
-
- /** Add step that executes after merge dictionary and before merge cube. */
- public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
-
- /** Add step that executes after merge cube. */
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
- }
-
- @SuppressWarnings("rawtypes")
- public interface IMRStorageOutputFormat {
- public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-
- public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
deleted file mode 100644
index 3217e4b..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
-import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
-import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Hold reusable steps for builders.
- */
-public class JobBuilderSupport {
-
- final protected JobEngineConfig config;
- final protected CubeSegment seg;
- final protected String submitter;
-
- public JobBuilderSupport(CubeSegment seg, String submitter) {
- Preconditions.checkNotNull(seg, "segment cannot be null");
- this.config = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- this.seg = seg;
- this.submitter = submitter;
- }
-
- public MapReduceExecutable createFactDistinctColumnsStep(String jobId) {
- return createFactDistinctColumnsStep(jobId, false);
- }
-
- public MapReduceExecutable createFactDistinctColumnsStepWithStats(String jobId) {
- return createFactDistinctColumnsStep(jobId, true);
- }
-
- private MapReduceExecutable createFactDistinctColumnsStep(String jobId, boolean withStats) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- result.setMapReduceJobClass(FactDistinctColumnsJob.class);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(jobId));
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(withStats));
- appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(jobId));
- appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(config.getConfig().getCubingInMemSamplingPercent()));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-
- result.setMapReduceParams(cmd.toString());
- return result;
- }
-
- public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
- // build dictionary job
- HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
- buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", getFactDistinctColumnsPath(jobId));
-
- buildDictionaryStep.setJobParams(cmd.toString());
- buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
- return buildDictionaryStep;
- }
-
- public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId) {
- final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
- updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
- updateCubeInfoStep.setSegmentId(seg.getUuid());
- updateCubeInfoStep.setCubingJobId(jobId);
- return updateCubeInfoStep;
- }
-
- public MergeDictionaryStep createMergeDictionaryStep(List<String> mergingSegmentIds) {
- MergeDictionaryStep result = new MergeDictionaryStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- return result;
- }
-
- public UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(List<String> mergingSegmentIds, String jobId) {
- UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
- result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setCubingJobId(jobId);
- return result;
- }
-
- // ============================================================================
-
- public String getJobWorkingDir(String jobId) {
- return getJobWorkingDir(config, jobId);
- }
-
- public String getCuboidRootPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
- }
-
- public String getCuboidRootPath(CubeSegment seg) {
- return getCuboidRootPath(seg.getLastBuildJobID());
- }
-
- public void appendMapReduceParameters(StringBuilder buf, CubeSegment seg) {
- try {
- String jobConf = config.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
- if (jobConf != null && jobConf.length() > 0) {
- buf.append(" -conf ").append(jobConf);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- public String getFactDistinctColumnsPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
- }
-
-
- public String getStatisticsPath(String jobId) {
- return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/statistics";
- }
-
- // ============================================================================
- // static methods also shared by other job flow participant
- // ----------------------------------------------------------------------------
-
- public static String getJobWorkingDir(JobEngineConfig conf, String jobId) {
- return conf.getHdfsWorkingDirectory() + "/" + "kylin-" + jobId;
- }
-
- public static StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
- return buf.append(" -").append(paraName).append(" ").append(paraValue);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
deleted file mode 100644
index 61328c9..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine implements IBatchCubingEngine {
-
- @Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new BatchCubingJobBuilder(newSegment, submitter).build();
- }
-
- @Override
- public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return new BatchMergeJobBuilder(mergeSegment, submitter).build();
- }
-
- @Override
- public Class<?> getSourceInterface() {
- return IMRInput.class;
- }
-
- @Override
- public Class<?> getStorageInterface() {
- return IMROutput.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java b/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
deleted file mode 100644
index 57ec128..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRBatchCubingEngine2.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.IBatchCubingEngine;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-
-public class MRBatchCubingEngine2 implements IBatchCubingEngine {
-
- @Override
- public DefaultChainedExecutable createBatchCubingJob(CubeSegment newSegment, String submitter) {
- return new BatchCubingJobBuilder2(newSegment, submitter).build();
- }
-
- @Override
- public DefaultChainedExecutable createBatchMergeJob(CubeSegment mergeSegment, String submitter) {
- return new BatchMergeJobBuilder2(mergeSegment, submitter).build();
- }
-
- @Override
- public Class<?> getSourceInterface() {
- return IMRInput.class;
- }
-
- @Override
- public Class<?> getStorageInterface() {
- return IMROutput2.class;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
deleted file mode 100644
index dc0533e..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ /dev/null
@@ -1,55 +0,0 @@
-package org.apache.kylin.engine.mr;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
-import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.TableSourceFactory;
-import org.apache.kylin.storage.StorageFactory2;
-
-public class MRUtil {
-
- public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment seg) {
- return TableSourceFactory.createEngineAdapter(seg, IMRInput.class).getBatchCubingInputSide(seg);
- }
-
- public static IMRTableInputFormat getTableInputFormat(String tableName) {
- return getTableInputFormat(getTableDesc(tableName));
- }
-
- public static IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
- return TableSourceFactory.createEngineAdapter(tableDesc, IMRInput.class).getTableInputFormat(tableDesc);
- }
-
- private static TableDesc getTableDesc(String tableName) {
- return MetadataManager.getInstance(KylinConfig.getInstanceFromEnv()).getTableDesc(tableName);
- }
-
- public static IMRBatchCubingOutputSide getBatchCubingOutputSide(CubeSegment seg) {
- return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchCubingOutputSide(seg);
- }
-
- public static IMRBatchMergeOutputSide getBatchMergeOutputSide(CubeSegment seg) {
- return StorageFactory2.createEngineAdapter(seg, IMROutput.class).getBatchMergeOutputSide(seg);
- }
-
- public static IMRBatchCubingOutputSide2 getBatchCubingOutputSide2(CubeSegment seg) {
- return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
- }
-
- public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
- return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
- }
-
- public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
- return StorageFactory2.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
deleted file mode 100644
index aae5d89..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class InMemCuboidJob extends AbstractHadoopJob {
-
- protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_JOB_FLOW_ID);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME);
-
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
- logger.info("Starting: " + job.getJobName());
-
- setJobClasspath(job);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- long timeout = 1000*60*60L; // 1 hour
- job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
-
- // set input
- IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
- flatTableInputFormat.configureJob(job);
-
- // set mapper
- job.setMapperClass(InMemCuboidMapper.class);
- job.setMapOutputKeyClass(ByteArrayWritable.class);
- job.setMapOutputValueClass(ByteArrayWritable.class);
-
- // set output
- IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
- storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in CuboidJob", e);
- printUsage(options);
- throw e;
- }
- }
-
- public static void main(String[] args) throws Exception {
- InMemCuboidJob job = new InMemCuboidJob();
- int exitCode = ToolRunner.run(job, args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
deleted file mode 100644
index ff6ffe5..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
-
- private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment cubeSegment;
- private IMRTableInputFormat flatTableInputFormat;
-
- private int counter;
- private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
- private Future<?> future;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- Configuration conf = context.getConfiguration();
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
- String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
-
- Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (cubeDesc.getRowkey().isUseDictionary(col)) {
- Dictionary<?> dict = cubeSegment.getDictionary(col);
- if (dict == null) {
- logger.warn("Dictionary for " + col + " was not found.");
- }
-
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
- }
- }
-
- DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
- ExecutorService executorService = Executors.newSingleThreadExecutor();
- future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
-
- }
-
- @Override
- public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- // put each row to the queue
- String[] row = flatTableInputFormat.parseMapperInput(record);
- List<String> rowAsList = Arrays.asList(row);
-
- while (!future.isDone()) {
- if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
- break;
- }
- }
- }
-
- @Override
- protected void cleanup(Context context) throws IOException, InterruptedException {
- logger.info("Totally handled " + counter + " records!");
-
- while (!future.isDone()) {
- if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
- break;
- }
- }
-
- try {
- future.get();
- } catch (Exception e) {
- throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
- }
- queue.clear();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
deleted file mode 100644
index 45da2c8..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
-
- private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
-
- private IMRStorageOutputFormat storageOutputFormat;
- private MeasureCodec codec;
- private MeasureAggregators aggs;
-
- private int counter;
- private Object[] input;
- private Object[] result;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
-
- CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
- CubeDesc cubeDesc = cube.getDescriptor();
- CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- if (isMerge)
- storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
- else
- storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
-
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
-
- codec = new MeasureCodec(measuresDescs);
- aggs = new MeasureAggregators(measuresDescs);
-
- input = new Object[measuresDescs.size()];
- result = new Object[measuresDescs.size()];
- }
-
- @Override
- public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
-
- aggs.reset();
-
- for (ByteArrayWritable value : values) {
- codec.decode(value.asBuffer(), input);
- aggs.aggregate(input);
- }
- aggs.collectStates(result);
-
- storageOutputFormat.doReducerOutput(key, result, context);
-
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
deleted file mode 100644
index f2a5fcf..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-/**
- */
-public class MapContextGTRecordWriter implements ICuboidWriter {
-
- private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
- protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
- private Long lastCuboidId;
- protected CubeSegment cubeSegment;
- protected CubeDesc cubeDesc;
-
- private int bytesLength;
- private int dimensions;
- private int measureCount;
- private byte[] keyBuf;
- private int[] measureColumnsIndex;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private ByteArrayWritable outputKey = new ByteArrayWritable();
- private ByteArrayWritable outputValue = new ByteArrayWritable();
- private long cuboidRowCount = 0;
-
- public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
- this.mapContext = mapContext;
- this.cubeDesc = cubeDesc;
- this.cubeSegment = cubeSegment;
- this.measureCount = cubeDesc.getMeasures().size();
-
- }
-
- @Override
- public void write(long cuboidId, GTRecord record) throws IOException {
-
- if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
- // output another cuboid
- initVariables(cuboidId);
- if (lastCuboidId != null) {
- logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
- cuboidRowCount = 0;
- }
- }
-
- cuboidRowCount++;
- int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
- for (int x = 0; x < dimensions; x++) {
- System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offSet, record.get(x).length());
- offSet += record.get(x).length();
- }
-
- //output measures
- valueBuf.clear();
- record.exportColumns(measureColumnsIndex, valueBuf);
-
- outputKey.set(keyBuf, 0, offSet);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- try {
- mapContext.write(outputKey, outputValue);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void initVariables(Long cuboidId) {
- bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
- for (TblColRef column : cuboid.getColumns()) {
- bytesLength += cubeSegment.getColumnLength(column);
- }
-
- keyBuf = new byte[bytesLength];
- dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
- measureColumnsIndex = new int[measureCount];
- for (int i = 0; i < measureCount; i++) {
- measureColumnsIndex[i] = dimensions + i;
- }
-
- System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
deleted file mode 100644
index 3d3b1f4..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.CuboidJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageJob extends CuboidJob {
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- try {
- options.addOption(OPTION_JOB_NAME);
- options.addOption(OPTION_JOB_FLOW_ID);
- options.addOption(OPTION_CUBE_NAME);
- options.addOption(OPTION_SEGMENT_NAME);
- parseOptions(options, args);
-
- String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
- String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- CubeManager cubeMgr = CubeManager.getInstance(config);
- CubeInstance cube = cubeMgr.getCube(cubeName);
- CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- Configuration conf = this.getConf();
- HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- System.out.println("Starting: " + jobName);
- job = Job.getInstance(conf, jobName);
-
- setJobClasspath(job);
-
- // add metadata to distributed cache
- attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
- // set job configuration
- job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
-
- // configure mapper input
- IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
- storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
-
- // configure reducer output
- IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
- storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-
- return waitForCompletion(job);
- } catch (Exception e) {
- logger.error("error in MergeCuboidFromHBaseJob", e);
- printUsage(options);
- throw e;
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
deleted file mode 100644
index 1162f14..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
-
- private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
-
- private KylinConfig config;
- private String cubeName;
- private String segmentName;
- private CubeManager cubeManager;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment mergedCubeSegment;
- private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
- private IMRStorageInputFormat storageInputFormat;
-
- private ByteArrayWritable outputKey = new ByteArrayWritable();
- private byte[] newKeyBuf;
- private RowKeySplitter rowKeySplitter;
-
- private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
- private MeasureCodec codec;
- private ByteArrayWritable outputValue = new ByteArrayWritable();
-
- private Boolean checkNeedMerging(TblColRef col) throws IOException {
- Boolean ret = dictsNeedMerging.get(col);
- if (ret != null)
- return ret;
- else {
- ret = cubeDesc.getRowkey().isUseDictionary(col);
- if (ret) {
- String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
- }
- dictsNeedMerging.put(col, ret);
- return ret;
- }
- }
-
- @Override
- protected void setup(Context context) throws IOException, InterruptedException {
- super.bindCurrentConfiguration(context.getConfiguration());
- config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
- cubeManager = CubeManager.getInstance(config);
- cube = cubeManager.getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
- storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
-
- newKeyBuf = new byte[256];// size will auto-grow
-
- sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
- logger.info(sourceCubeSegment.toString());
-
- this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-
- List<MeasureDesc> measuresDescs = Lists.newArrayList();
- for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
- for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
- for (MeasureDesc measure : colDesc.getMeasures()) {
- measuresDescs.add(measure);
- }
- }
- }
- codec = new MeasureCodec(measuresDescs);
- }
-
- @Override
- public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
- Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
- ByteArrayWritable key = pair.getFirst();
- Object[] value = pair.getSecond();
-
- Preconditions.checkState(key.offset() == 0);
-
- long cuboidID = rowKeySplitter.split(key.array(), key.length());
- Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
- SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
- int bufOffset = 0;
- BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
- bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
- for (int i = 0; i < cuboid.getColumns().size(); ++i) {
- TblColRef col = cuboid.getColumns().get(i);
-
- if (this.checkNeedMerging(col)) {
- // if dictionary on fact table column, needs rewrite
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
- while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
-
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
- int idInMergedDict;
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
- }
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-
- bufOffset += mergedDict.getSizeOfId();
- } else {
- // keep as it is
- while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBuf;
- newKeyBuf = new byte[2 * newKeyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
- }
-
- System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
- bufOffset += splittedByteses[i + 1].length;
- }
- }
- byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
- outputKey.set(newKey, 0, newKey.length);
-
- valueBuf.clear();
- codec.encode(value, valueBuf);
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
-
- context.write(outputKey, outputValue);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
deleted file mode 100644
index d99cb03..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
- public MergeDictionaryStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig conf = context.getConfig();
- final CubeManager mgr = CubeManager.getInstance(conf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
- final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-
- Collections.sort(mergingSegments);
-
- try {
- checkLookupSnapshotsMustIncremental(mergingSegments);
-
- makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
- makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(newSegment);
- mgr.updateCube(cubeBuilder);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to merge dictionary or lookup snapshots", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- private List<CubeSegment> getMergingSegments(CubeInstance cube) {
- List<String> mergingSegmentIds = getMergingSegmentIds();
- List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
- for (String id : mergingSegmentIds) {
- result.add(cube.getSegmentById(id));
- }
- return result;
- }
-
- private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
- // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
- }
-
- /**
- * For the new segment, we need to create dictionaries for it, too. For
- * those dictionaries on fact table, create it by merging underlying
- * dictionaries For those dictionaries on lookup table, just copy it from
- * any one of the merging segments, it's guaranteed to be consistent(checked
- * in CubeSegmentValidator)
- *
- * @param cube
- * @param newSeg
- * @throws IOException
- */
- private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
- HashSet<TblColRef> colsNeedMeringDict = new HashSet<TblColRef>();
- HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
- DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
- CubeDesc cubeDesc = cube.getDescriptor();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
- colsNeedMeringDict.add(col);
- } else {
- colsNeedCopyDict.add(col);
- }
- }
- }
- }
-
- for (TblColRef col : colsNeedMeringDict) {
- logger.info("Merging fact table dictionary on : " + col);
- List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
- for (CubeSegment segment : mergingSegments) {
- logger.info("Including fact table dictionary of segment : " + segment);
- if (segment.getDictResPath(col) != null) {
- DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
- dictInfos.add(dictInfo);
- }
- }
- mergeDictionaries(dictMgr, newSeg, dictInfos, col);
- }
-
- for (TblColRef col : colsNeedCopyDict) {
- String path = mergingSegments.get(0).getDictResPath(col);
- newSeg.putDictResPath(col, path);
- }
- }
-
- private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
- DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
- if (dictInfo != null)
- cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
- return dictInfo;
- }
-
- /**
- * make snapshots for the new segment by copying from one of the underlying
- * merging segments. it's guaranteed to be consistent(checked in
- * CubeSegmentValidator)
- *
- * @param cube
- * @param newSeg
- */
- private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
- CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
- for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
- newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
- }
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id: splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
deleted file mode 100644
index 8bd6ea2..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MergeStatisticsStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IS = "mergingSegmentIds";
- private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
- protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
-
- public MergeStatisticsStep() {
- super();
- }
-
- @Override
- @SuppressWarnings("deprecation")
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig kylinConf = context.getConfig();
- final CubeManager mgr = CubeManager.getInstance(kylinConf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-
- Configuration conf = new Configuration();
- ResourceStore rs = ResourceStore.getStore(kylinConf);
- try {
-
- int averageSamplingPercentage = 0;
- for (String segmentId : this.getMergingSegmentIds()) {
- String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
- InputStream is = rs.getResource(fileKey);
- File tempFile = null;
- FileOutputStream tempFileStream = null;
- try {
- tempFile = File.createTempFile(segmentId, ".seq");
- tempFileStream = new FileOutputStream(tempFile);
- org.apache.commons.io.IOUtils.copy(is, tempFileStream);
- } finally {
- IOUtils.closeStream(is);
- IOUtils.closeStream(tempFileStream);
- }
-
- FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
- LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- if (key.get() == 0l) {
- // sampling percentage;
- averageSamplingPercentage += Bytes.toInt(value.getBytes());
- } else {
- HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
- ByteArray byteArray = new ByteArray(value.getBytes());
- hll.readRegisters(byteArray.asBuffer());
-
- if (cuboidHLLMap.get(key.get()) != null) {
- cuboidHLLMap.get(key.get()).merge(hll);
- } else {
- cuboidHLLMap.put(key.get(), hll);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
- } finally {
- IOUtils.closeStream(reader);
- }
- }
- averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
- FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
- Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
- FileSystem fs = statisticsFilePath.getFileSystem(conf);
- FSDataInputStream is = fs.open(statisticsFilePath);
- try {
- // put the statistics to metadata store
- String statisticsFileName = newSegment.getStatisticsResourcePath();
- rs.putResource(statisticsFileName, is, System.currentTimeMillis());
- } finally {
- IOUtils.closeStream(is);
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to merge cuboid statistics", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setMergedStatisticsPath(String path) {
- setParam(MERGED_STATISTICS_PATH, path);
- }
-
- private String getMergedStatisticsPath() {
- return getParam(MERGED_STATISTICS_PATH);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
deleted file mode 100644
index 14eef1a..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- * Save the cube segment statistic to Kylin metadata store
- *
- */
-public class SaveStatisticsStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String STATISTICS_PATH = "statisticsPath";
-
- public SaveStatisticsStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- KylinConfig kylinConf = context.getConfig();
- final CubeManager mgr = CubeManager.getInstance(kylinConf);
- final CubeInstance cube = mgr.getCube(getCubeName());
- final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-
- ResourceStore rs = ResourceStore.getStore(kylinConf);
- try {
- Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
- FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
- if (!fs.exists(statisticsFilePath))
- throw new IOException("File " + statisticsFilePath + " does not exists;");
-
- FSDataInputStream is = fs.open(statisticsFilePath);
- try {
- // put the statistics to metadata store
- String statisticsFileName = newSegment.getStatisticsResourcePath();
- rs.putResource(statisticsFileName, is, System.currentTimeMillis());
- fs.delete(statisticsFilePath, false);
- } finally {
- IOUtils.closeStream(is);
- }
-
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to save cuboid statistics", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setStatisticsPath(String path) {
- this.setParam(STATISTICS_PATH, path);
- }
-
- private String getStatisticsPath() {
- return getParam(STATISTICS_PATH);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index dd99a64..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
- private static final String SEGMENT_ID = "segmentId";
- private static final String CUBE_NAME = "cubeName";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
- public UpdateCubeInfoAfterBuildStep() {
- super();
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
- final CubeInstance cube = cubeManager.getCube(getCubeName());
- final CubeSegment segment = cube.getSegmentById(getSegmentId());
-
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
- long sourceCount = cubingJob.findSourceRecordCount();
- long sourceSizeBytes = cubingJob.findSourceSizeBytes();
- long cubeSizeBytes = cubingJob.findCubeSizeBytes();
- boolean segmentReady = cubeSizeBytes > 0; // for build+merge scenario, convert HFile not happen yet, so cube size is 0
-
- segment.setLastBuildJobID(getCubingJobId());
- segment.setLastBuildTime(System.currentTimeMillis());
- segment.setSizeKB(cubeSizeBytes / 1024);
- segment.setInputRecords(sourceCount);
- segment.setInputRecordsSize(sourceSizeBytes);
-
- try {
- if (segmentReady) {
- cubeManager.promoteNewlyBuiltSegments(cube, segment);
- } else {
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
- cubeManager.updateCube(cubeBuilder);
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
- } catch (IOException e) {
- logger.error("fail to update cube after build", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index d237908..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
- private static final String CUBE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
- private static final String CUBING_JOB_ID = "cubingJobId";
-
- private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
- public UpdateCubeInfoAfterMergeStep() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final CubeInstance cube = cubeManager.getCube(getCubeName());
-
- CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
- if (mergedSegment == null) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
- }
-
- CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
- long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
- // collect source statistics
- List<String> mergingSegmentIds = getMergingSegmentIds();
- if (mergingSegmentIds.isEmpty()) {
- return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
- }
- long sourceCount = 0L;
- long sourceSize = 0L;
- for (String id : mergingSegmentIds) {
- CubeSegment segment = cube.getSegmentById(id);
- sourceCount += segment.getInputRecords();
- sourceSize += segment.getInputRecordsSize();
- }
-
- // update segment info
- mergedSegment.setSizeKB(cubeSizeBytes / 1024);
- mergedSegment.setInputRecords(sourceCount);
- mergedSegment.setInputRecordsSize(sourceSize);
- mergedSegment.setLastBuildJobID(getCubingJobId());
- mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
- try {
- cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- } catch (IOException e) {
- logger.error("fail to update cube after merge", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setSegmentId(String segmentId) {
- this.setParam(SEGMENT_ID, segmentId);
- }
-
- private String getSegmentId() {
- return getParam(SEGMENT_ID);
- }
-
- public void setCubeName(String cubeName) {
- this.setParam(CUBE_NAME, cubeName);
- }
-
- private String getCubeName() {
- return getParam(CUBE_NAME);
- }
-
- public void setMergingSegmentIds(List<String> ids) {
- setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
- }
-
- private List<String> getMergingSegmentIds() {
- final String ids = getParam(MERGING_SEGMENT_IDS);
- if (ids != null) {
- final String[] splitted = StringUtils.split(ids, ",");
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
- for (String id : splitted) {
- result.add(id);
- }
- return result;
- } else {
- return Collections.emptyList();
- }
- }
-
- public void setCubingJobId(String id) {
- setParam(CUBING_JOB_ID, id);
- }
-
- private String getCubingJobId() {
- return getParam(CUBING_JOB_ID);
- }
-}