You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2020/05/22 14:30:39 UTC

[kylin] branch master updated (b8d55d0 -> e220b9a)

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git.


    from b8d55d0  KYLIN-2971 Fix the wrong "Realization Names" and missing "Cuboid Ids" in logQuery when hit cache
     new 2d718c8  KYLIN-4508 Add more unit tests for core-metrics module
     new e220b9a  KYLIN-4508 Add more unit tests for MR module

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core-metrics/pom.xml                               |  17 ++
 .../kylin/metrics/lib/impl/BlockingReservoir.java  |  21 ++-
 .../apache/kylin/metrics/lib/impl/RecordEvent.java |   6 +-
 .../kylin/metrics/lib/impl/TimePropertyEnum.java   |   2 +-
 .../metrics/lib/impl/BlockingReservoirTest.java    |  52 ++++--
 .../metrics/lib/impl/InstantReservoirTest.java     |  44 ++---
 .../kylin/metrics/lib/impl/MetricsSystemTest.java  |  67 +++++++
 .../kylin/metrics/lib/impl/RecordEventTest.java    |  82 +++++++++
 .../metrics/property/MetricsPropertyEnumTest.java  | 132 ++++++++++++++
 engine-mr/pom.xml                                  |   7 +
 .../kylin/engine/mr/common/JobInfoConverter.java   |  20 ++-
 .../kylin/engine/mr/ByteArrayWritableTest.java     |  91 ++++++++++
 .../engine/mr/common/CubeStatsReaderTest.java      |  21 ++-
 .../engine/mr/common/CubeStatsWriterTest.java      | 105 +++++++++++
 .../engine/mr/common/JobInfoConverterTest.java     |  77 +++++++-
 .../CalculateStatsFromBaseCuboidMapperTest.java    | 131 ++++++++++++++
 .../mr/steps/FactDistinctColumnsReducerTest.java   | 185 ++++++++++++++++++-
 .../engine/mr/steps/InMemCuboidMapperTest.java     | 197 +++++++++++++++++++++
 .../engine/mr/streaming/RowRecordReaderTest.java   | 150 ++++++++++++++++
 .../198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq       | Bin 0 -> 1253 bytes
 kylin-it/pom.xml                                   |   6 +
 .../mr/steps/FactDistinctColumnsMapperTest.java    |  98 ++++++++++
 pom.xml                                            |   2 +-
 23 files changed, 1450 insertions(+), 63 deletions(-)
 copy core-cube/src/test/java/org/apache/kylin/cube/ProjectSpecificConfigTest.java => core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/BlockingReservoirTest.java (52%)
 copy core-cube/src/test/java/org/apache/kylin/cube/ProjectSpecificConfigTest.java => core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/InstantReservoirTest.java (55%)
 create mode 100644 core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/MetricsSystemTest.java
 create mode 100644 core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/RecordEventTest.java
 create mode 100644 core-metrics/src/test/java/org/apache/kylin/metrics/property/MetricsPropertyEnumTest.java
 create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java
 copy core-job/src/test/java/org/apache/kylin/job/BasicLocalMetaTest.java => engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java (70%)
 create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java
 create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java
 create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java
 create mode 100644 engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
 create mode 100644 examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq
 create mode 100644 kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java


[kylin] 02/02: KYLIN-4508 Add more unit tests for MR module

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit e220b9a20c4eae09c8c240c066b730595eb03693
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Fri May 22 10:28:03 2020 +0800

    KYLIN-4508 Add more unit tests for MR module
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 engine-mr/pom.xml                                  |   7 +
 .../kylin/engine/mr/common/JobInfoConverter.java   |  20 ++-
 .../kylin/engine/mr/ByteArrayWritableTest.java     |  91 ++++++++++
 .../engine/mr/common/CubeStatsReaderTest.java      |  45 +++++
 .../engine/mr/common/CubeStatsWriterTest.java      | 105 +++++++++++
 .../engine/mr/common/JobInfoConverterTest.java     |  77 +++++++-
 .../CalculateStatsFromBaseCuboidMapperTest.java    | 131 ++++++++++++++
 .../mr/steps/FactDistinctColumnsReducerTest.java   | 185 ++++++++++++++++++-
 .../engine/mr/steps/InMemCuboidMapperTest.java     | 197 +++++++++++++++++++++
 .../engine/mr/streaming/RowRecordReaderTest.java   | 150 ++++++++++++++++
 .../198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq       | Bin 0 -> 1253 bytes
 kylin-it/pom.xml                                   |   6 +
 .../mr/steps/FactDistinctColumnsMapperTest.java    |  98 ++++++++++
 pom.xml                                            |   2 +-
 14 files changed, 1102 insertions(+), 12 deletions(-)

diff --git a/engine-mr/pom.xml b/engine-mr/pom.xml
index 7a0bc4a..fffd098 100644
--- a/engine-mr/pom.xml
+++ b/engine-mr/pom.xml
@@ -119,6 +119,13 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4-rule-agent</artifactId>
+            <!-- power mock should be compatible with mrunit -->
+            <version>1.5.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-stream-core</artifactId>
             <type>test-jar</type>
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
index 9c19065..ef16399 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java
@@ -61,6 +61,7 @@ public class JobInfoConverter {
         }
     }
 
+
     public static JobInstance parseToJobInstance(CubingJob job, Map<String, Output> outputs) {
         if (job == null) {
             logger.warn("job is null.");
@@ -73,15 +74,24 @@ public class JobInfoConverter {
             return null;
         }
 
-        CubingJob cubeJob = (CubingJob) job;
-        CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
-                .getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        CubingJob cubeJob = job;
+        String cubeName = CubingExecutableUtil.getCubeName(cubeJob.getParams());
+        String displayCubeName = cubeName;
+        try {
+            CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            if (cube != null) {
+                cubeName = cube.getName();
+                displayCubeName = cube.getDisplayName();
+            }
+        } catch (Exception e) {
+            logger.warn("Fail to get cube instance for {}.", cubeName);
+        }
 
         final JobInstance result = new JobInstance();
         result.setName(job.getName());
         result.setProjectName(cubeJob.getProjectName());
-        result.setRelatedCube(cube != null ? cube.getName() : CubingExecutableUtil.getCubeName(cubeJob.getParams()));
-        result.setDisplayCubeName(cube != null ? cube.getDisplayName() : CubingExecutableUtil.getCubeName(cubeJob.getParams()));
+        result.setRelatedCube(cubeName);
+        result.setDisplayCubeName(displayCubeName);
         result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams()));
         result.setRelatedSegmentName(CubingExecutableUtil.getSegmentName(cubeJob.getParams()));
         result.setLastModified(output.getLastModified());
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java
new file mode 100644
index 0000000..2630b36
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/ByteArrayWritableTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+public class ByteArrayWritableTest {
+
+    @Test
+    public void basicTest() {
+        ByteArrayWritable byteArrayWritable = new ByteArrayWritable(5);
+        Assert.assertEquals(5, byteArrayWritable.length());
+        Assert.assertNotEquals(0, byteArrayWritable.hashCode());
+        byteArrayWritable.set(new byte[] { 0x1, 0x2, 0x3 });
+
+        Assert.assertArrayEquals(new byte[] { 1, 2, 3 }, byteArrayWritable.array());
+        Assert.assertEquals(0, byteArrayWritable.offset());
+        Assert.assertEquals(3, byteArrayWritable.length());
+
+        Assert.assertEquals("01 02 03", byteArrayWritable.toString());
+
+        ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null);
+        Assert.assertEquals(0, byteArrayWritableNull.hashCode());
+    }
+
+    @Test
+    public void testCompare() {
+        ByteArrayWritable b1 = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 });
+        ByteArrayWritable b2 = new ByteArrayWritable();
+
+        Assert.assertFalse(b1.equals(1));
+
+        b2.set(new byte[] { 0x1, 0x2, 0x3 });
+
+        Assert.assertTrue(b1.equals(b2));
+        Assert.assertTrue(b1.equals(new byte[] { 1, 2, 3 }));
+    }
+
+    @Test()
+    public void testIO() throws IOException {
+        ByteArrayWritable byteArrayWritable = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 });
+        ByteArrayWritable byteArrayWritableNull = new ByteArrayWritable(null);
+        ByteArrayWritable byteArrayWritableSlice = new ByteArrayWritable(new byte[] { 0x1, 0x2, 0x3 }, 1, 2);
+
+        byteArrayWritable.asBuffer();
+        byteArrayWritableNull.asBuffer();
+        byteArrayWritableSlice.asBuffer();
+
+        OutputStream outputStream = new OutputStream() {
+            @Override
+            public void write(int b) throws IOException {
+            }
+        };
+        DataOutput output = new DataOutputStream(outputStream);
+        byteArrayWritable.write(output);
+
+        InputStream inputStream = new InputStream() {
+            @Override
+            public int read() throws IOException {
+                return 0;
+            }
+        };
+        DataInput input = new DataInputStream(inputStream);
+        byteArrayWritableNull.readFields(input);
+    }
+}
\ No newline at end of file
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java
new file mode 100644
index 0000000..9eaafab
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsReaderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class CubeStatsReaderTest extends LocalFileMetadataTestCase {
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testReadAndPrint() throws IOException {
+        String[] args = new String[] { "test_kylin_cube_with_slr_1_new_segment" };
+        CubeStatsReader.main(args);
+    }
+
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java
new file mode 100644
index 0000000..4e0f100
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/CubeStatsWriterTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.common;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class CubeStatsWriterTest extends LocalFileMetadataTestCase {
+    private CubeInstance cube;
+    private CubeSegment cubeSegment;
+
+    private final String segmentId = "198va32a-a33e-4b69-83dd-0bb8b1f8c53b";
+
+    @Before
+    public void setup() throws Exception {
+        File tmpFolder = getTempFolder();
+        FileUtils.deleteDirectory(tmpFolder);
+        tmpFolder.mkdirs();
+        createTestMetadata();
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getCube("test_kylin_cube_with_slr_1_new_segment");
+        cubeSegment = cube.getSegmentById(segmentId);
+    }
+
+    @After
+    public void after() throws Exception {
+        File tmpFolder = getTempFolder();
+        FileUtils.deleteDirectory(tmpFolder);
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testWrite() throws IOException {
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        conf.set("fs.defaultFS", "file:///");
+        conf.set("mapreduce.framework.name", "local");
+        conf.set("mapreduce.application.framework.path", "");
+        conf.set("fs.file.impl.disable.cache", "true");
+
+        final Path outputPath = new Path(getTmpFolderPath(), segmentId);
+
+        System.out.println(outputPath);
+        Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+
+        Set<Long> allCuboids = cube.getDescriptor().getAllCuboids();
+        for (Long cuboid : allCuboids) {
+            cuboidHLLMap.put(cuboid, createMockHLLCounter());
+        }
+        CubeStatsWriter.writeCuboidStatistics(conf, outputPath, cuboidHLLMap, 100);
+        assertTrue(new File(outputPath.toString(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME).exists());
+    }
+
+    private HLLCounter createMockHLLCounter() {
+        HLLCounter one = new HLLCounter(14);
+        for (int i = 0; i < 100; i++) {
+            one.clear();
+            one.add(i);
+        }
+        return one;
+    }
+
+    private File getTempFolder() {
+        return new File(getTmpFolderPath());
+    }
+
+    private String getTmpFolderPath() {
+        return "_tmp_cube_statistics";
+    }
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
index ffc223d..a54e112 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/common/JobInfoConverterTest.java
@@ -25,15 +25,19 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.CheckpointExecutable;
+import org.apache.kylin.job.execution.DefaultOutput;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.ExecuteResult;
@@ -59,6 +63,67 @@ public class JobInfoConverterTest {
         assertEquals(step.getStatus(), JobStepStatusEnum.FINISHED);
     }
 
+    @Test
+    public void testParseToJobInstance4CuboidJob() {
+        TestJob task = new TestJob();
+        String jobId = UUID.randomUUID().toString();
+        String cubeName = "cube1";
+        task.setId(jobId);
+        task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName);
+        Map<String, Output> outPutMap = Maps.newHashMap();
+        DefaultOutput executeOutput = new DefaultOutput();
+        executeOutput.setState(ExecutableState.READY);
+        Map<String, String> extraMap = Maps.newHashMap();
+        executeOutput.setExtra(extraMap);
+        outPutMap.put(jobId, executeOutput);
+
+        JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, outPutMap);
+        // no exception is expected to be thrown
+        assertEquals(jobId, instance3.getId());
+        assertEquals(CubeBuildTypeEnum.BUILD, instance3.getType());
+        assertEquals(cubeName, instance3.getRelatedCube());
+        assertEquals(JobStatusEnum.PENDING, instance3.getStatus());
+    }
+
+    @Test
+    public void testParseToJobInstance4CheckpointJob() {
+        Test2Job task = new Test2Job();
+        String jobId = UUID.randomUUID().toString();
+        String cubeName = "cube1";
+        task.setId(jobId);
+        task.setParam(CubingExecutableUtil.CUBE_NAME, cubeName);
+        Map<String, Output> outPutMap = Maps.newHashMap();
+        DefaultOutput executeOutput = new DefaultOutput();
+        executeOutput.setState(ExecutableState.READY);
+        Map<String, String> extraMap = Maps.newHashMap();
+        executeOutput.setExtra(extraMap);
+        outPutMap.put(jobId, executeOutput);
+
+        JobInstance instance3 = JobInfoConverter.parseToJobInstanceQuietly(task, outPutMap);
+        // no exception is expected to be thrown
+        assertEquals(jobId, instance3.getId());
+        assertEquals(CubeBuildTypeEnum.CHECKPOINT, instance3.getType());
+        assertEquals(cubeName, instance3.getRelatedCube());
+        assertEquals(JobStatusEnum.PENDING, instance3.getStatus());
+    }
+
+    @Test
+    public void testStatusConvert() {
+        assertEquals(JobStatusEnum.PENDING, JobInfoConverter.parseToJobStatus(ExecutableState.READY));
+        assertEquals(JobStatusEnum.RUNNING, JobInfoConverter.parseToJobStatus(ExecutableState.RUNNING));
+        assertEquals(JobStatusEnum.DISCARDED, JobInfoConverter.parseToJobStatus(ExecutableState.DISCARDED));
+        assertEquals(JobStatusEnum.ERROR, JobInfoConverter.parseToJobStatus(ExecutableState.ERROR));
+        assertEquals(JobStatusEnum.STOPPED, JobInfoConverter.parseToJobStatus(ExecutableState.STOPPED));
+        assertEquals(JobStatusEnum.FINISHED, JobInfoConverter.parseToJobStatus(ExecutableState.SUCCEED));
+
+        assertEquals(JobStepStatusEnum.PENDING, JobInfoConverter.parseToJobStepStatus(ExecutableState.READY));
+        assertEquals(JobStepStatusEnum.RUNNING, JobInfoConverter.parseToJobStepStatus(ExecutableState.RUNNING));
+        assertEquals(JobStepStatusEnum.DISCARDED, JobInfoConverter.parseToJobStepStatus(ExecutableState.DISCARDED));
+        assertEquals(JobStepStatusEnum.ERROR, JobInfoConverter.parseToJobStepStatus(ExecutableState.ERROR));
+        assertEquals(JobStepStatusEnum.STOPPED, JobInfoConverter.parseToJobStepStatus(ExecutableState.STOPPED));
+        assertEquals(JobStepStatusEnum.FINISHED, JobInfoConverter.parseToJobStepStatus(ExecutableState.SUCCEED));
+    }
+
     public static class TestJob extends CubingJob {
         public TestJob() {
             super();
@@ -70,6 +135,17 @@ public class JobInfoConverterTest {
         }
     }
 
+    public static class Test2Job extends CheckpointExecutable {
+        public Test2Job() {
+            super();
+        }
+
+        @Override
+        protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "");
+        }
+    }
+
     public static class TestOutput implements Output {
 
         @Override
@@ -229,5 +305,4 @@ public class JobInfoConverterTest {
 
         assertNull(jobInstance);
     }
-
 }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java
new file mode 100644
index 0000000..a1416a8
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidMapperTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidModeEnum;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Bytes;
+
+public class CalculateStatsFromBaseCuboidMapperTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private MapDriver<Text, Text, Text, Text> mapDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        CalculateStatsFromBaseCuboidMapper calStatsFromBasicCuboidMapper = new CalculateStatsFromBaseCuboidMapper();
+        mapDriver = MapDriver.newMapDriver(calStatsFromBasicCuboidMapper);
+    }
+
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testMapper() throws Exception {
+        mapperTest();
+    }
+
+    @Test
+    public void testMapperWithOldHLL() throws Exception {
+        cubeDesc.setVersion("1.5.2");
+        mapperTest();
+    }
+
+    private void mapperTest() throws Exception {
+        setConfiguration();
+        Text key1 = new Text();
+        byte[] shard = new byte[] { 0, 0 };
+        byte[] cuboidId = Cuboid.getBaseCuboid(cubeDesc).getBytes();
+        byte[] col1 = new byte[] { 0, 0, 0, 1 };
+        byte[] col2 = new byte[] { 0, 6, 0 };
+        byte[] col3 = new byte[] { 1 };
+        byte[] col4 = new byte[] { 1 };
+        byte[] col5 = new byte[] { 1 };
+        byte[] col6 = new byte[] { 1 };
+        byte[] col7 = new byte[] { 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0 };
+        byte[] col8 = new byte[] { 1, 0 };
+        byte[] col9 = new byte[] { 1 };
+        key1.set(Bytes.concat(shard, cuboidId, col1, col2, col3, col4, col5, col6, col7, col8, col9));
+        Text val1 = new Text();
+
+        mapDriver.setInput(key1, val1);
+        List<Pair<Text, Text>> result = mapDriver.run();
+        Set<Long> cuboidIdSet = cube.getCuboidsByMode(CuboidModeEnum.CURRENT);
+        assertEquals(cuboidIdSet.size(), result.size());
+        Long[] cuboids = cuboidIdSet.toArray(new Long[cuboidIdSet.size()]);
+        Arrays.sort(cuboids);
+
+        List<Long> resultCuboidList = Lists.transform(result, new Function<Pair<Text, Text>, Long>() {
+            @Nullable
+            @Override
+            public Long apply(@Nullable Pair<Text, Text> input) {
+                byte[] bytes = input.getFirst().getBytes();
+                return org.apache.kylin.common.util.Bytes.toLong(bytes);
+            }
+        });
+        Long[] resultCuboids = resultCuboidList.toArray(new Long[resultCuboidList.size()]);
+        Arrays.sort(resultCuboids);
+        assertArrayEquals(cuboids, resultCuboids);
+    }
+
+    private void setConfiguration() throws Exception {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+    }
+
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index 7348ce6..9f684c9 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -14,35 +14,92 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.engine.mr.steps;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayPrimitiveWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.Reducer.Context;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.common.util.RandomUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
+import org.apache.kylin.shaded.com.google.common.collect.ImmutableList;
+
+public class FactDistinctColumnsReducerTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private ReduceDriver<SelfDefineSortableKey, Text, NullWritable, Text> reduceDriver;
+
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        FactDistinctColumnsReducer factDistinctColumnsReducer = new FactDistinctColumnsReducer();
+        reduceDriver = ReduceDriver.newReduceDriver(factDistinctColumnsReducer);
+    }
 
-/**
- */
-public class FactDistinctColumnsReducerTest {
+    @After
+    public void after() throws Exception {
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.deleteQuietly(new File(FileOutputCommitter.PENDING_DIR_NAME));
+        cleanupTestMetadata();
+    }
 
     @Test
     public void testWriteCuboidStatistics() throws IOException {
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();
         File tmp = File.createTempFile("cuboidstatistics", "");
-        final Path outputPath = new Path(tmp.getParent().toString() + File.separator + RandomUtil.randomUUID().toString());
+        final Path outputPath = new Path(tmp.getParent() + File.separator + RandomUtil.randomUUID().toString());
         if (!FileSystem.getLocal(conf).exists(outputPath)) {
             //            FileSystem.getLocal(conf).create(outputPath);
         }
@@ -53,4 +110,122 @@ public class FactDistinctColumnsReducerTest {
         FileSystem.getLocal(conf).delete(outputPath, true);
 
     }
+
+    /**
+     * Feeds the reducer a single HLL-counter record under a task id computed from the row key
+     * column count, the UHC column count and the UHC reducer count, and checks that nothing
+     * is emitted through the driver's regular output.
+     */
+    @Test
+    public void testReducerStatistics() throws IOException {
+        setConfigurations();
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_STATISTICS, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, LongWritable.class, BytesWritable.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        // override the task id
+        int dimColsSize = cubeDesc.getRowkey().getRowKeyColumns().length;
+        int uhcSize = cubeDesc.getAllUHCColumns().size();
+        final int targetTaskId = (dimColsSize - uhcSize) + uhcSize * cubeDesc.getConfig().getUHCReducerCount();
+
+        setContextTaskId(targetTaskId);
+        // key layout: one marker byte followed by a long (100)
+        ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+        tmpBuf.put((byte) FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER); // one byte
+        tmpBuf.putLong(100);
+        Text outputKey1 = new Text();
+        outputKey1.set(tmpBuf.array(), 0, tmpBuf.position());
+        SelfDefineSortableKey key1 = new SelfDefineSortableKey();
+        key1.init(outputKey1, (byte) 0);
+
+        // value: the registers of the mock HLL counter
+        HLLCounter hll = createMockHLLCounter();
+        ByteBuffer hllBuf = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        hllBuf.clear();
+        hll.writeRegisters(hllBuf);
+        Text value1 = new Text();
+        value1.set(hllBuf.array(), 0, hllBuf.position());
+
+        reduceDriver.setInput(key1, ImmutableList.of(value1));
+
+        List<Pair<NullWritable, Text>> result = reduceDriver.run();
+        assertEquals(0, result.size()); // the reducer output statistics info to a sequence file.
+    }
+
+    /**
+     * Runs the normal-dimension scenario without overriding any Kylin property
+     * (presumably with dictionary building in the reducer left at its default; TODO confirm).
+     */
+    @Test
+    public void testReducerNormalDimDictInReducer() throws IOException {
+        testNormalDim();
+    }
+
+    /**
+     * Runs the normal-dimension scenario after setting
+     * "kylin.engine.mr.build-dict-in-reducer" to "false".
+     */
+    @Test
+    public void testReducerNormalDim() throws IOException {
+        KylinConfig kylinConfig = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        kylinConfig.setProperty("kylin.engine.mr.build-dict-in-reducer", "false");
+        testNormalDim();
+    }
+
+    /**
+     * Stubs the driver's context so that getTaskAttemptID() reports a reduce attempt
+     * carrying the given task id.
+     */
+    private void setContextTaskId(final int taskId) {
+        final Answer<TaskAttemptID> attemptIdAnswer = new Answer<TaskAttemptID>() {
+            @Override
+            public TaskAttemptID answer(InvocationOnMock invocation) throws Throwable {
+                final String attemptName = "attempt__0000_r_" + taskId + "_0";
+                return TaskAttemptID.forName(attemptName);
+            }
+        };
+        when(reduceDriver.getContext().getTaskAttemptID()).thenAnswer(attemptIdAnswer);
+    }
+
+    /**
+     * Sets the cube name, segment id, statistics sampling percent and output directory on the
+     * reduce driver's configuration. Must be called after setup(), which assigns cubeName.
+     */
+    private void setConfigurations() {
+        Configuration configuration = reduceDriver.getConfiguration();
+        // reuse the field set in setup() so the cube loaded there and the one configured here cannot diverge
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(FileOutputFormat.OUTDIR, ".");
+    }
+
+    // Configuration key fragments copied from MultipleOutputs for this test;
+    // they are combined in setMultipleOutputs() to register a named output.
+    private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
+    private static final String MO_PREFIX = "mapreduce.multipleoutputs.namedOutput.";
+    private static final String FORMAT = ".format";
+    private static final String KEY = ".key";
+    private static final String VALUE = ".value";
+
+    /**
+     * Registers a named output in the given configuration: appends the name to the
+     * MULTIPLE_OUTPUTS entry and stores its output format, key class and value class.
+     *
+     * @param namedOutput name of the output to register
+     * @param conf configuration to modify
+     * @param outputFormatClass output format class stored for the named output
+     * @param keyClass key class stored for the named output
+     * @param valueClass value class stored for the named output
+     */
+    private void setMultipleOutputs(String namedOutput, Configuration conf,
+            Class<? extends OutputFormat> outputFormatClass, Class<?> keyClass, Class<?> valueClass)
+            throws IOException {
+        // names already registered are kept; the new one is appended after a space
+        conf.set(MULTIPLE_OUTPUTS, conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
+        conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass, OutputFormat.class);
+        conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
+        conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
+    }
+
+    /**
+     * Builds an HLLCounter by merging, one at a time, single-value counters for the
+     * integers 0 to 999. Both counters are constructed with the argument 14.
+     */
+    private HLLCounter createMockHLLCounter() {
+        final HLLCounter merged = new HLLCounter(14);
+        final HLLCounter single = new HLLCounter(14);
+        int value = 0;
+        while (value < 1000) {
+            // reuse the scratch counter: reset it, add one value, fold it into the result
+            single.clear();
+            single.add(value);
+            merged.merge(single);
+            value++;
+        }
+        return merged;
+    }
+
+    /**
+     * Shared scenario for the normal-dimension tests: registers the column, dict and
+     * partition named outputs, sets the task id to (row key column count - 1), feeds one key
+     * built from the string "100" with an empty Text value, and checks that nothing is
+     * emitted through the driver's regular output.
+     */
+    private void testNormalDim() throws IOException {
+        setConfigurations();
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_COLUMN, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, NullWritable.class, Text.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_DICT, reduceDriver.getConfiguration(),
+                SequenceFileOutputFormat.class, NullWritable.class, ArrayPrimitiveWritable.class);
+        setMultipleOutputs(BatchConstants.CFG_OUTPUT_PARTITION, reduceDriver.getConfiguration(), TextOutputFormat.class,
+                NullWritable.class, LongWritable.class);
+
+        int nDimReducers = cubeDesc.getRowkey().getRowKeyColumns().length;
+        setContextTaskId(nDimReducers - 1);
+
+        // key: the raw bytes of the string "100"
+        ByteBuffer tmpBuf = ByteBuffer.allocate(4096);
+        String val = "100";
+        tmpBuf.put(Bytes.toBytes(val));
+        Text outputKey1 = new Text();
+        outputKey1.set(tmpBuf.array(), 0, tmpBuf.position());
+        SelfDefineSortableKey key1 = new SelfDefineSortableKey();
+        key1.init(outputKey1, (byte) 0);
+
+        reduceDriver.setInput(key1, ImmutableList.of(new Text()));
+        List<Pair<NullWritable, Text>> result = reduceDriver.run();
+        assertEquals(0, result.size());
+    }
+
 }
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java
new file mode 100644
index 0000000..612e9ae
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
+import org.apache.kylin.cube.inmemcubing.InputConverterUnitForRawData;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.rule.PowerMockRule;
+import org.powermock.reflect.Whitebox;
+
+@PrepareForTest({ MRUtil.class, CuboidSchedulerUtil.class, InMemCuboidMapper.class })
+public class InMemCuboidMapperTest extends LocalFileMetadataTestCase {
+    @Rule
+    public PowerMockRule rule = new PowerMockRule();
+
+    private String cubeName;
+    private CubeInstance cube;
+    private InMemCuboidMapper<NullWritable> inMemCuboidMapper;
+    private MapDriver<NullWritable, Object, ByteArrayWritable, ByteArrayWritable> mapDriver;
+
+    /**
+     * Creates the local test metadata, copies it to "./meta", loads the test cube, wraps a new
+     * InMemCuboidMapper in a MapDriver, and stubs the static helpers
+     * CuboidSchedulerUtil.getCuboidSchedulerByMode and MRUtil.getBatchCubingInputSide.
+     */
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        inMemCuboidMapper = new InMemCuboidMapper<>();
+        mapDriver = MapDriver.newMapDriver(inMemCuboidMapper);
+
+        // the scheduler lookup always returns the test cube's own cuboid scheduler
+        PowerMockito.stub(PowerMockito.method(CuboidSchedulerUtil.class, "getCuboidSchedulerByMode", CubeSegment.class,
+                String.class)).toReturn(cube.getCuboidScheduler());
+        // the input side is replaced by a mock that serves one fixed row (see createMockInputSide)
+        IMRBatchCubingInputSide mockInputSide = createMockInputSide();
+        PowerMockito.stub(PowerMockito.method(MRUtil.class, "getBatchCubingInputSide")).toReturn(mockInputSide);
+
+    }
+
+    /**
+     * Cleans up the test metadata and deletes the "./meta" copy made in setup().
+     */
+    @After
+    public void after() throws Exception {
+        cleanupTestMetadata();
+        FileUtils.deleteDirectory(new File("./meta"));
+    }
+
+    /**
+     * Runs the mapper on a single NullWritable input with a TestHandler installed as the
+     * runnable returned by the mocked DoggedCubeBuilder.
+     */
+    @Test
+    public void testMapper() throws Exception {
+        TestHandler testHandler = new TestHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    /**
+     * Same as testMapper, but with the mapper's internal "splitRowThreshold" and "unitRows"
+     * fields forced to 1 and a TestWithCutRowHandler, which expects a CUT_ROW right after
+     * the first input row.
+     */
+    @Test
+    public void testMapperWithCutRow() throws Exception {
+        Whitebox.setInternalState(inMemCuboidMapper, "splitRowThreshold", 1);
+        Whitebox.setInternalState(inMemCuboidMapper, "unitRows", 1);
+        TestHandler testHandler = new TestWithCutRowHandler();
+        setConfigurationAndMock(testHandler);
+
+        mapDriver.setInput(NullWritable.get(), NullWritable.get());
+        mapDriver.run();
+    }
+
+    /**
+     * Sets the sampling percent, cube name and segment id on the map driver's configuration
+     * and makes every construction of DoggedCubeBuilder return a mock wired to the given
+     * handler.
+     *
+     * @param testHandler handler returned by the mocked builder's buildAsRunnable
+     */
+    private void setConfigurationAndMock(TestHandler testHandler) throws Exception {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+
+        DoggedCubeBuilder mockDoggedCubeBuilder = createMockDoggedCubeBuilder(testHandler);
+        PowerMockito.whenNew(DoggedCubeBuilder.class).withAnyArguments().thenReturn(mockDoggedCubeBuilder);
+    }
+
+    /**
+     * Creates an IMRBatchCubingInputSide mock whose getFlatTableInputFormat() returns the
+     * mocked table input format.
+     */
+    private IMRBatchCubingInputSide createMockInputSide() throws Exception {
+        final IMRInput.IMRTableInputFormat tableFormat = createMockInputFormat();
+        final IMRBatchCubingInputSide inputSide = PowerMockito.mock(IMRBatchCubingInputSide.class);
+        PowerMockito.when(inputSide.getFlatTableInputFormat()).thenReturn(tableFormat);
+        return inputSide;
+    }
+
+    /**
+     * Creates an IMRTableInputFormat mock whose parseMapperInput(...) returns a collection
+     * holding the single mock input row, whatever argument it is given.
+     */
+    private IMRTableInputFormat createMockInputFormat() throws Exception {
+        final Collection<String[]> parsedRows = new ArrayList<>();
+        parsedRows.add(getMockInputRow());
+
+        final IMRTableInputFormat tableFormat = PowerMockito.mock(IMRTableInputFormat.class);
+        PowerMockito.when(tableFormat, "parseMapperInput", Mockito.any()).thenReturn(parsedRows);
+        return tableFormat;
+    }
+
+    /** Returns a new one-column row ("2012-01-01") used as the mapper's mocked input. */
+    private static String[] getMockInputRow() {
+        return new String[] { "2012-01-01" };
+    }
+
+    /**
+     * Creates a DoggedCubeBuilder mock whose buildAsRunnable(...) hands the input queue it
+     * receives to the given handler and returns that handler as the runnable.
+     */
+    private DoggedCubeBuilder createMockDoggedCubeBuilder(final TestHandler handler) throws Exception {
+        final Answer<Object> wireQueueToHandler = new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                // the first argument of buildAsRunnable is the input queue
+                BlockingQueue inputQueue = (BlockingQueue) invocation.getArguments()[0];
+                handler.setInputQueue(inputQueue);
+                return handler;
+            }
+        };
+
+        DoggedCubeBuilder builderMock = PowerMockito.mock(DoggedCubeBuilder.class);
+        PowerMockito.when(builderMock, "buildAsRunnable", Mockito.any(BlockingQueue.class), Mockito.any(),
+                Mockito.any()).thenAnswer(wireQueueToHandler);
+        return builderMock;
+    }
+
+    /**
+     * Runnable standing in for the cube builder: checks the first row(s) taken from the input
+     * queue and then keeps taking rows until the END_ROW marker arrives.
+     */
+    private static class TestHandler implements Runnable {
+        BlockingQueue queue;
+
+        public void setInputQueue(BlockingQueue queue) {
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                consumeRows();
+
+                // END_ROW is compared by identity, so keep taking rows until that exact array shows up
+                while (true) {
+                    String[] newRow = (String[]) queue.take();
+                    if (InputConverterUnitForRawData.END_ROW == newRow) {
+                        break;
+                    }
+                }
+            } catch (InterruptedException e) {
+                // restore the interrupt flag instead of swallowing it, so the thread running this handler can observe it
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /** Takes the first row from the queue and checks that it equals the mock input row. */
+        protected void consumeRows() throws InterruptedException {
+            String[] row = (String[]) queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+        }
+
+    }
+
+    /**
+     * Handler variant that additionally expects a CUT_ROW immediately after the first
+     * input row.
+     */
+    private static class TestWithCutRowHandler extends TestHandler {
+
+        @Override
+        protected void consumeRows() throws InterruptedException {
+            String[] row = (String[]) queue.take();
+            assertArrayEquals(getMockInputRow(), row);
+
+            row = (String[]) queue.take();
+            assertArrayEquals(InputConverterUnitForRawData.CUT_ROW, row);
+        }
+
+    }
+}
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
new file mode 100644
index 0000000..acc5f6d
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/streaming/RowRecordReaderTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.streaming;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.stream.core.model.StreamingMessage;
+import org.apache.kylin.stream.core.query.StreamingQueryProfile;
+import org.apache.kylin.stream.core.storage.columnar.ColumnarMemoryStorePersister;
+import org.apache.kylin.stream.core.storage.columnar.DataSegmentFragment;
+import org.apache.kylin.stream.core.storage.columnar.FragmentData;
+import org.apache.kylin.stream.core.storage.columnar.FragmentFileSearcher;
+import org.apache.kylin.stream.core.storage.columnar.FragmentId;
+import org.apache.kylin.stream.core.storage.columnar.ParsedStreamingCubeInfo;
+import org.apache.kylin.stream.core.storage.columnar.SegmentMemoryStore;
+import org.apache.kylin.stream.core.storage.columnar.StreamingDataSimulator;
+import org.apache.log4j.PropertyConfigurator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class RowRecordReaderTest extends LocalFileMetadataTestCase {
+    private static final String cubeName = "test_streaming_v2_cube";
+
+    private String baseStorePath;
+    private CubeInstance cubeInstance;
+    private String segmentName;
+    private ParsedStreamingCubeInfo parsedStreamingCubeInfo;
+    private DataSegmentFragment fragment;
+    private FragmentFileSearcher fragmentFileSearcher;
+    private CubeDesc cubeDesc;
+    private int eventCnt = 50000;
+
+    /**
+     * Creates the test metadata, reloads the streaming test cube, persists simulated data
+     * into fragment 0 of the test segment (see prepareData) and opens a FragmentFileSearcher
+     * on it.
+     */
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        this.baseStorePath = KylinConfig.getInstanceFromEnv().getStreamingIndexPath();
+        this.cubeInstance = CubeManager.getInstance(getTestConfig()).reloadCube(cubeName);
+        this.cubeDesc = cubeInstance.getDescriptor();
+        this.segmentName = "20171018100000_20171018110000";
+        this.parsedStreamingCubeInfo = new ParsedStreamingCubeInfo(cubeInstance);
+        this.fragment = new DataSegmentFragment(baseStorePath, cubeName, segmentName, new FragmentId(0));
+        PropertyConfigurator.configure("../build/conf/kylin-tools-log4j.properties");
+        // the fragment files must exist before the searcher reads its meta info and data file
+        prepareData();
+        fragmentFileSearcher = new FragmentFileSearcher(fragment, new FragmentData(fragment.getMetaInfo(),
+                fragment.getDataFile()));
+        StreamingQueryProfile.set(new StreamingQueryProfile("test-query-id", System.currentTimeMillis()));
+    }
+
+    /**
+     * Cleans up the test metadata and deletes the streaming index directory written by
+     * prepareData().
+     */
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+        cleanupData();
+    }
+
+    /**
+     * Reads every row back from the directory holding the persisted fragment, prints the
+     * first ten rows, and checks that the number of rows read equals eventCnt.
+     */
+    @Test
+    public void testIterator() throws Exception {
+        Path fragmentDir = new Path(fragment.getDataFile().getParentFile().getAbsolutePath());
+        FileSystem localFs = FileSystem.getLocal(new Configuration());
+        RowRecordReader reader = new RowRecordReader(cubeDesc, fragmentDir, localFs);
+
+        // one serializer per measure, in measure order
+        List<MeasureDesc> measureDescs = cubeDesc.getMeasures();
+        List<DataTypeSerializer> serializers = Lists.newArrayListWithCapacity(measureDescs.size());
+        for (MeasureDesc measureDesc : measureDescs) {
+            serializers.add(DataTypeSerializer.create(measureDesc.getFunction().getReturnDataType()));
+        }
+
+        int rowCount = 0;
+        for (; reader.hasNextRow(); rowCount++) {
+            RowRecord rowRecord = reader.nextRow();
+            if (rowCount >= 10) {
+                // only the first ten rows are decoded and printed; the rest are just counted
+                continue;
+            }
+
+            String[] dims = rowRecord.getDimensions();
+            byte[][] rawMetrics = rowRecord.getMetrics();
+            Object[] decodedMetrics = new Object[rawMetrics.length];
+            int idx = 0;
+            for (byte[] rawMetric : rawMetrics) {
+                decodedMetrics[idx] = serializers.get(idx).deserialize(ByteBuffer.wrap(rawMetric));
+                idx++;
+            }
+
+            StringBuilder line = new StringBuilder();
+            for (String dim : dims) {
+                line.append(dim);
+                line.append(",");
+            }
+            for (Object decodedMetric : decodedMetrics) {
+                line.append(decodedMetric.toString());
+                line.append(",");
+            }
+            System.out.println(line.toString());
+        }
+        assertEquals(eventCnt, rowCount);
+    }
+
+    /**
+     * Simulates eventCnt streaming messages, indexes them into a SegmentMemoryStore and
+     * persists that store into the test fragment.
+     */
+    protected void prepareData() {
+        // build additional cuboids
+        KylinConfigExt configExt = (KylinConfigExt) cubeInstance.getDescriptor().getConfig();
+        configExt.getExtendedOverrides().put("kylin.stream.build.additional.cuboids", "true");
+
+        ColumnarMemoryStorePersister memStorePersister = new ColumnarMemoryStorePersister(parsedStreamingCubeInfo,
+                segmentName);
+        StreamingDataSimulator simulator = new StreamingDataSimulator(
+                StreamingDataSimulator.getDefaultCardinalityMap(), 100000);
+        Iterator<StreamingMessage> streamingMessages = simulator.simulate(eventCnt, System.currentTimeMillis());
+        SegmentMemoryStore memoryStore = new SegmentMemoryStore(new ParsedStreamingCubeInfo(cubeInstance), segmentName);
+        while (streamingMessages.hasNext()) {
+            memoryStore.index(streamingMessages.next());
+        }
+
+        memStorePersister.persist(memoryStore, fragment);
+    }
+
+    /** Deletes the streaming index base directory (baseStorePath), ignoring failures. */
+    private void cleanupData() throws IOException {
+        FileUtils.deleteQuietly(new File(baseStorePath));
+    }
+
+}
diff --git a/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq
new file mode 100644
index 0000000..02bcf8c
Binary files /dev/null and b/examples/test_case_data/localmeta/cube_statistics/test_kylin_cube_with_slr_1_new_segment/198va32a-a33e-4b69-83dd-0bb8b1f8c53b.seq differ
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 99873fd..2b82720 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -176,6 +176,12 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.mrunit</groupId>
+            <artifactId>mrunit</artifactId>
+            <classifier>hadoop2</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-annotations</artifactId>
             <scope>provided</scope>
diff --git a/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
new file mode 100644
index 0000000..8d9fd7e
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapperTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mrunit.mapreduce.MapDriver;
+import org.apache.hadoop.mrunit.types.Pair;
+import org.apache.hive.hcatalog.data.DefaultHCatRecord;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FactDistinctColumnsMapperTest extends LocalFileMetadataTestCase {
+    private String cubeName;
+    private CubeInstance cube;
+    private CubeDesc cubeDesc;
+    private MapDriver<LongWritable, Object, SelfDefineSortableKey, Text> mapDriver;
+
+    /**
+     * Creates the local test metadata, copies it to "./meta", loads the test cube and its
+     * descriptor, and wraps a new FactDistinctColumnsMapper in a MapDriver.
+     */
+    @Before
+    public void setup() throws Exception {
+        createTestMetadata();
+        // start from a clean "./meta" directory before copying the test metadata into it
+        FileUtils.deleteDirectory(new File("./meta"));
+        FileUtils.copyDirectory(new File(getTestConfig().getMetadataUrl().toString()), new File("./meta"));
+
+        cubeName = "test_kylin_cube_with_slr_1_new_segment";
+        cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        FactDistinctColumnsMapper<LongWritable> factDistinctColumnsMapper = new FactDistinctColumnsMapper<>();
+        mapDriver = MapDriver.newMapDriver(factDistinctColumnsMapper);
+    }
+
+    /**
+     * Deletes the "./meta" copy created in setup() and cleans up the test metadata.
+     */
+    @After
+    public void after() throws Exception {
+        // setup() copies the metadata into "./meta"; remove it so it does not stay behind after the test
+        FileUtils.deleteDirectory(new File("./meta"));
+        cleanupTestMetadata();
+    }
+
+    /**
+     * Maps a single 11-field HCatRecord and checks the number of emitted key/value pairs
+     * against a count derived from the cube descriptor (columns needing a dictionary,
+     * remaining row key columns counted twice, plus the number of cuboids).
+     */
+    @Test
+    public void testMapper() throws IOException {
+        Configuration configuration = mapDriver.getConfiguration();
+        configuration.set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, "100");
+        // reuse the field set in setup() so the cube loaded there and the one configured here cannot diverge
+        configuration.set(BatchConstants.CFG_CUBE_NAME, cubeName);
+        configuration.set(BatchConstants.CFG_CUBE_SEGMENT_ID, "198va32a-a33e-4b69-83dd-0bb8b1f8c53b");
+        HCatRecord value1 = new DefaultHCatRecord(11);
+        value1.set(0, "2012-08-16");
+        value1.set(1, "48027");
+        value1.set(2, "0");
+        value1.set(3, "Home & Garden");
+        value1.set(4, "Cheese & Crackers");
+        value1.set(5, "Cheese & Crackers");
+        value1.set(6, "48027");
+        value1.set(7, "16");
+        value1.set(8, "10000010");
+        value1.set(9, "204.28");
+        value1.set(10, "5");
+        mapDriver.addInput(new LongWritable(0), value1);
+
+        List<Pair<SelfDefineSortableKey, Text>> result = mapDriver.run();
+        int colsNeedDictSize = cubeDesc.getAllColumnsNeedDictionaryBuilt().size();
+        int cuboidsCnt = cubeDesc.getAllCuboids().size();
+
+        assertEquals(
+                colsNeedDictSize + (cubeDesc.getRowkey().getRowKeyColumns().length - colsNeedDictSize) * 2 + cuboidsCnt,
+                result.size());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index b3e80b5..bd916b0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -178,7 +178,7 @@
     </sonar.jacoco.excludes>
 
     <!-- JVM Args for Testing -->
-    <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -Duser.timezone=UTC</argLine>
+    <argLine>-Xms1G -Xmx2G -XX:MaxPermSize=512M -noverify -Duser.timezone=UTC</argLine>
   </properties>
 
   <licenses>


[kylin] 01/02: KYLIN-4508 Add more unit tests for core-metrics module

Posted by sh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 2d718c83db2abd91d053b77a44ed4a98648adcdc
Author: Zhong, Yanghong <nj...@apache.org>
AuthorDate: Wed May 20 18:21:19 2020 +0800

    KYLIN-4508 Add more unit tests for core-metrics module
    
    Signed-off-by: shaofengshi <sh...@apache.org>
---
 core-metrics/pom.xml                               |  17 +++
 .../kylin/metrics/lib/impl/BlockingReservoir.java  |  21 +++-
 .../apache/kylin/metrics/lib/impl/RecordEvent.java |   6 +-
 .../kylin/metrics/lib/impl/TimePropertyEnum.java   |   2 +-
 .../metrics/lib/impl/BlockingReservoirTest.java    |  76 ++++++++++++
 .../metrics/lib/impl/InstantReservoirTest.java     |  64 ++++++++++
 .../kylin/metrics/lib/impl/MetricsSystemTest.java  |  67 +++++++++++
 .../kylin/metrics/lib/impl/RecordEventTest.java    |  82 +++++++++++++
 .../metrics/property/MetricsPropertyEnumTest.java  | 132 +++++++++++++++++++++
 9 files changed, 463 insertions(+), 4 deletions(-)

diff --git a/core-metrics/pom.xml b/core-metrics/pom.xml
index b9bc566..eca9e5e 100644
--- a/core-metrics/pom.xml
+++ b/core-metrics/pom.xml
@@ -45,5 +45,22 @@
             <artifactId>metrics-core</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <!-- Env & Test -->
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-common</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
index 6158096..4798e52 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -22,6 +22,8 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.kylin.metrics.lib.ActiveReservoirListener;
 import org.apache.kylin.metrics.lib.Record;
 import org.slf4j.Logger;
@@ -58,6 +60,10 @@ public class BlockingReservoir extends AbstractActiveReservoir {
     }
 
     public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) {
+        Preconditions.checkArgument(minReportSize > 0, "minReportSize should be larger than 0");
+        Preconditions.checkArgument(maxReportSize >= minReportSize,
+                "maxReportSize should not be less than minBatchSize");
+        Preconditions.checkArgument(maxReportTime > 0, "maxReportTime should be larger than 0");
         this.minReportSize = minReportSize;
         this.maxReportSize = maxReportSize;
         this.maxReportTime = maxReportTime * 60 * 1000L;
@@ -95,9 +101,11 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         if (ifAll) {
             records = Lists.newArrayList();
             recordsQueue.drainTo(records);
+            logger.info("Will report {} metrics records", records.size());
         } else {
             records.clear();
             recordsQueue.drainTo(records, maxReportSize);
+            logger.info("Will report {} metrics records, remaining {} records", records.size(), size());
         }
 
         boolean ifSucceed = true;
@@ -127,9 +135,18 @@ public class BlockingReservoir extends AbstractActiveReservoir {
         return true;
     }
 
-    @Override
-    public void start() {
+    @VisibleForTesting
+    void notifyUpdate() {
+        onRecordUpdate(false);
+    }
+
+    @VisibleForTesting
+    void setReady() {
         super.start();
+    }
+
+    public void start() {
+        setReady();
         scheduledReporter.start();
     }
 
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
index bc4faf0..59aa7e3 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -66,6 +66,10 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable {
         this(eventType, LOCAL_HOSTNAME);
     }
 
+    public RecordEvent(String eventType, long time) { // local host, caller-supplied time
+        this(eventType, LOCAL_HOSTNAME, time);
+    }
+
     public RecordEvent(String eventType, String host) {
         this(eventType, host, System.currentTimeMillis());
     }
@@ -268,7 +272,7 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable {
             return reserveKey;
         }
 
-        public RecordReserveKeyEnum getByKey(String key) {
+        public static RecordReserveKeyEnum getByKey(String key) { // static lookup; key match ignores case
             for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) {
                 if (reserveKey.reserveKey.equalsIgnoreCase(key)) {
                     return reserveKey;
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
index 03f188b..01eefa1 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
@@ -38,7 +38,7 @@ public enum TimePropertyEnum {
         this.propertyName = propertyName;
     }
 
-    public static TimePropertyEnum getByPropertyName(String propertyName) {
+    public static TimePropertyEnum getByKey(String propertyName) { // null or empty name yields null
         if (Strings.isNullOrEmpty(propertyName)) {
             return null;
         }
diff --git a/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/BlockingReservoirTest.java b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/BlockingReservoirTest.java
new file mode 100644
index 0000000..47c9f46
--- /dev/null
+++ b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/BlockingReservoirTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metrics.lib.Record;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BlockingReservoirTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testUpdate() {
+        BlockingReservoir reservoir = new BlockingReservoir();
+
+        Record record = new RecordEvent("TEST");
+        reservoir.update(record);
+        assertEquals(0, reservoir.size()); // an update before start() is expected to be dropped
+
+        reservoir.start();
+        reservoir.update(record);
+        assertEquals(1, reservoir.size()); // once started, the record is queued
+
+        reservoir.stop();
+        assertEquals(0, reservoir.size()); // stop() is expected to leave nothing queued
+    }
+
+    @Test
+    public void testBatchSize() {
+        BlockingReservoir reservoir = new BlockingReservoir(5, 12);
+        reservoir.setReady(); // superclass start only; the scheduled reporter stays off
+
+        for (int i = 0; i < 30; i++) { // queue 30 records
+            Record record = new RecordEvent("TEST" + i);
+            reservoir.update(record);
+        }
+        reservoir.notifyUpdate();
+        Assert.assertEquals(18, reservoir.size()); // first pass took 12 of 30
+
+        reservoir.notifyUpdate();
+        Assert.assertEquals(6, reservoir.size()); // second pass took another 12
+
+        reservoir.notifyUpdate();
+        Assert.assertEquals(0, reservoir.size()); // last pass took the remaining 6
+    }
+}
diff --git a/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/InstantReservoirTest.java b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/InstantReservoirTest.java
new file mode 100644
index 0000000..982978e
--- /dev/null
+++ b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/InstantReservoirTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metrics.lib.ActiveReservoirListener;
+import org.apache.kylin.metrics.lib.Record;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class InstantReservoirTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testUpdate() {
+        InstantReservoir reservoir = new InstantReservoir();
+        ActiveReservoirListener listener = new StubReservoirReporter().listener; // listener of a stub reporter
+        reservoir.addListener(listener);
+
+        Record record = new RecordEvent("TEST");
+        assertTrue(listener.onRecordUpdate(Lists.newArrayList(record))); // the stub listener accepts the batch
+
+        reservoir.update(record);
+        assertEquals(0, reservoir.size()); // size is expected to stay 0 before start()
+
+        reservoir.start();
+        reservoir.update(record);
+        assertEquals(0, reservoir.size()); // ... and after an update while started
+
+        reservoir.stop();
+        assertEquals(0, reservoir.size()); // ... and after stop()
+    }
+}
diff --git a/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/MetricsSystemTest.java b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/MetricsSystemTest.java
new file mode 100644
index 0000000..12caf14
--- /dev/null
+++ b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/MetricsSystemTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.apache.kylin.metrics.lib.impl.MetricsSystem.Metrics;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kylin.metrics.lib.ActiveReservoir;
+import org.apache.kylin.metrics.lib.ActiveReservoirRecordFilter;
+import org.junit.Test;
+
+public class MetricsSystemTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testDuplicateRegister() {
+        String name = "test1";
+        Metrics.register(name, new StubReservoir());
+        Metrics.register(name, new StubReservoir()); // same name again: expected to throw
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNullRegister1() {
+        Metrics.register(null, new StubReservoir()); // null name: expected to be rejected
+    }
+
+    @Test
+    public void testActiveReservoir() {
+        //Remove all ActiveReservoirs so the count below starts from zero
+        Metrics.removeActiveReservoirMatching(ActiveReservoirRecordFilter.ALL);
+        assertEquals(0, Metrics.getActiveReservoirs().size());
+
+        //Register n ActiveReservoirs, then check that all of them are listed
+        int n = 10;
+        for (int i = 0; i < n; i++) {
+            Metrics.register("ActiveReservoir-" + i, new StubReservoir());
+        }
+        assertEquals(n, Metrics.getActiveReservoirs().size());
+
+        String name = "test2";
+        ActiveReservoir activeReservoir = new StubReservoir();
+        Metrics.register(name, activeReservoir);
+
+        //Get ActiveReservoir by name
+        assertEquals(activeReservoir, Metrics.activeReservoir(name));
+        //Remove ActiveReservoir by name; a second removal is expected to report false
+        assertTrue(Metrics.removeActiveReservoir(name));
+        assertFalse(Metrics.removeActiveReservoir(name));
+    }
+}
diff --git a/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/RecordEventTest.java b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/RecordEventTest.java
new file mode 100644
index 0000000..0c53d56
--- /dev/null
+++ b/core-metrics/src/test/java/org/apache/kylin/metrics/lib/impl/RecordEventTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.lib.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+
+public class RecordEventTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testSetEventType() {
+        new RecordEvent(null, System.currentTimeMillis()); // null event type: expected to be rejected
+    }
+
+    @Test
+    public void testBasic() throws IOException {
+        String type = "TEST";
+        String localHostname;
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            localHostname = addr.getHostName() + ":" + addr.getHostAddress(); // "<name>:<address>"
+        } catch (UnknownHostException e) {
+            localHostname = "Unknown";
+        }
+        long time = System.currentTimeMillis();
+        RecordEvent event = new RecordEvent(type, localHostname, time);
+
+        assertEquals(type, event.getEventType());
+        assertEquals(localHostname, event.getHost());
+        assertTrue(time == event.getTime());
+
+        String key = "PROJECT";
+        String value = "test";
+        event.put(key, value);
+        assertEquals(value, event.remove(key)); // remove hands back the value just put
+
+        int len1 = event.size();
+        Map<String, Object> entryMap = Maps.newHashMap();
+        for (int i = 0; i < 5; i++) {
+            entryMap.put(key + "-" + i, value + "-" + i);
+        }
+        event.putAll(entryMap);
+        assertEquals(entryMap.size(), event.size() - len1); // putAll grew the event by exactly the 5 new keys
+
+        assertTrue(event.clone().equals(event));
+
+        Map<String, Object> rawValue = JsonUtil.readValue(event.getValue(), Map.class);
+        assertEquals(event.size() - 1, rawValue.size()); // serialized value has one entry fewer than the event
+
+        assertNull(rawValue.get(RecordEvent.RecordReserveKeyEnum.EVENT_SUBJECT.toString())); // subject key is absent
+
+        event.clear();
+        assertTrue(event.isEmpty());
+    }
+}
diff --git a/core-metrics/src/test/java/org/apache/kylin/metrics/property/MetricsPropertyEnumTest.java b/core-metrics/src/test/java/org/apache/kylin/metrics/property/MetricsPropertyEnumTest.java
new file mode 100644
index 0000000..81471fe
--- /dev/null
+++ b/core-metrics/src/test/java/org/apache/kylin/metrics/property/MetricsPropertyEnumTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metrics.property;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.kylin.metrics.lib.impl.RecordEvent;
+import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+import org.junit.Test;
+
+public class MetricsPropertyEnumTest {
+
+    @Test
+    public void testJobPropertyEnum() { // every property name must resolve to its constant
+        assertEquals(JobPropertyEnum.ID_CODE, JobPropertyEnum.getByName("JOB_ID"));
+        assertEquals(JobPropertyEnum.USER, JobPropertyEnum.getByName("KUSER"));
+        assertEquals(JobPropertyEnum.PROJECT, JobPropertyEnum.getByName("PROJECT"));
+        assertEquals(JobPropertyEnum.CUBE, JobPropertyEnum.getByName("CUBE_NAME"));
+        assertEquals(JobPropertyEnum.TYPE, JobPropertyEnum.getByName("JOB_TYPE"));
+        assertEquals(JobPropertyEnum.ALGORITHM, JobPropertyEnum.getByName("CUBING_TYPE"));
+        assertEquals(JobPropertyEnum.STATUS, JobPropertyEnum.getByName("JOB_STATUS"));
+        assertEquals(JobPropertyEnum.EXCEPTION, JobPropertyEnum.getByName("EXCEPTION"));
+        assertEquals(JobPropertyEnum.SOURCE_SIZE, JobPropertyEnum.getByName("TABLE_SIZE"));
+        assertEquals(JobPropertyEnum.CUBE_SIZE, JobPropertyEnum.getByName("CUBE_SIZE"));
+        assertEquals(JobPropertyEnum.BUILD_DURATION, JobPropertyEnum.getByName("DURATION"));
+        assertEquals(JobPropertyEnum.WAIT_RESOURCE_TIME, JobPropertyEnum.getByName("WAIT_RESOURCE_TIME"));
+        assertEquals(JobPropertyEnum.PER_BYTES_TIME_COST, JobPropertyEnum.getByName("PER_BYTES_TIME_COST"));
+        assertEquals(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS,
+                JobPropertyEnum.getByName("STEP_DURATION_DISTINCT_COLUMNS"));
+        assertEquals(JobPropertyEnum.STEP_DURATION_DICTIONARY, JobPropertyEnum.getByName("STEP_DURATION_DICTIONARY"));
+        assertEquals(JobPropertyEnum.STEP_DURATION_INMEM_CUBING,
+                JobPropertyEnum.getByName("STEP_DURATION_INMEM_CUBING"));
+        assertEquals(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT,
+                JobPropertyEnum.getByName("STEP_DURATION_HFILE_CONVERT"));
+        assertNull(RecordEvent.RecordReserveKeyEnum.getByKey(null)); // NOTE(review): not JobPropertyEnum - confirm
+    }
+
+    @Test
+    public void testQueryPropertyEnum() { // every property name must resolve to its constant
+        assertEquals(QueryPropertyEnum.ID_CODE, QueryPropertyEnum.getByName("QUERY_HASH_CODE"));
+        assertEquals(QueryPropertyEnum.TYPE, QueryPropertyEnum.getByName("QUERY_TYPE"));
+        assertEquals(QueryPropertyEnum.USER, QueryPropertyEnum.getByName("KUSER"));
+        assertEquals(QueryPropertyEnum.PROJECT, QueryPropertyEnum.getByName("PROJECT"));
+        assertEquals(QueryPropertyEnum.REALIZATION, QueryPropertyEnum.getByName("REALIZATION"));
+        assertEquals(QueryPropertyEnum.REALIZATION_TYPE, QueryPropertyEnum.getByName("REALIZATION_TYPE"));
+        assertEquals(QueryPropertyEnum.EXCEPTION, QueryPropertyEnum.getByName("EXCEPTION"));
+        assertEquals(QueryPropertyEnum.TIME_COST, QueryPropertyEnum.getByName("QUERY_TIME_COST"));
+        assertEquals(QueryPropertyEnum.CALCITE_RETURN_COUNT, QueryPropertyEnum.getByName("CALCITE_COUNT_RETURN"));
+        assertEquals(QueryPropertyEnum.STORAGE_RETURN_COUNT, QueryPropertyEnum.getByName("STORAGE_COUNT_RETURN"));
+        assertEquals(QueryPropertyEnum.AGGR_FILTER_COUNT,
+                QueryPropertyEnum.getByName("CALCITE_COUNT_AGGREGATE_FILTER"));
+        assertNull(RecordEvent.RecordReserveKeyEnum.getByKey(null)); // NOTE(review): not QueryPropertyEnum - confirm
+    }
+
+    @Test
+    public void testQueryCubePropertyEnum() { // every property name must resolve to its constant
+        assertEquals(QueryCubePropertyEnum.PROJECT, QueryCubePropertyEnum.getByName("PROJECT"));
+        assertEquals(QueryCubePropertyEnum.CUBE, QueryCubePropertyEnum.getByName("CUBE_NAME"));
+        assertEquals(QueryCubePropertyEnum.SEGMENT, QueryCubePropertyEnum.getByName("SEGMENT_NAME"));
+        assertEquals(QueryCubePropertyEnum.CUBOID_SOURCE, QueryCubePropertyEnum.getByName("CUBOID_SOURCE"));
+        assertEquals(QueryCubePropertyEnum.CUBOID_TARGET, QueryCubePropertyEnum.getByName("CUBOID_TARGET"));
+        assertEquals(QueryCubePropertyEnum.IF_MATCH, QueryCubePropertyEnum.getByName("IF_MATCH"));
+        assertEquals(QueryCubePropertyEnum.FILTER_MASK, QueryCubePropertyEnum.getByName("FILTER_MASK"));
+        assertEquals(QueryCubePropertyEnum.IF_SUCCESS, QueryCubePropertyEnum.getByName("IF_SUCCESS"));
+        assertEquals(QueryCubePropertyEnum.TIME_SUM, QueryCubePropertyEnum.getByName("STORAGE_CALL_TIME_SUM"));
+        assertEquals(QueryCubePropertyEnum.TIME_MAX, QueryCubePropertyEnum.getByName("STORAGE_CALL_TIME_MAX"));
+        assertEquals(QueryCubePropertyEnum.WEIGHT_PER_HIT, QueryCubePropertyEnum.getByName("WEIGHT_PER_HIT"));
+        assertEquals(QueryCubePropertyEnum.CALL_COUNT, QueryCubePropertyEnum.getByName("STORAGE_CALL_COUNT"));
+        assertEquals(QueryCubePropertyEnum.SKIP_COUNT, QueryCubePropertyEnum.getByName("STORAGE_COUNT_SKIP"));
+        assertEquals(QueryCubePropertyEnum.SCAN_COUNT, QueryCubePropertyEnum.getByName("STORAGE_COUNT_SCAN"));
+        assertEquals(QueryCubePropertyEnum.RETURN_COUNT, QueryCubePropertyEnum.getByName("STORAGE_COUNT_RETURN"));
+        assertEquals(QueryCubePropertyEnum.AGGR_FILTER_COUNT,
+                QueryCubePropertyEnum.getByName("STORAGE_COUNT_AGGREGATE_FILTER"));
+        assertEquals(QueryCubePropertyEnum.AGGR_COUNT, QueryCubePropertyEnum.getByName("STORAGE_COUNT_AGGREGATE"));
+        assertNull(RecordEvent.RecordReserveKeyEnum.getByKey(null)); // NOTE(review): not QueryCubePropertyEnum - confirm
+    }
+
+    @Test
+    public void testQueryRPCPropertyEnum() { // every property name must resolve to its constant
+        assertEquals(QueryRPCPropertyEnum.PROJECT, QueryRPCPropertyEnum.getByName("PROJECT"));
+        assertEquals(QueryRPCPropertyEnum.REALIZATION, QueryRPCPropertyEnum.getByName("REALIZATION"));
+        assertEquals(QueryRPCPropertyEnum.RPC_SERVER, QueryRPCPropertyEnum.getByName("RPC_SERVER"));
+        assertEquals(QueryRPCPropertyEnum.EXCEPTION, QueryRPCPropertyEnum.getByName("EXCEPTION"));
+        assertEquals(QueryRPCPropertyEnum.CALL_TIME, QueryRPCPropertyEnum.getByName("CALL_TIME"));
+        assertEquals(QueryRPCPropertyEnum.SKIP_COUNT, QueryRPCPropertyEnum.getByName("COUNT_SKIP"));
+        assertEquals(QueryRPCPropertyEnum.SCAN_COUNT, QueryRPCPropertyEnum.getByName("COUNT_SCAN"));
+        assertEquals(QueryRPCPropertyEnum.RETURN_COUNT, QueryRPCPropertyEnum.getByName("COUNT_RETURN"));
+        assertEquals(QueryRPCPropertyEnum.AGGR_FILTER_COUNT, QueryRPCPropertyEnum.getByName("COUNT_AGGREGATE_FILTER"));
+        assertEquals(QueryRPCPropertyEnum.AGGR_COUNT, QueryRPCPropertyEnum.getByName("COUNT_AGGREGATE"));
+        assertNull(RecordEvent.RecordReserveKeyEnum.getByKey(null)); // NOTE(review): not QueryRPCPropertyEnum - confirm
+    }
+
+    @Test
+    public void testTimePropertyEnum() { // every property name must resolve to its constant
+        assertEquals(TimePropertyEnum.YEAR, TimePropertyEnum.getByKey("KYEAR_BEGIN_DATE"));
+        assertEquals(TimePropertyEnum.MONTH, TimePropertyEnum.getByKey("KMONTH_BEGIN_DATE"));
+        assertEquals(TimePropertyEnum.WEEK_BEGIN_DATE, TimePropertyEnum.getByKey("KWEEK_BEGIN_DATE"));
+        assertEquals(TimePropertyEnum.DAY_DATE, TimePropertyEnum.getByKey("KDAY_DATE"));
+        assertEquals(TimePropertyEnum.DAY_TIME, TimePropertyEnum.getByKey("KDAY_TIME"));
+        assertEquals(TimePropertyEnum.TIME_HOUR, TimePropertyEnum.getByKey("KTIME_HOUR"));
+        assertEquals(TimePropertyEnum.TIME_MINUTE, TimePropertyEnum.getByKey("KTIME_MINUTE"));
+        assertEquals(TimePropertyEnum.TIME_SECOND, TimePropertyEnum.getByKey("KTIME_SECOND"));
+        assertNull(TimePropertyEnum.getByKey(null)); // null name maps to no property (guarded by isNullOrEmpty)
+    }
+
+    @Test
+    public void testRecordReserveKeyEnum() { // every reserved key must resolve to its constant
+        assertEquals(RecordEvent.RecordReserveKeyEnum.EVENT_SUBJECT,
+                RecordEvent.RecordReserveKeyEnum.getByKey("EVENT_TYPE"));
+        assertEquals(RecordEvent.RecordReserveKeyEnum.ID, RecordEvent.RecordReserveKeyEnum.getByKey("EVENT_ID"));
+        assertEquals(RecordEvent.RecordReserveKeyEnum.HOST, RecordEvent.RecordReserveKeyEnum.getByKey("HOST"));
+        assertEquals(RecordEvent.RecordReserveKeyEnum.TIME, RecordEvent.RecordReserveKeyEnum.getByKey("KTIMESTAMP"));
+        assertNull(RecordEvent.RecordReserveKeyEnum.getByKey(null)); // a null key is expected to match nothing
+    }
+}
\ No newline at end of file