Posted to commits@kylin.apache.org by li...@apache.org on 2015/09/11 05:34:49 UTC
[2/3] incubator-kylin git commit: KYLIN-1007 Generates cuboid file in build and merge
KYLIN-1007 Generates cuboid file in build and merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/f156d138
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/f156d138
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/f156d138
Branch: refs/heads/2.x-staging
Commit: f156d138d34b58e5c620c0d1c8678029b5aa41a0
Parents: fe9e02c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Sep 10 14:31:03 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Sep 11 11:34:04 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/engine/mr/CubingJob.java | 24 +-
.../kylin/storage/hbase/HBaseStorage.java | 3 +-
.../hbase/steps/HBaseMROutput2Transition.java | 266 +++++++++++++++++++
.../kylin/storage/hbase/steps/HBaseMRSteps.java | 12 +-
4 files changed, 297 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index b2f0a06..ed5c036 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -21,6 +21,8 @@ package org.apache.kylin.engine.mr;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.TimeZone;
@@ -179,11 +181,26 @@ public class CubingJob extends DefaultChainedExecutable {
}
public long findCubeSizeBytes() {
- return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
+ // look for the info BACKWARD, so the last step that claims the cube size wins
+ return Long.parseLong(findExtraInfoBackward(CUBE_SIZE_BYTES, "0"));
+ }
+
+ public String findExtraInfo(String key, String dft) {
+ return findExtraInfo(key, dft, false);
}
- private String findExtraInfo(String key, String dft) {
- for (AbstractExecutable child : getTasks()) {
+ public String findExtraInfoBackward(String key, String dft) {
+ return findExtraInfo(key, dft, true);
+ }
+
+ private String findExtraInfo(String key, String dft, boolean backward) {
+ ArrayList<AbstractExecutable> tasks = new ArrayList<AbstractExecutable>(getTasks());
+
+ if (backward) {
+ Collections.reverse(tasks);
+ }
+
+ for (AbstractExecutable child : tasks) {
Output output = executableManager.getOutput(child.getId());
String value = output.getExtra().get(key);
if (value != null)
@@ -191,4 +208,5 @@ public class CubingJob extends DefaultChainedExecutable {
}
return dft;
}
+
}
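
For readers skimming the diff: the new helper walks the task list in reverse, so the last step that reports a value wins; this matters once both the initial build and a later step record CUBE_SIZE_BYTES in their outputs. A standalone sketch of the same lookup pattern (plain Java collections; the key name and values are hypothetical, not the Kylin classes themselves):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class BackwardLookupSketch {
    // mirrors findExtraInfo(key, dft, backward): scan step outputs in order (or in
    // reverse order) and return the first non-null value found, else the default
    static String findExtraInfo(List<Map<String, String>> stepOutputs, String key, String dft, boolean backward) {
        List<Map<String, String>> steps = new ArrayList<Map<String, String>>(stepOutputs);
        if (backward)
            Collections.reverse(steps);
        for (Map<String, String> extra : steps) {
            String value = extra.get(key);
            if (value != null)
                return value;
        }
        return dft;
    }

    public static void main(String[] args) {
        // two steps claim a cube size, e.g. the first build and a later step that rewrote the cube
        List<Map<String, String>> outputs = Arrays.asList(
                Collections.singletonMap("cubeSizeBytes", "100"),
                Collections.<String, String> emptyMap(),
                Collections.singletonMap("cubeSizeBytes", "250"));
        System.out.println(findExtraInfo(outputs, "cubeSizeBytes", "0", false)); // 100, first claim
        System.out.println(findExtraInfo(outputs, "cubeSizeBytes", "0", true));  // 250, last claim wins
    }
}
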
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 1d8c8c8..421f648 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -36,6 +36,7 @@ import org.apache.kylin.storage.cache.CacheFledgedDynamicQuery;
import org.apache.kylin.storage.cache.CacheFledgedStaticQuery;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
import org.apache.kylin.storage.hbase.steps.HBaseMROutput2;
+import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.apache.kylin.storage.hybrid.HybridStorageQuery;
@@ -105,7 +106,7 @@ public class HBaseStorage implements IStorage {
if (engineInterface == IMROutput.class) {
return (I) new HBaseMROutput();
} else if (engineInterface == IMROutput2.class) {
- return (I) new HBaseMROutput2();
+ return (I) new HBaseMROutput2Transition();
} else {
throw new RuntimeException("Cannot adapt to " + engineInterface);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
new file mode 100644
index 0000000..047315b
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This "Transition" MR output generates cuboid files and then converts them to HFiles.
+ * The additional step slows the build down slightly, but the gain is that a merge
+ * can read cuboid files from HDFS instead of going through HBase region servers. See KYLIN-1007.
+ *
+ * This is transitional because eventually we want to merge from HTable snapshots.
+ * However, MR input over multiple snapshots is only supported by HBase 1.x.
+ * Until most users have upgraded to a recent HBase, this transitional
+ * cuboid file solution is the only option.
+ */
+public class HBaseMROutput2Transition implements IMROutput2 {
+
+ @Override
+ public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
+ return new IMRBatchCubingOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public IMRStorageOutputFormat getStorageOutputFormat() {
+ return new HBaseOutputFormat(seg);
+ }
+
+ @Override
+ public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ // nothing to do
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeInputSide2 getBatchMergeInputSide(final CubeSegment seg) {
+ return new IMRBatchMergeInputSide2() {
+ @Override
+ public IMRStorageInputFormat getStorageInputFormat() {
+ return new HBaseInputFormat(seg);
+ }
+ };
+ }
+
+ @Override
+ public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(final CubeSegment seg) {
+ return new IMRBatchMergeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public IMRStorageOutputFormat getStorageOutputFormat() {
+ return new HBaseOutputFormat(seg);
+ }
+
+ @Override
+ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow) {
+ String cuboidRootPath = steps.getCuboidRootPath(jobFlow.getId());
+
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(cuboidRootPath, jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createMergeGCStep());
+ }
+ };
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static class HBaseInputFormat implements IMRStorageInputFormat {
+ final Iterable<String> htables;
+
+ final RowValueDecoder[] rowValueDecoders;
+ final ByteArrayWritable parsedKey;
+ final Object[] parsedValue;
+ final Pair<ByteArrayWritable, Object[]> parsedPair;
+
+ public HBaseInputFormat(CubeSegment seg) {
+ this.htables = new HBaseMRSteps(seg).getMergingHTables();
+
+ List<RowValueDecoder> valueDecoderList = Lists.newArrayList();
+ List<MeasureDesc> measuresDescs = Lists.newArrayList();
+ for (HBaseColumnFamilyDesc cfDesc : seg.getCubeDesc().getHBaseMapping().getColumnFamily()) {
+ for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+ valueDecoderList.add(new RowValueDecoder(colDesc));
+ for (MeasureDesc measure : colDesc.getMeasures()) {
+ measuresDescs.add(measure);
+ }
+ }
+ }
+ this.rowValueDecoders = valueDecoderList.toArray(new RowValueDecoder[valueDecoderList.size()]);
+
+ this.parsedKey = new ByteArrayWritable();
+ this.parsedValue = new Object[measuresDescs.size()];
+ this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+ }
+
+ @Override
+ public void configureInput(Class<? extends Mapper> mapper, Class<? extends WritableComparable> outputKeyClass, Class<? extends Writable> outputValueClass, Job job) throws IOException {
+ Configuration conf = job.getConfiguration();
+ HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
+
+ List<Scan> scans = new ArrayList<Scan>();
+ for (String htable : htables) {
+ Scan scan = new Scan();
+ scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs
+ scan.setCacheBlocks(false); // don't set to true for MR jobs
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(htable));
+ scans.add(scan);
+ }
+ TableMapReduceUtil.initTableMapperJob(scans, (Class<? extends TableMapper>) mapper, outputKeyClass, outputValueClass, job);
+ }
+
+ @Override
+ public CubeSegment findSourceSegment(Context context, CubeInstance cubeInstance) throws IOException {
+ TableSplit currentSplit = (TableSplit) context.getInputSplit();
+ byte[] tableName = currentSplit.getTableName();
+ String htableName = Bytes.toString(tableName);
+
+ // decide which source segment
+ for (CubeSegment segment : cubeInstance.getSegments()) {
+ String segmentHtable = segment.getStorageLocationIdentifier();
+ if (segmentHtable != null && segmentHtable.equalsIgnoreCase(htableName)) {
+ return segment;
+ }
+ }
+ throw new IllegalStateException("No merging segment's storage location identifier equals " + htableName);
+ }
+
+ @Override
+ public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue) {
+ ImmutableBytesWritable key = (ImmutableBytesWritable) inKey;
+ parsedKey.set(key.get(), key.getOffset(), key.getLength());
+
+ Result hbaseRow = (Result) inValue;
+ for (int i = 0; i < rowValueDecoders.length; i++) {
+ rowValueDecoders[i].decode(hbaseRow);
+ rowValueDecoders[i].loadCubeMeasureArray(parsedValue);
+ }
+
+ return parsedPair;
+ }
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ private static class HBaseOutputFormat implements IMRStorageOutputFormat {
+ final CubeSegment seg;
+
+ Text outputKey;
+ Text outputValue;
+ ByteBuffer valueBuf;
+ MeasureCodec codec;
+
+ public HBaseOutputFormat(CubeSegment seg) {
+ this.seg = seg;
+ }
+
+ @Override
+ public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException {
+ job.setReducerClass(reducer);
+
+ // the cuboid file and its KV classes must stay compatible with the 0.7 version for a smooth upgrade
+ job.setOutputFormatClass(SequenceFileOutputFormat.class);
+ job.setOutputKeyClass(Text.class);
+ job.setOutputValueClass(Text.class);
+
+ Path output = new Path(new HBaseMRSteps(seg).getCuboidRootPath(jobFlowId));
+ FileOutputFormat.setOutputPath(job, output);
+
+ HadoopUtil.deletePath(job.getConfiguration(), output);
+ }
+
+ @Override
+ public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException {
+ // lazy init
+ if (outputKey == null) {
+ outputKey = new Text();
+ outputValue = new Text();
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ codec = new MeasureCodec(seg.getCubeDesc().getMeasures());
+ }
+
+ outputKey.set(key.array(), key.offset(), key.length());
+
+ valueBuf.clear();
+ codec.encode(value, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+
+ context.write(outputKey, outputValue);
+ }
+ }
+
+}
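
The net effect of the new output side is that the build now materializes the cuboids as Text/Text SequenceFiles under cuboidRootPath before the HFile conversion, so a later merge can read them straight from HDFS. A minimal sketch of reading those files back with stock Hadoop APIs (the path is hypothetical and the mapper/reducer wiring is omitted; this is not the actual Kylin merge step):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

public class ReadCuboidFilesSketch {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJobName("read cuboid files from HDFS (sketch)");
        // the build wrote 0.7-compatible Text/Text SequenceFiles, so a merge-style job
        // can consume them directly from HDFS instead of scanning HBase region servers
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/kylin/example_cube/cuboid/*")); // hypothetical cuboidRootPath + "*"
    }
}
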
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/f156d138/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 64374e6..03b4361 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -24,16 +24,18 @@ public class HBaseMRSteps extends JobBuilderSupport {
String jobId = jobFlow.getId();
// calculate key distribution
- jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath + "*", jobId));
+ jobFlow.addTask(createRangeRowkeyDistributionStep(cuboidRootPath, jobId));
// create htable step
jobFlow.addTask(createCreateHTableStep(jobId));
// generate hfiles step
- jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath + "*", jobId));
+ jobFlow.addTask(createConvertCuboidToHfileStep(cuboidRootPath, jobId));
// bulk load step
jobFlow.addTask(createBulkLoadStep(jobId));
}
- public MapReduceExecutable createRangeRowkeyDistributionStep(String inputPath, String jobId) {
+ public MapReduceExecutable createRangeRowkeyDistributionStep(String cuboidRootPath, String jobId) {
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
StringBuilder cmd = new StringBuilder();
@@ -73,7 +75,9 @@ public class HBaseMRSteps extends JobBuilderSupport {
return createHtableStep;
}
- public MapReduceExecutable createConvertCuboidToHfileStep(String inputPath, String jobId) {
+ public MapReduceExecutable createConvertCuboidToHfileStep(String cuboidRootPath, String jobId) {
+ String inputPath = cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
+
MapReduceExecutable createHFilesStep = new MapReduceExecutable();
createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
StringBuilder cmd = new StringBuilder();
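
With the signature change above, callers now hand over the cuboid root directory and the step derives its own MR input glob, which is what the transitional output side relies on. A tiny standalone sketch of that derivation (hypothetical paths):

public class CuboidPathSketch {
    // mirrors the new inputPath construction: append "/*" unless the root already ends with "/"
    static String toInputGlob(String cuboidRootPath) {
        return cuboidRootPath + (cuboidRootPath.endsWith("/") ? "" : "/") + "*";
    }

    public static void main(String[] args) {
        System.out.println(toInputGlob("/tmp/kylin/example_cube/cuboid"));  // /tmp/kylin/example_cube/cuboid/*
        System.out.println(toInputGlob("/tmp/kylin/example_cube/cuboid/")); // /tmp/kylin/example_cube/cuboid/*
    }
}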