Posted to commits@kylin.apache.org by sh...@apache.org on 2020/05/22 14:30:41 UTC

[kylin] 02/02: KYLIN-4508 Add more unit tests for MR module

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e220b9a20c4eae09c8c240c066b730595eb03693
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Fri May 22 10:28:03 2020 +0800

    KYLIN-4508 Add more unit tests for MR module
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 engine-mr/pom.xml                                  |   7 +
 .../kylin/engine/mr/common/JobInfoConverter.java   |  20 ++-
 .../kylin/engine/mr/ByteArrayWritableTest.java     |  91 ++++++++++
 .../engine/mr/common/CubeStatsReaderTest.java      |  45 +++++
 .../engine/mr/common/CubeStatsWriterTest.java      | 105 +++++++++++
 .../engine/mr/common/JobInfoConverterTest.java     |  77 +++++++-
 .../CalculateStatsFromBaseCuboidMapperTest.java    | 131 ++++++++++++++
 .../mr/steps/FactDistinctColumnsReducerTest.java   | 185 ++++++++++++++++++-
 .../engine/mr/steps/InMemCuboidMapperTest.java     | 197 +++++++++++++++++++++
 .../engine/mr/streaming/RowRecordReaderTest.java   | 150 ++++++++++++++++
 .../198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq       | Bin 0 -> 1253 bytes
 kylin-it/pom.xml                                   |   6 +
 .../mr/steps/FactDistinctColumnsMapperTest.java    |  98 ++++++++++
 pom.xml                                            |   2 +-
 14 files changed, 1102 insertions(+), 12 deletions(-)

diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 7a0bc4a..fffd098 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -119,6 +119,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <!-- PowerMock version should be compatible with MRUnit -->
+            <version>1.5.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-stream-core</artifactId>
             <type>test-jar</type>
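
The agent-based PowerMock module is pulled in here, presumably so that PowerMock can coexist with MRUnit's MapDriver/ReduceDriver without replacing the JUnit runner; the version comment above points at that constraint. Tests in this commit activate it through a @Rule (see InMemCuboidMapperTest below), and the root pom.xml gains -noverify so the agent's instrumented classes pass bytecode verification. A minimal sketch of that wiring, with hypothetical class and test names, is:

    import org.apache.kylin.engine.mr.MRUtil;
    import org.junit.Rule;
    import org.junit.Test;
    import org.powermock.api.mockito.PowerMockito;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.rule.PowerMockRule;

    @PrepareForTest({ MRUtil.class })
    public class AgentRuleSketchTest {
        // The rule boots the PowerMock java agent at test time instead of swapping
        // the JUnit runner, which keeps MRUnit's drivers usable in the same test.
        @Rule
        public PowerMockRule rule = new PowerMockRule();

        @Test
        public void stubsAStaticFactory() {
            // Same stubbing style InMemCuboidMapperTest uses for the real input side.
            PowerMockito.stub(PowerMockito.method(MRUtil.class, "getBatchCubingInputSide")).toReturn(null);
        }
    }
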
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 9c19065..ef16399 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -61,6 +61,7 @@ public class JobInfoConverter {
         }
     }
 
+
     public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) {
         if (job == null) {
             logger.warn("job is null.");
@@ -73,15 +74,24 @@ public class JobInfoConverter {
             return null;
         }
 
-        CubingJob cubeJob = (CubingJob) job;
-        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        CubingJob cubeJob = job;
+        String cubeName = CubingExecutableUtil.getCubeName(cubeJob.getParams());
+        String displayCubeName = cubeName;
+        try {
+            CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            if (cube != null) {
+                cubeName = cube.getName();
+                displayCubeName = cube.getDisplayName();
+            }
+        } catch (Exception e) {
+            logger.warn("Fail to get cube instance for {}.", cubeName);
+        }
 
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
         result.setProjectName(cubeJob.getProjectName());
-        result.setRelatedCube(cube != null ? cube.getName() : CubingExecutableUtil.getCubeName(cubeJob.getParams()));
-        result.setDisplayCubeName(cube != null ? cube.getDisplayName() : CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedCube(cubeName);
+        result.setDisplayCubeName(displayCubeName);
         result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setRelatedSegmentName(CubingExecutableUtil.getSegmentName(cubeJob.getParams()));
         result.setLastModified(output.getLastModified());
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java
new file mode 100644
index 0000000..2630b36
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class ByteArrayWritableTest {
+
+    @Test
+    public void basicTest() {
+        ByteArrayWritable byteArrayWritable = new ByteArrayWritable(5);
+        Assert.assertEquals(5, byteArrayWritable.length());
+        Assert.assertNotEquals(0, byteArrayWritable.hashCode());
+        byteArrayWritable.set(new byte[] { 0x1, 0x2, 0x3 });
+
+        Assert.assertArrayEquals(new byte[] { 1, 2, 3 }, byteArrayWritable.array());
+        Assert.assertEquals(0, byteArrayWritable.offset());
+        Assert.assertEquals(3, byteArrayWritable.length());
+
+        Assert.assertEquals("01 02 03", byteArrayWritable.toString());
+
+        ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null);
+        Assert.assertEquals(0, byteArrayWritableNull.hashCode());
+    }
+
+    @Test
+    public void testCompare() {
+        ByteArrayWritable b1 = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 });
+        ByteArrayWritable b2 = new ByteArrayWritable();
+
+        Assert.assertFalse(b1.equals(1));
+
+        b2.set(new byte[] { 0x1, 0x2, 0x3 });
+
+        Assert.assertTrue(b1.equals(b2));
+        Assert.assertTrue(b1.equals(new byte[] { 1, 2, 3 }));
+    }
+
+    @Test()
+    public void testIO() throws IOException {
+        ByteArrayWritable byteArrayWritable = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 });
+        ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null);
+        ByteArrayWritable byteArrayWritableSlice = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 }, 1, 2);
+
+        byteArrayWritable.asBuffer();
+        byteArrayWritableNull.asBuffer();
+        byteArrayWritableSlice.asBuffer();
+
+        OutputStream outputStream = new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+            }
+        };
+        DataOutput output = new DataOutputStream(outputStream);
+        byteArrayWritable.write(output);
+
+        InputStream inputStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return 0;
+            }
+        };
+        DataInput input = new DataInputStream(inputStream);
+        byteArrayWritableNull.readFields(input);
+    }
+}
\ No newline at end of file
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java
new file mode 100644
index 0000000..9eaafab
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CubeStatsReaderTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testReadAndPrint() throws IOException {
+        String[] args = new String[] { "test_kylin_cube_with_slr_1_new_segment" };
+        CubeStatsReader.main(args);
+    }
+
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java
new file mode 100644
index 0000000..4e0f100
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CubeStatsWriterTest extends LocalFileMetadataTestCase {
+    private CubeInstance cube;
+    private CubeSegment cubeSegment;
+
+    private final String segmentId = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
+
+    @Before
+    public void setup() throws Exception {
+        File tmpFolder = getTempFolder();
+        FileUtils.deleteDirectory(tmpFolder);
+        tmpFolder.mkdirs();
+        createTestMetadata();
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getCube("test_kylin_cube_with_slr_1_new_segment");
+        cubeSegment = cube.getSegmentById(segmentId);
+    }
+
+    @After
+    public void after() throws Exception {
+        File tmpFolder = getTempFolder();
+        FileUtils.deleteDirectory(tmpFolder);
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testWrite() throws IOException {
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        conf.set("fs.defaultFS", "file:///");
+        conf.set("mapreduce.framework.name", "local");
+        conf.set("mapreduce.application.framework.path", "");
+        conf.set("fs.file.impl.disable.cache", "true");
+
+        final Path outputPath = new Path(getTmpFolderPath(), segmentId);
+
+        System.out.println(outputPath);
+        Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+        Set<Long> allCuboids = cube.getDescriptor().getAllCuboids();
+        for (Long cuboid : allCuboids) {
+            cuboidHLLMap.put(cuboid, createMockHLLCounter());
+        }
+        CubeStatsWriter.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+        assertTrue(new File(outputPath.toString(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME).exists());
+    }
+
+    private HLLCounter createMockHLLCounter() {
+        HLLCounter one = new HLLCounter(14);
+        for (int i = 0; i < 100; i++) {
+            one.clear();
+            one.add(i);
+        }
+        return one;
+    }
+
+    private File getTempFolder() {
+        return new File(getTmpFolderPath());
+    }
+
+    private String getTmpFolderPath() {
+        return "_tmp_cube_statistics";
+    }
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
index ffc223d..a54e112 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
@@ -25,15 +25,19 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.job.execution.DefaultOutput;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -59,6 +63,67 @@ public class JobInfoConverterTest {
         assertEquals(step.getStatus(), JobStepStatusEnum.FINISHED);
     }
 
+    @Test
+    public void testParseToJobInstance4CuboidJob() {
+        TestJob task = new TestJob();
+        String jobId = UUID.randomUUID().toString();
+        String cubeName = "cube1";
+        task.setId(jobId);
+        task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName);
+        Map<String, Output> outPutMap = Maps.newHashMap();
+        DefaultOutput executeOutput = new DefaultOutput();
+        executeOutput.setState(ExecutableState.READY);
+        Map<String, String> extraMap = Maps.newHashMap();
+        executeOutput.setExtra(extraMap);
+        outPutMap.put(jobId, executeOutput);
+
+        JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, outPutMap);
+        // no exception thrown is expected
+        assertEquals(jobId, instance3.getId());
+        assertEquals(CubeBuildTypeEnum.BUILD, instance3.getType());
+        assertEquals(cubeName, instance3.getRelatedCube());
+        assertEquals(JobStatusEnum.PENDING, instance3.getStatus());
+    }
+
+    @Test
+    public void testParseToJobInstance4CheckpointJob() {
+        Test2Job task = new Test2Job();
+        String jobId = UUID.randomUUID().toString();
+        String cubeName = "cube1";
+        task.setId(jobId);
+        task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName);
+        Map<String, Output> outPutMap = Maps.newHashMap();
+        DefaultOutput executeOutput = new DefaultOutput();
+        executeOutput.setState(ExecutableState.READY);
+        Map<String, String> extraMap = Maps.newHashMap();
+        executeOutput.setExtra(extraMap);
+        outPutMap.put(jobId, executeOutput);
+
+        JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, outPutMap);
+        // no exception thrown is expected
+        assertEquals(jobId, instance3.getId());
+        assertEquals(CubeBuildTypeEnum.CHECKPOINT, instance3.getType());
+        assertEquals(cubeName, instance3.getRelatedCube());
+        assertEquals(JobStatusEnum.PENDING, instance3.getStatus());
+    }
+
+    @Test
+    public void testStatusConvert() {
+        assertEquals(JobStatusEnum.PENDING, JobInfoConverter.parseToJobStatus(ExecutableState.READY));
+        assertEquals(JobStatusEnum.RUNNING, JobInfoConverter.parseToJobStatus(ExecutableState.RUNNING));
+        assertEquals(JobStatusEnum.DISCARDED, JobInfoConverter.parseToJobStatus(ExecutableState.DISCARDED));
+        assertEquals(JobStatusEnum.ERROR, JobInfoConverter.parseToJobStatus(ExecutableState.ERROR));
+        assertEquals(JobStatusEnum.STOPPED, JobInfoConverter.parseToJobStatus(ExecutableState.STOPPED));
+        assertEquals(JobStatusEnum.FINISHED, JobInfoConverter.parseToJobStatus(ExecutableState.SUCCEED));
+
+        assertEquals(JobStepStatusEnum.PENDING, JobInfoConverter.parseToJobStepStatus(ExecutableState.READY));
+        assertEquals(JobStepStatusEnum.RUNNING, JobInfoConverter.parseToJobStepStatus(ExecutableState.RUNNING));
+        assertEquals(JobStepStatusEnum.DISCARDED, JobInfoConverter.parseToJobStepStatus(ExecutableState.DISCARDED));
+        assertEquals(JobStepStatusEnum.ERROR, JobInfoConverter.parseToJobStepStatus(ExecutableState.ERROR));
+        assertEquals(JobStepStatusEnum.STOPPED, JobInfoConverter.parseToJobStepStatus(ExecutableState.STOPPED));
+        assertEquals(JobStepStatusEnum.FINISHED, JobInfoConverter.parseToJobStepStatus(ExecutableState.SUCCEED));
+    }
+
     public static class TestJob extends CubingJob {
         public TestJob() {
             super();
@@ -70,6 +135,17 @@ public class JobInfoConverterTest {
         }
     }
 
+    public static class Test2Job extends CheckpointExecutable {
+        public Test2Job() {
+            super();
+        }
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+        }
+    }
+
     public static class TestOutput implements Output {
 
         @Override
@@ -229,5 +305,4 @@ public class JobInfoConverterTest {
 
         assertNull(jobInstance);
     }
-
 }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java
new file mode 100644
index 0000000..a1416a8
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Bytes;
+
+public class CalculateStatsFromBaseCuboidMapperTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private MapDriver<Text, Text, Text, Text> mapDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        CalculateStatsFromBaseCuboidMapper calStatsFromBasicCuboidMapper = new CalculateStatsFromBaseCuboidMapper();
+        mapDriver = MapDriver.newMapDriver(calStatsFromBasicCuboidMapper);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMapper() throws Exception {
+        mapperTest();
+    }
+
+    @Test
+    public void testMapperWithOldHLL() throws Exception {
+        cubeDesc.setVersion("1.5.2");
+        mapperTest();
+    }
+
+    private void mapperTest() throws Exception {
+        setConfiguration();
+        Text key1 = new Text();
+        byte[] shard = new byte[] { 0, 0 };
+        byte[] cuboidId = Cuboid.getBaseCuboid(cubeDesc).getBytes();
+        byte[] col1 = new byte[] { 0, 0, 0, 1 };
+        byte[] col2 = new byte[] { 0, 6, 0 };
+        byte[] col3 = new byte[] { 1 };
+        byte[] col4 = new byte[] { 1 };
+        byte[] col5 = new byte[] { 1 };
+        byte[] col6 = new byte[] { 1 };
+        byte[] col7 = new byte[] { 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0 };
+        byte[] col8 = new byte[] { 1, 0 };
+        byte[] col9 = new byte[] { 1 };
+        key1.set(Bytes.concat(shard, cuboidId, col1, col2, col3, col4, col5, col6, col7, col8, col9));
+        Text val1 = new Text();
+
+        mapDriver.setInput(key1, val1);
+        List<Pair<Text, Text>> result = mapDriver.run();
+        Set<Long> cuboidIdSet = cube.getCuboidsByMode(CuboidModeEnum.CURRENT);
+        assertEquals(cuboidIdSet.size(), result.size());
+        Long[] cuboids = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        Arrays.sort(cuboids);
+
+        List<Long> resultCuboidList = Lists.transform(result, new Function<Pair<Text, Text>, Long>() {
+            @Nullable
+            @Override
+            public Long apply(@Nullable Pair<Text, Text> input) {
+                byte[] bytes = input.getFirst().getBytes();
+                return org.apache.kylin.common.util.Bytes.toLong(bytes);
+            }
+        });
+        Long[] resultCuboids = resultCuboidList.toArray(new Long[resultCuboidList.size()]);
+        Arrays.sort(resultCuboids);
+        assertArrayEquals(cuboids, resultCuboids);
+    }
+
+    private void setConfiguration() throws Exception {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+    }
+
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index 7348ce6..9f684c9 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -14,35 +14,92 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.engine.mr.steps;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
+
+public class FactDistinctColumnsReducerTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private ReduceDriver<SelfDefineSortableKey, Text, NullWritable, Text> reduceDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        FactDistinctColumnsReducer factDistinctColumnsReducer = new FactDistinctColumnsReducer();
+        reduceDriver = ReduceDriver.newReduceDriver(factDistinctColumnsReducer);
+    }
 
-/**
- */
-public class FactDistinctColumnsReducerTest {
+    @After
+    public void after() throws Exception {
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.deleteQuietly(new File(FileOutputCommitter.PENDING_DIR_NAME));
+        cleanupTestMetadata();
+    }
 
     @Test
     public void testWriteCuboidStatistics() throws IOException {
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();
         File tmp = File.createTempFile("cuboidstatistics", "");
-        final Path outputPath = new Path(tmp.getParent().toString() + File.separator + RandomUtil.randomUUID().toString());
+        final Path outputPath = new Path(tmp.getParent() + File.separator + RandomUtil.randomUUID().toString());
         if (!FileSystem.getLocal(conf).exists(outputPath)) {
             //            FileSystem.getLocal(conf).create(outputPath);
         }
@@ -53,4 +110,122 @@ public class FactDistinctColumnsReducerTest {
         FileSystem.getLocal(conf).delete(outputPath, true);
 
     }
+
+    @Test
+    public void testReducerStatistics() throws IOException {
+        setConfigurations();
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_STATISTICS, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        // override the task id
+        int dimColsSize = cubeDesc.getRowkey().getRowKeyColumns().length;
+        int uhcSize = cubeDesc.getAllUHCColumns().size();
+        final int targetTaskId = (dimColsSize - uhcSize) + uhcSize * cubeDesc.getConfig().getUHCReducerCount();
+
+        setContextTaskId(targetTaskId);
+        ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+        tmpBuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+        tmpBuf.putLong(100);
+        Text outputKey1 = new Text();
+        outputKey1.set(tmpBuf.array(), 0, tmpBuf.position());
+        SelfDefineSortableKey key1 = new SelfDefineSortableKey();
+        key1.init(outputKey1, (byte) 0);
+
+        HLLCounter hll = createMockHLLCounter();
+        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        hllBuf.clear();
+        hll.writeRegisters(hllBuf);
+        Text value1 = new Text();
+        value1.set(hllBuf.array(), 0, hllBuf.position());
+
+        reduceDriver.setInput(key1, ImmutableList.of(value1));
+
+        List<Pair<NullWritable, Text>> result = reduceDriver.run();
+        assertEquals(0, result.size()); // the reducer output statistics info to a sequence file.
+    }
+
+    @Test
+    public void testReducerNormalDimDictInReducer() throws IOException {
+        testNormalDim();
+    }
+
+    @Test
+    public void testReducerNormalDim() throws IOException {
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        kylinConfig.setProperty("kylin.engine.mr.build-dict-in-reducer", "false");
+        testNormalDim();
+    }
+
+    private void setContextTaskId(final int taskId) {
+        Context context = reduceDriver.getContext();
+        when(context.getTaskAttemptID()).thenAnswer(new Answer<TaskAttemptID>() {
+            @Override
+            public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
+                return TaskAttemptID.forName("attempt__0000_r_" + taskId + "_0");
+            }
+        });
+    }
+
+    private void setConfigurations() {
+        Configuration configuration = reduceDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_1_new_segment");
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(FileOutputFormat.OUTDIR, ".");
+    }
+
+    // copy from MultpleOutputs for test
+    private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+    private static final String MO_PREFIX = "mapreduce.multipleoutputs.namedOutput.";
+    private static final String FORMAT = ".format";
+    private static final String KEY = ".key";
+    private static final String VALUE = ".value";
+
+    private void setMultipleOutputs(String namedOutput, Configuration conf,
+            Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass)
+            throws IOException {
+        conf.set(MULTIPLE_OUTPUTS, conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+        conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, OutputFormat.class);
+        conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+        conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+    }
+
+    private HLLCounter createMockHLLCounter() {
+        HLLCounter hllc = new HLLCounter(14);
+        HLLCounter one = new HLLCounter(14);
+        for (int i = 0; i < 1000; i++) {
+            one.clear();
+            one.add(i);
+            hllc.merge(one);
+        }
+        return hllc;
+    }
+
+    private void testNormalDim() throws IOException {
+        setConfigurations();
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_COLUMN, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_DICT, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        int nDimReducers = cubeDesc.getRowkey().getRowKeyColumns().length;
+        setContextTaskId(nDimReducers - 1);
+
+        ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+        String val = "100";
+        tmpBuf.put(Bytes.toBytes(val));
+        Text outputKey1 = new Text();
+        outputKey1.set(tmpBuf.array(), 0, tmpBuf.position());
+        SelfDefineSortableKey key1 = new SelfDefineSortableKey();
+        key1.init(outputKey1, (byte) 0);
+
+        reduceDriver.setInput(key1, ImmutableList.of(new Text()));
+        List<Pair<NullWritable, Text>> result = reduceDriver.run();
+        assertEquals(0, result.size());
+    }
+
 }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java
new file mode 100644
index 0000000..612e9ae
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+import org.powermock.reflect.Whitebox;
+
+@PrepareForTest({ MRUtil.class, CuboidSchedulerUtil.class, InMemCuboidMapper.class })
+public class InMemCuboidMapperTest extends LocalFileMetadataTestCase {
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private String cubeName;
+    private CubeInstance cube;
+    private InMemCuboidMapper<NullWritable> inMemCuboidMapper;
+    private MapDriver<NullWritable, Object, ByteArrayWritable, ByteArrayWritable> mapDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        inMemCuboidMapper = new InMemCuboidMapper<>();
+        mapDriver = MapDriver.newMapDriver(inMemCuboidMapper);
+
+        PowerMockito.stub(PowerMockito.method(CuboidSchedulerUtil.class, "getCuboidSchedulerByMode", CubeSegment.class,
+                String.class)).toReturn(cube.getCuboidScheduler());
+        IMRBatchCubingInputSide mockInputSide = createMockInputSide();
+        PowerMockito.stub(PowerMockito.method(MRUtil.class, "getBatchCubingInputSide")).toReturn(mockInputSide);
+
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+    }
+
+    @Test
+    public void testMapper() throws Exception {
+        TestHandler testHandler = new TestHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    @Test
+    public void testMapperWithCutRow() throws Exception {
+        Whitebox.setInternalState(inMemCuboidMapper, "splitRowThreshold", 1);
+        Whitebox.setInternalState(inMemCuboidMapper, "unitRows", 1);
+        TestHandler testHandler = new TestWithCutRowHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    private void setConfigurationAndMock(TestHandler testHandler) throws Exception {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+
+        DoggedCubeBuilder mockDoggedCubeBuilder = createMockDoggedCubeBuilder(testHandler);
+        PowerMockito.whenNew(DoggedCubeBuilder.class).withAnyArguments().thenReturn(mockDoggedCubeBuilder);
+    }
+
+    private IMRBatchCubingInputSide createMockInputSide() throws Exception {
+        IMRInput.IMRTableInputFormat mockInputFormat = createMockInputFormat();
+        IMRBatchCubingInputSide mockInputSide = PowerMockito.mock(IMRBatchCubingInputSide.class);
+        PowerMockito.when(mockInputSide.getFlatTableInputFormat()).thenReturn(mockInputFormat);
+        return mockInputSide;
+    }
+
+    private IMRTableInputFormat createMockInputFormat() throws Exception {
+        String[] row = getMockInputRow();
+        Collection<String[]> rows = new ArrayList<>();
+        rows.add(row);
+        IMRTableInputFormat mockFormat = PowerMockito.mock(IMRTableInputFormat.class);
+        PowerMockito.when(mockFormat, "parseMapperInput", Mockito.any()).thenReturn(rows);
+        return mockFormat;
+    }
+
+    private static String[] getMockInputRow() {
+        return new String[] { "2012-01-01" };
+    }
+
+    private DoggedCubeBuilder createMockDoggedCubeBuilder(final TestHandler handler) throws Exception {
+        DoggedCubeBuilder mockDoggedCubeBuilder = PowerMockito.mock(DoggedCubeBuilder.class);
+        PowerMockito.when(mockDoggedCubeBuilder, "buildAsRunnable", Mockito.any(BlockingQueue.class), Mockito.any(),
+                Mockito.any()).thenAnswer(new Answer<Object>() {
+                    @Override
+                    public Object answer(InvocationOnMock invocation) throws Throwable {
+                        Object[] args = invocation.getArguments();
+                        handler.setInputQueue((BlockingQueue) args[0]);
+                        return handler;
+                    }
+                });
+        return mockDoggedCubeBuilder;
+    }
+
+    private static class TestHandler implements Runnable {
+        BlockingQueue queue;
+
+        public void setInputQueue(BlockingQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                consumeRows();
+
+                while (true) {
+                    String[] newRow = (String[]) queue.take();
+                    if (InputConverterUnitForRawData.END_ROW == newRow) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        protected void consumeRows() throws InterruptedException {
+            String[] row = (String[]) queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+        }
+
+    }
+
+    private static class TestWithCutRowHandler extends TestHandler {
+
+        protected void consumeRows() throws InterruptedException {
+            String[] row = (String[]) queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+
+            row = (String[]) queue.take();
+            assertArrayEquals(InputConverterUnitForRawData.CUT_ROW, row);
+        }
+
+    }
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
new file mode 100644
index 0000000..acc5f6d
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.streaming;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.stream.core.model.StreamingMessage;
+import org.apache.kylin.stream.core.query.StreamingQueryProfile;
+import org.apache.kylin.stream.core.storage.columnar.ColumnarMemoryStorePersister;
+import org.apache.kylin.stream.core.storage.columnar.DataSegmentFragment;
+import org.apache.kylin.stream.core.storage.columnar.FragmentData;
+import org.apache.kylin.stream.core.storage.columnar.FragmentFileSearcher;
+import org.apache.kylin.stream.core.storage.columnar.FragmentId;
+import org.apache.kylin.stream.core.storage.columnar.ParsedStreamingCubeInfo;
+import org.apache.kylin.stream.core.storage.columnar.SegmentMemoryStore;
+import org.apache.kylin.stream.core.storage.columnar.StreamingDataSimulator;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class RowRecordReaderTest extends LocalFileMetadataTestCase {
+    private static final String cubeName = "test_streaming_v2_cube";
+
+    private String baseStorePath;
+    private CubeInstance cubeInstance;
+    private String segmentName;
+    private ParsedStreamingCubeInfo parsedStreamingCubeInfo;
+    private DataSegmentFragment fragment;
+    private FragmentFileSearcher fragmentFileSearcher;
+    private CubeDesc cubeDesc;
+    private int eventCnt = 50000;
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.baseStorePath = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
+        this.cubeInstance = CubeManager.getInstance(getTestConfig()).reloadCube(cubeName);
+        this.cubeDesc = cubeInstance.getDescriptor();
+        this.segmentName = "20171018100000_20171018110000";
+        this.parsedStreamingCubeInfo = new ParsedStreamingCubeInfo(cubeInstance);
+        this.fragment = new DataSegmentFragment(baseStorePath, cubeName, segmentName, new FragmentId(0));
+        PropertyConfigurator.configure("../build/conf/kylin-tools-log4j.properties");
+        prepareData();
+        fragmentFileSearcher = new FragmentFileSearcher(fragment, new FragmentData(fragment.getMetaInfo(),
+                fragment.getDataFile()));
+        StreamingQueryProfile.set(new StreamingQueryProfile("test-query-id", System.currentTimeMillis()));
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        cleanupData();
+    }
+
+    @Test
+    public void testIterator() throws Exception {
+        Path path = new Path(fragment.getDataFile().getParentFile().getAbsolutePath());
+        FileSystem fs = FileSystem.getLocal(new Configuration());
+        RowRecordReader rowRecordReader = new RowRecordReader(cubeDesc, path, fs);
+        List<MeasureDesc> measures = cubeDesc.getMeasures();
+        List<DataTypeSerializer> dataTypeSerializers = Lists.newArrayListWithCapacity(measures.size());
+        for (MeasureDesc measure : measures) {
+            dataTypeSerializers.add(DataTypeSerializer.create(measure.getFunction().getReturnDataType()));
+        }
+        int rowNum = 0;
+        while (rowRecordReader.hasNextRow()) {
+            RowRecord record = rowRecordReader.nextRow();
+            if (rowNum < 10) {
+                String[] dimensions = record.getDimensions();
+                byte[][] metrics = record.getMetrics();
+                Object[] metricValues = new Object[metrics.length];
+                for (int i = 0; i < metrics.length; i++) {
+                    metricValues[i] = dataTypeSerializers.get(i).deserialize(ByteBuffer.wrap(metrics[i]));
+                }
+                StringBuilder sb = new StringBuilder();
+                for (int i = 0; i < dimensions.length; i++) {
+                    sb.append(dimensions[i]);
+                    sb.append(",");
+                }
+                for (int i = 0; i < metricValues.length; i++) {
+                    sb.append(metricValues[i].toString());
+                    sb.append(",");
+                }
+                System.out.println(sb.toString());
+            }
+            rowNum++;
+        }
+        assertEquals(eventCnt, rowNum);
+    }
+
+    protected void prepareData() {
+        // build additional cuboids
+        KylinConfigExt configExt = (KylinConfigExt) cubeInstance.getDescriptor().getConfig();
+        configExt.getExtendedOverrides().put("kylin.stream.build.additional.cuboids", "true");
+
+        ColumnarMemoryStorePersister memStorePersister = new ColumnarMemoryStorePersister(parsedStreamingCubeInfo,
+                segmentName);
+        StreamingDataSimulator simulator = new StreamingDataSimulator(
+                StreamingDataSimulator.getDefaultCardinalityMap(), 100000);
+        Iterator<StreamingMessage> streamingMessages = simulator.simulate(eventCnt, System.currentTimeMillis());
+        SegmentMemoryStore memoryStore = new SegmentMemoryStore(new ParsedStreamingCubeInfo(cubeInstance), segmentName);
+        while (streamingMessages.hasNext()) {
+            memoryStore.index(streamingMessages.next());
+        }
+
+        memStorePersister.persist(memoryStore, fragment);
+    }
+
+    private void cleanupData() throws IOException {
+        FileUtils.deleteQuietly(new File(baseStorePath));
+    }
+
+}
diff --git a/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq
new file mode 100644
index 0000000..02bcf8c
Binary files /dev/null and b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq differ
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 99873fd..2b82720 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -176,6 +176,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-annotations</artifactId>
             <scope>provided</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
new file mode 100644
index 0000000..8d9fd7e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FactDistinctColumnsMapperTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private MapDriver<LongWritable, Object, SelfDefineSortableKey, Text> mapDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        FactDistinctColumnsMapper<LongWritable> factDistinctColumnsMapper = new FactDistinctColumnsMapper<>();
+        mapDriver = MapDriver.newMapDriver(factDistinctColumnsMapper);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMapper() throws IOException {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, "test_kylin_cube_with_slr_1_new_segment");
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+        HCatRecord value1 = new DefaultHCatRecord(11);
+        value1.set(0, "2012-08-16");
+        value1.set(1, "48027");
+        value1.set(2, "0");
+        value1.set(3, "Home & Garden");
+        value1.set(4, "Cheese & Crackers");
+        value1.set(5, "Cheese & Crackers");
+        value1.set(6, "48027");
+        value1.set(7, "16");
+        value1.set(8, "10000010");
+        value1.set(9, "204.28");
+        value1.set(10, "5");
+        mapDriver.addInput(new LongWritable(0), value1);
+
+        List<Pair<SelfDefineSortableKey, Text>> result = mapDriver.run();
+        int colsNeedDictSize = cubeDesc.getAllColumnsNeedDictionaryBuilt().size();
+        int cuboidsCnt = cubeDesc.getAllCuboids().size();
+
+        assertEquals(
+                colsNeedDictSize + (cubeDesc.getRowkey().getRowKeyColumns().length - colsNeedDictSize) * 2 + cuboidsCnt,
+                result.size());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index b3e80b5..bd916b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@
     </sonar.jacoco.excludes>
 
     <!-- JVM Args for Testing -->
-    <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -Duser.timezone=UTC</argLine>
+    <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -noverify -Duser.timezone=UTC</argLine>
   </properties>
 
   <licenses>