You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:05:14 UTC
[37/50] incubator-kylin git commit: KYLIN-653 add ii2basecuboid mapper
KYLIN-653 add ii2basecuboid mapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/929b986d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/929b986d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/929b986d
Branch: refs/heads/streaming-localdict
Commit: 929b986d6d7396204d443aa6e420dd745a217611
Parents: d1c115d
Author: honma <ho...@ebay.com>
Authored: Fri Mar 27 15:56:10 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Fri Mar 27 15:56:10 2015 +0800
----------------------------------------------------------------------
.../invertedindex/index/RawTableRecord.java | 2 +
.../kylin/job/hadoop/cube/BaseCuboidJob.java | 2 +-
.../kylin/job/hadoop/cube/BaseCuboidMapper.java | 246 -------------------
.../job/hadoop/cube/BaseCuboidMapperBase.java | 205 ++++++++++++++++
.../job/hadoop/cube/HiveToBaseCuboidMapper.java | 49 ++++
.../job/hadoop/cube/IIToBaseCuboidMapper.java | 109 ++++++++
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 5 -
.../cube/BaseCuboidMapperPerformanceTest.java | 65 -----
.../job/hadoop/cube/BaseCuboidMapperTest.java | 145 -----------
.../HiveToBaseCuboidMapperPerformanceTest.java | 65 +++++
.../hadoop/cube/HiveToBaseCuboidMapperTest.java | 145 +++++++++++
11 files changed, 576 insertions(+), 462 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
index 895fd4f..ccfc5b1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/RawTableRecord.java
@@ -18,6 +18,7 @@
package org.apache.kylin.invertedindex.index;
+import com.google.common.base.Preconditions;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.dict.Dictionary;
import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
@@ -100,6 +101,7 @@ public class RawTableRecord implements Cloneable {
bytes.set(buf, digest.offset(col), digest.length(col));
}
+
@Override
public Object clone() {
return new RawTableRecord(this);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
index 5f7802a..06046c5 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidJob.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.util.ToolRunner;
public class BaseCuboidJob extends CuboidJob {
public BaseCuboidJob() {
- this.setMapperClass(BaseCuboidMapper.class);
+ this.setMapperClass(HiveToBaseCuboidMapper.class);
}
public static void main(String[] args) throws Exception {
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
deleted file mode 100644
index a023c0c..0000000
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapper.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.mr.KylinMapper;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.common.util.BytesSplitter;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author George Song (ysong1)
- */
-public class BaseCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Text, Text, Text> {
-
- private static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapper.class);
-
- public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
- public static final byte[] ONE = Bytes.toBytes("1");
-
- private String cubeName;
- private String segmentName;
- private Cuboid baseCuboid;
- private CubeInstance cube;
- private CubeDesc cubeDesc;
- private CubeSegment cubeSegment;
- private List<byte[]> nullBytes;
-
- private CubeJoinedFlatTableDesc intermediateTableDesc;
- private String intermediateTableRowDelimiter;
- private byte byteRowDelimiter;
-
- private int counter;
- private int errorRecordCounter;
- private Text outputKey = new Text();
- private Text outputValue = new Text();
- private Object[] measures;
- private byte[][] keyBytesBuf;
- private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
- private BytesSplitter bytesSplitter;
- private AbstractRowKeyEncoder rowKeyEncoder;
- private MeasureCodec measureCodec;
-
- @Override
- protected void setup(Context context) throws IOException {
- super.publishConfiguration(context.getConfiguration());
-
- cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
- segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
- intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
- if (Bytes.toBytes(intermediateTableRowDelimiter).length > 1) {
- throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
- }
-
- byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
-
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
- cube = CubeManager.getInstance(config).getCube(cubeName);
- cubeDesc = cube.getDescriptor();
- cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
-
- intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
-
- bytesSplitter = new BytesSplitter(200, 4096);
- rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
-
- measureCodec = new MeasureCodec(cubeDesc.getMeasures());
- measures = new Object[cubeDesc.getMeasures().size()];
-
- int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
- keyBytesBuf = new byte[colCount][];
-
- initNullBytes();
- }
-
- private void initNullBytes() {
- nullBytes = Lists.newArrayList();
- nullBytes.add(HIVE_NULL);
- String[] nullStrings = cubeDesc.getNullStrings();
- if (nullStrings != null) {
- for (String s : nullStrings) {
- nullBytes.add(Bytes.toBytes(s));
- }
- }
- }
-
- private boolean isNull(byte[] v) {
- for (byte[] nullByte : nullBytes) {
- if (Bytes.equals(v, nullByte))
- return true;
- }
- return false;
- }
-
- private byte[] buildKey(SplittedBytes[] splitBuffers) {
- int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
- for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
- int index = rowKeyColumnIndexes[i];
- keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
- if (isNull(keyBytesBuf[i])) {
- keyBytesBuf[i] = null;
- }
- }
- return rowKeyEncoder.encode(keyBytesBuf);
- }
-
- private void buildValue(SplittedBytes[] splitBuffers) {
-
- for (int i = 0; i < measures.length; i++) {
- byte[] valueBytes = getValueBytes(splitBuffers, i);
- measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
- }
-
- valueBuf.clear();
- measureCodec.encode(measures, valueBuf);
- }
-
- private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
- MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
- FunctionDesc func = desc.getFunction();
- ParameterDesc paramDesc = func.getParameter();
- int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
- byte[] result = null;
-
- // constant
- if (flatTableIdx == null) {
- result = Bytes.toBytes(paramDesc.getValue());
- }
- // column values
- else {
- // for multiple columns, their values are joined
- for (int i = 0; i < flatTableIdx.length; i++) {
- SplittedBytes split = splitBuffers[flatTableIdx[i]];
- if (result == null) {
- result = Arrays.copyOf(split.value, split.length);
- } else {
- byte[] newResult = new byte[result.length + split.length];
- System.arraycopy(result, 0, newResult, 0, result.length);
- System.arraycopy(split.value, 0, newResult, result.length, split.length);
- result = newResult;
- }
- }
- }
-
- if (func.isCount() || func.isHolisticCountDistinct()) {
- // note for holistic count distinct, this value will be ignored
- result = ONE;
- }
-
- if (isNull(result)) {
- result = null;
- }
-
- return result;
- }
-
- @Override
- public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
- counter++;
- if (counter % BatchConstants.COUNTER_MAX == 0) {
- logger.info("Handled " + counter + " records!");
- }
-
- try {
- bytesSplitter.split(value.getBytes(), value.getLength(), byteRowDelimiter);
- outputKV(context);
-
- } catch (Exception ex) {
- handleErrorRecord(bytesSplitter, ex);
- }
- }
-
- private void outputKV(Context context) throws IOException, InterruptedException {
- intermediateTableDesc.sanityCheck(bytesSplitter);
-
- byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
- outputKey.set(rowKey, 0, rowKey.length);
-
- buildValue(bytesSplitter.getSplitBuffers());
- outputValue.set(valueBuf.array(), 0, valueBuf.position());
- context.write(outputKey, outputValue);
- }
-
- private void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
-
- System.err.println("Insane record: " + bytesSplitter);
- ex.printStackTrace(System.err);
-
- errorRecordCounter++;
- if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
- if (ex instanceof IOException)
- throw (IOException) ex;
- else if (ex instanceof RuntimeException)
- throw (RuntimeException) ex;
- else
- throw new RuntimeException("", ex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
new file mode 100644
index 0000000..e2972dc
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperBase.java
@@ -0,0 +1,205 @@
+package org.apache.kylin.job.hadoop.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Shared base for mappers that turn one split input record into a base cuboid row key and encoded measure value. Created by Hongbin Ma(Binmahone) on 3/27/15.
+ */
+public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, Text, Text> {
+    protected static final Logger logger = LoggerFactory.getLogger(BaseCuboidMapperBase.class); // shared by subclasses
+    public static final byte[] HIVE_NULL = Bytes.toBytes("\\N");
+    public static final byte[] ONE = Bytes.toBytes("1");
+    protected String cubeName;
+    protected String segmentName;
+    protected Cuboid baseCuboid;
+    protected CubeInstance cube;
+    protected CubeDesc cubeDesc;
+    protected CubeSegment cubeSegment;
+    protected List<byte[]> nullBytes; // byte patterns treated as NULL, filled by initNullBytes()
+    protected CubeJoinedFlatTableDesc intermediateTableDesc;
+    protected String intermediateTableRowDelimiter;
+    protected byte byteRowDelimiter;
+    protected int counter; // records seen so far; incremented by subclasses in map()
+    protected Object[] measures;
+    protected byte[][] keyBytesBuf;
+    protected BytesSplitter bytesSplitter; // subclasses load the current record here before calling outputKV()
+    protected AbstractRowKeyEncoder rowKeyEncoder;
+    protected MeasureCodec measureCodec;
+    private int errorRecordCounter;
+    private Text outputKey = new Text();
+    private Text outputValue = new Text();
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.publishConfiguration(context.getConfiguration());
+
+        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
+        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
+        intermediateTableRowDelimiter = context.getConfiguration().get(BatchConstants.CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER, Character.toString(BatchConstants.INTERMEDIATE_TABLE_ROW_DELIMITER));
+        if (Bytes.toBytes(intermediateTableRowDelimiter).length != 1) { // also rejects an empty delimiter, which would otherwise fail on [0] below
+            throw new RuntimeException("Expected delimiter byte length is 1, but got " + Bytes.toBytes(intermediateTableRowDelimiter).length);
+        }
+        // exactly one byte at this point, so index 0 is the whole delimiter
+        byteRowDelimiter = Bytes.toBytes(intermediateTableRowDelimiter)[0];
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
+
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+
+        bytesSplitter = new BytesSplitter(200, 4096);
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
+
+        measureCodec = new MeasureCodec(cubeDesc.getMeasures());
+        measures = new Object[cubeDesc.getMeasures().size()];
+
+        int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
+        keyBytesBuf = new byte[colCount][];
+
+        initNullBytes();
+    }
+
+    private void initNullBytes() { // HIVE_NULL plus any null strings declared on the cube descriptor
+        nullBytes = Lists.newArrayList();
+        nullBytes.add(HIVE_NULL);
+        String[] nullStrings = cubeDesc.getNullStrings();
+        if (nullStrings != null) {
+            for (String s : nullStrings) {
+                nullBytes.add(Bytes.toBytes(s));
+            }
+        }
+    }
+
+    private boolean isNull(byte[] v) { // true when v equals one of the entries in nullBytes
+        for (byte[] nullByte : nullBytes) {
+            if (Bytes.equals(v, nullByte))
+                return true;
+        }
+        return false;
+    }
+
+    private byte[] buildKey(SplittedBytes[] splitBuffers) { // copies the row key columns out of the split record and encodes them
+        int[] rowKeyColumnIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
+        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
+            int index = rowKeyColumnIndexes[i];
+            keyBytesBuf[i] = Arrays.copyOf(splitBuffers[index].value, splitBuffers[index].length);
+            if (isNull(keyBytesBuf[i])) {
+                keyBytesBuf[i] = null;
+            }
+        }
+        return rowKeyEncoder.encode(keyBytesBuf);
+    }
+
+    private void buildValue(SplittedBytes[] splitBuffers) {
+        // parse every measure from its raw bytes, then encode them all into the reused valueBuf
+        for (int i = 0; i < measures.length; i++) {
+            byte[] valueBytes = getValueBytes(splitBuffers, i);
+            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+        }
+
+        valueBuf.clear();
+        measureCodec.encode(measures, valueBuf);
+    }
+
+    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) { // raw input bytes for one measure, or null for a NULL value
+        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
+        FunctionDesc func = desc.getFunction();
+        ParameterDesc paramDesc = func.getParameter();
+        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
+
+        byte[] result = null;
+
+        // constant
+        if (flatTableIdx == null) {
+            result = Bytes.toBytes(paramDesc.getValue());
+        }
+        // column values
+        else {
+            // for multiple columns, their values are joined
+            for (int i = 0; i < flatTableIdx.length; i++) {
+                SplittedBytes split = splitBuffers[flatTableIdx[i]];
+                if (result == null) {
+                    result = Arrays.copyOf(split.value, split.length);
+                } else {
+                    byte[] newResult = new byte[result.length + split.length];
+                    System.arraycopy(result, 0, newResult, 0, result.length);
+                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
+                    result = newResult;
+                }
+            }
+        }
+
+        if (func.isCount() || func.isHolisticCountDistinct()) {
+            // note for holistic count distinct, this value will be ignored
+            result = ONE;
+        }
+
+        if (isNull(result)) {
+            result = null;
+        }
+
+        return result;
+    }
+
+    protected void outputKV(Context context) throws IOException, InterruptedException { // emits the record currently held in bytesSplitter
+        intermediateTableDesc.sanityCheck(bytesSplitter);
+
+        byte[] rowKey = buildKey(bytesSplitter.getSplitBuffers());
+        outputKey.set(rowKey, 0, rowKey.length);
+
+        buildValue(bytesSplitter.getSplitBuffers());
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+    protected void handleErrorRecord(BytesSplitter bytesSplitter, Exception ex) throws IOException {
+        // bad records are reported and skipped until the count exceeds ERROR_RECORD_THRESHOLD, then the failure is rethrown
+        System.err.println("Insane record: " + bytesSplitter);
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("", ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
new file mode 100644
index 0000000..599dde8
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.job.constant.BatchConstants;
+
+/**
+ * Mapper for delimited Text rows: each row is split into the shared bytesSplitter and emitted through outputKV.
+ * @author George Song (ysong1)
+ */
+public class HiveToBaseCuboidMapper<KEYIN> extends BaseCuboidMapperBase<KEYIN, Text> {
+
+    @Override
+    public void map(KEYIN key, Text value, Context context) throws IOException, InterruptedException {
+        // progress line every COUNTER_MAX records
+        if (++counter % BatchConstants.COUNTER_MAX == 0) {
+            logger.info("Handled " + counter + " records!");
+        }
+
+        try {
+            // load the row into the shared splitter, then emit whatever the splitter now holds
+            final byte[] rowBytes = value.getBytes();
+            final int rowLength = value.getLength();
+            bytesSplitter.split(rowBytes, rowLength, byteRowDelimiter);
+            outputKV(context);
+        } catch (Exception e) {
+            handleErrorRecord(bytesSplitter, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
new file mode 100644
index 0000000..68886c0
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/IIToBaseCuboidMapper.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Queue;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.FIFOIterable;
+import org.apache.kylin.common.util.SplittedBytes;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.invertedindex.model.*;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+
+/**
+ * Builds base cuboid rows from inverted-index (II) HBase rows: each Result is folded into an IIRow, decoded into slices, and every record of a slice is emitted through outputKV. Author: honma
+ */
+public class IIToBaseCuboidMapper extends BaseCuboidMapperBase<ImmutableBytesWritable, Result> {
+    private Queue<IIRow> buffer = Lists.newLinkedList(); // rows handed to the slice decoder by map()
+    private Iterator<Slice> slices;
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+
+        Configuration conf = context.getConfiguration();
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        String iiName = conf.get(BatchConstants.CFG_II_NAME);
+        IIInstance ii = IIManager.getInstance(config).getII(iiName);
+        IIDesc iiDesc = ii.getDescriptor();
+
+        TableRecordInfo info = new TableRecordInfo(iiDesc);
+        KeyValueCodec codec = new IIKeyValueCodecWithState(info.getDigest());
+        slices = codec.decodeKeyValue(new FIFOIterable<IIRow>(buffer)).iterator(); // presumably consumes buffer lazily as rows arrive - TODO confirm
+    }
+
+    @Override
+    public void map(ImmutableBytesWritable key, Result cells, Context context) throws IOException, InterruptedException {
+        try {
+            IIRow iiRow = new IIRow();
+            for (Cell c : cells.rawCells()) {
+                iiRow.updateWith(c);
+            }
+            buffer.add(iiRow);
+
+            if (slices.hasNext()) {
+                Slice slice = slices.next();
+                TableRecordInfoDigest localDigest = slice.getInfo();
+                for (RawTableRecord record : slice) {
+                    // one output row per record in the decoded slice
+                    counter++;
+                    if (counter % BatchConstants.COUNTER_MAX == 0) {
+                        logger.info("Handled " + counter + " records!");
+                    }
+
+                    for (int indexInRecord = 0; indexInRecord < localDigest.getColumnCount(); ++indexInRecord) {
+                        SplittedBytes columnBuffer = bytesSplitter.getSplitBuffer(indexInRecord);
+                        if (localDigest.isMetrics(indexInRecord)) { // metric columns are read as metric values; the original test was negated
+                            String v = record.getValueMetric(indexInRecord);
+                            byte[] metricBytes = v.getBytes(); // NOTE(review): platform default charset - confirm this matches how the value is parsed downstream
+                            System.arraycopy(metricBytes, 0, columnBuffer.value, 0, metricBytes.length);
+                            columnBuffer.length = metricBytes.length;
+                        } else {
+                            Dictionary<?> dictionary = slice.getLocalDictionaries().get(indexInRecord); // other columns are decoded through the slice-local dictionary
+                            Preconditions.checkArgument(columnBuffer.value.length > dictionary.getSizeOfValue(), "Column length too big");
+                            int vid = record.getValueID(indexInRecord);
+                            columnBuffer.length = dictionary.getValueBytesFromId(vid, columnBuffer.value, 0);
+                        }
+                    }
+
+                    outputKV(context);
+                }
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(bytesSplitter, ex);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index f83e9d7..7a7c62e 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -23,10 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.ToolRunner;
@@ -40,8 +37,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.exception.JobException;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidMapper;
-import org.apache.kylin.job.hadoop.cube.CuboidJob;
import org.apache.kylin.job.hadoop.cube.CuboidReducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
deleted file mode 100644
index 7826e86..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperPerformanceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.Mapper.Context;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-
-/**
- * @author yangli9
- *
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class BaseCuboidMapperPerformanceTest {
-
- String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure";
- String cubeName = "test_kylin_cube_with_slr";
- Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq");
-
- @Ignore("convenient trial tool for dev")
- @Test
- public void test() throws IOException, InterruptedException {
- Configuration hconf = new Configuration();
- BaseCuboidMapper mapper = new BaseCuboidMapper();
- Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
-
- mapper.setup(context);
-
- Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
- Text value = new Text();
-
- while (reader.next(key, value)) {
- mapper.map(key, value, context);
- }
-
- reader.close();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
deleted file mode 100644
index c3632b7..0000000
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/BaseCuboidMapperTest.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.hadoop.cube;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.math.BigDecimal;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mrunit.mapreduce.MapDriver;
-import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class BaseCuboidMapperTest extends LocalFileMetadataTestCase {
-
- MapDriver<Text, Text, Text, Text> mapDriver;
- String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
-
- @Before
- public void setUp() throws Exception {
- createTestMetadata();
-
- // hack for distributed cache
- FileUtils.deleteDirectory(new File("../job/meta"));
- FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
-
- BaseCuboidMapper<Text> mapper = new BaseCuboidMapper<Text>();
- mapDriver = MapDriver.newMapDriver(mapper);
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- FileUtils.deleteDirectory(new File("../job/meta"));
- }
-
- @Test
- public void testMapperWithHeader() throws Exception {
- String cubeName = "test_kylin_cube_with_slr_1_new_segment";
- String segmentName = "20130331080000_20131212080000";
- mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
- // metadata);
- mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrancesWomenAuction15123456789132.33"));
- List<Pair<Text, Text>> result = mapDriver.run();
-
- CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
- CubeInstance cube = cubeMgr.getCube(cubeName);
-
- assertEquals(1, result.size());
- Text rowkey = result.get(0).getFirst();
- byte[] key = rowkey.getBytes();
- byte[] header = Bytes.head(key, 26);
- byte[] sellerId = Bytes.tail(header, 18);
- byte[] cuboidId = Bytes.head(header, 8);
- byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
-
- RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
- decoder.decode(key);
- assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, Women, Auction, 0, 15]", decoder.getValues().toString());
-
- assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
- assertEquals(511, Bytes.toLong(cuboidId));
- assertEquals(22, restKey.length);
-
- verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "132.33", "132.33", "132.33");
- }
-
- private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String m1, String m2, String m3) {
- MeasureCodec codec = new MeasureCodec(measures);
- Object[] values = new Object[measures.size()];
- codec.decode(valueBytes, values);
- assertTrue(new BigDecimal(m1).equals(values[0]));
- assertTrue(new BigDecimal(m2).equals(values[1]));
- assertTrue(new BigDecimal(m3).equals(values[2]));
- }
-
- @Test
- public void testMapperWithNull() throws Exception {
- String cubeName = "test_kylin_cube_with_slr_1_new_segment";
- String segmentName = "20130331080000_20131212080000";
- mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
- mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
- // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
- // metadata);
- mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrances\\NAuction15123456789\\N"));
- List<Pair<Text, Text>> result = mapDriver.run();
-
- CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
- CubeInstance cube = cubeMgr.getCube(cubeName);
-
- assertEquals(1, result.size());
- Text rowkey = result.get(0).getFirst();
- byte[] key = rowkey.getBytes();
- byte[] header = Bytes.head(key, 26);
- byte[] sellerId = Bytes.tail(header, 18);
- byte[] cuboidId = Bytes.head(header, 8);
- byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
-
- RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
- decoder.decode(key);
- assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, null, Auction, 0, 15]", decoder.getValues().toString());
-
- assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
- assertEquals(511, Bytes.toLong(cuboidId));
- assertEquals(22, restKey.length);
-
- verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
new file mode 100644
index 0000000..cf9cfe0
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperPerformanceTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.Reader;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Mapper.Context;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class HiveToBaseCuboidMapperPerformanceTest {
+
+ String metadataUrl = "hbase:yadesk00:2181:/hbase-unsecure";
+ String cubeName = "test_kylin_cube_with_slr";
+ Path srcPath = new Path("/download/test_kylin_cube_with_slr_intermediate_table_64mb.seq");
+
+ @Ignore("convenient trial tool for dev")
+ @Test
+ public void test() throws IOException, InterruptedException {
+ Configuration hconf = new Configuration();
+ HiveToBaseCuboidMapper mapper = new HiveToBaseCuboidMapper();
+ Context context = MockupMapContext.create(hconf, metadataUrl, cubeName, null);
+
+ mapper.setup(context);
+
+ Reader reader = new Reader(hconf, SequenceFile.Reader.file(srcPath));
+ Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), hconf);
+ Text value = new Text();
+
+ while (reader.next(key, value)) {
+ mapper.map(key, value, context);
+ }
+
+ reader.close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/929b986d/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
new file mode 100644
index 0000000..f906fcb
--- /dev/null
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/HiveToBaseCuboidMapperTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.hadoop.cube;
+
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowKeyDecoder;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author George Song (ysong1)
+ *
+ */
+public class HiveToBaseCuboidMapperTest extends LocalFileMetadataTestCase {
+
+ MapDriver<Text, Text, Text, Text> mapDriver;
+ String localTempDir = System.getProperty("java.io.tmpdir") + File.separator;
+
+ @Before
+ public void setUp() throws Exception {
+ createTestMetadata();
+
+ // hack for distributed cache
+ FileUtils.deleteDirectory(new File("../job/meta"));
+ FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl()), new File("../job/meta"));
+
+ HiveToBaseCuboidMapper<Text> mapper = new HiveToBaseCuboidMapper<Text>();
+ mapDriver = MapDriver.newMapDriver(mapper);
+ }
+
+ @After
+ public void after() throws Exception {
+ cleanupTestMetadata();
+ FileUtils.deleteDirectory(new File("../job/meta"));
+ }
+
+ @Test
+ public void testMapperWithHeader() throws Exception {
+ String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+ String segmentName = "20130331080000_20131212080000";
+ mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
+ // metadata);
+ mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrancesWomenAuction15123456789132.33"));
+ List<Pair<Text, Text>> result = mapDriver.run();
+
+ CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+ assertEquals(1, result.size());
+ Text rowkey = result.get(0).getFirst();
+ byte[] key = rowkey.getBytes();
+ byte[] header = Bytes.head(key, 26);
+ byte[] sellerId = Bytes.tail(header, 18);
+ byte[] cuboidId = Bytes.head(header, 8);
+ byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
+
+ RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
+ decoder.decode(key);
+ assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, Women, Auction, 0, 15]", decoder.getValues().toString());
+
+ assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
+ assertEquals(511, Bytes.toLong(cuboidId));
+ assertEquals(22, restKey.length);
+
+ verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "132.33", "132.33", "132.33");
+ }
+
+ private void verifyMeasures(List<MeasureDesc> measures, Text valueBytes, String m1, String m2, String m3) {
+ MeasureCodec codec = new MeasureCodec(measures);
+ Object[] values = new Object[measures.size()];
+ codec.decode(valueBytes, values);
+ assertTrue(new BigDecimal(m1).equals(values[0]));
+ assertTrue(new BigDecimal(m2).equals(values[1]));
+ assertTrue(new BigDecimal(m3).equals(values[2]));
+ }
+
+ @Test
+ public void testMapperWithNull() throws Exception {
+ String cubeName = "test_kylin_cube_with_slr_1_new_segment";
+ String segmentName = "20130331080000_20131212080000";
+ mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+ mapDriver.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
+ // mapDriver.getConfiguration().set(BatchConstants.CFG_METADATA_URL,
+ // metadata);
+ mapDriver.withInput(new Text("key"), new Text("2012-12-15118480Health & BeautyFragrances\\NAuction15123456789\\N"));
+ List<Pair<Text, Text>> result = mapDriver.run();
+
+ CubeManager cubeMgr = CubeManager.getInstance(getTestConfig());
+ CubeInstance cube = cubeMgr.getCube(cubeName);
+
+ assertEquals(1, result.size());
+ Text rowkey = result.get(0).getFirst();
+ byte[] key = rowkey.getBytes();
+ byte[] header = Bytes.head(key, 26);
+ byte[] sellerId = Bytes.tail(header, 18);
+ byte[] cuboidId = Bytes.head(header, 8);
+ byte[] restKey = Bytes.tail(key, rowkey.getLength() - 26);
+
+ RowKeyDecoder decoder = new RowKeyDecoder(cube.getFirstSegment());
+ decoder.decode(key);
+ assertEquals("[123456789, 2012-12-15, 11848, Health & Beauty, Fragrances, null, Auction, 0, 15]", decoder.getValues().toString());
+
+ assertTrue(Bytes.toString(sellerId).startsWith("123456789"));
+ assertEquals(511, Bytes.toLong(cuboidId));
+ assertEquals(22, restKey.length);
+
+ verifyMeasures(cube.getDescriptor().getMeasures(), result.get(0).getSecond(), "0", "0", "0");
+ }
+}