You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/11/08 22:34:42 UTC

[1/6] kylin git commit: KYLIN-1851 add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey

Repository: kylin
Updated Branches:
  refs/heads/master ddec049a6 -> c8efa5483


http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
new file mode 100644
index 0000000..dfc46b6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import com.google.common.collect.Lists;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.BufferedMeasureCodec;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ */
+public class FactDistinctHiveColumnsMapper2<KEYIN> extends FactDistinctColumnsMapperBase2<KEYIN, Object> {
+
+    protected boolean collectStatistics = false;
+    protected CuboidScheduler cuboidScheduler = null;
+    protected int nRowKey;
+    private Integer[][] allCuboidsBitSet = null;
+    private HyperLogLogPlusCounter[] allCuboidsHLL = null;
+    private Long[] cuboidIds;
+    private HashFunction hf = null;
+    private int rowCount = 0;
+    private int samplingPercentage;
+    private ByteArray[] row_hashcodes = null;
+    private ByteBuffer keyBuffer;
+    private static final Text EMPTY_TEXT = new Text();
+    public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE;
+    public static final byte MARK_FOR_HLL = (byte) 0xFF;
+
+    private int partitionColumnIndex = -1;
+    private boolean needFetchPartitionCol = true;
+
+    /**
+     * Prepares the per-mapper state. After the base-class setup it allocates the
+     * reusable 4096-byte key buffer and reads the statistics switch from the job
+     * configuration. Only when statistics are enabled does it also build the list
+     * of cuboids with one HLL counter each, create the per-rowkey-column hash
+     * holders, and decide whether partition column values have to be emitted.
+     *
+     * @param context task context whose configuration is read
+     * @throws IOException declared by the overridden method; presumably raised by the base-class setup - TODO confirm
+     */
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.setup(context);
+        keyBuffer = ByteBuffer.allocate(4096);
+        collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED));
+        if (collectStatistics) {
+            samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT));
+            // cuboidScheduler and nRowKey must be assigned before addCuboidBitSet runs: it reads both
+            cuboidScheduler = new CuboidScheduler(cubeDesc);
+            nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+
+            // collect every cuboid reachable from the base cuboid, with the rowkey column indexes it covers
+            List<Long> cuboidIdList = Lists.newArrayList();
+            List<Integer[]> allCuboidsBitSetList = Lists.newArrayList();
+            addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList);
+
+            // the two arrays are parallel: element i of each describes the same cuboid
+            allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]);
+            cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]);
+
+            // one HLL counter per cuboid
+            allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length];
+            for (int i = 0; i < cuboidIds.length; i++) {
+                allCuboidsHLL[i] = new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision());
+            }
+
+            // reusable holders for the per-column hash of the current row
+            hf = Hashing.murmur3_32();
+            row_hashcodes = new ByteArray[nRowKey];
+            for (int i = 0; i < nRowKey; i++) {
+                row_hashcodes[i] = new ByteArray();
+            }
+
+            // partitionColumnIndex keeps its initial value of -1 when the model has no partition date column
+            TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef();
+            if (partitionColRef != null) {
+                partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef);
+            }
+
+            // decide whether the partition column values need to be fetched separately
+            if (partitionColumnIndex < 0) {
+                // the partition column has no index in the intermediate table, so there is nothing to fetch
+                needFetchPartitionCol = false;
+            } else {
+                for (int x : dictionaryColumnIndex) {
+                    if (x == partitionColumnIndex) {
+                        // the partition column is already one of the dictionary columns, so map() emits its values anyway
+                        needFetchPartitionCol = false;
+                        break;
+                    }
+                }
+            }
+
+        }
+    }
+
+    /**
+     * Registers {@code cuboidId} together with the rowkey column positions whose
+     * bit is set in it, then does the same, depth first, for every cuboid the
+     * scheduler reports as spanning from it.
+     *
+     * @param cuboidId         cuboid to register
+     * @param allCuboidsBitSet receives, per cuboid, the indexes of the rowkey columns it contains
+     * @param allCuboids       receives the cuboid ids, in the same order as {@code allCuboidsBitSet}
+     */
+    private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) {
+        allCuboids.add(cuboidId);
+
+        // walk the bits from the base cuboid's highest bit downwards; column 0 is the highest bit
+        Integer[] columnIndexes = new Integer[Long.bitCount(cuboidId)];
+        int filled = 0;
+        long probe = Long.highestOneBit(baseCuboidId);
+        for (int column = 0; column < nRowKey; column++, probe >>= 1) {
+            if ((probe & cuboidId) > 0) {
+                columnIndexes[filled++] = column;
+            }
+        }
+        allCuboidsBitSet.add(columnIndexes);
+
+        for (Long child : cuboidScheduler.getSpanningCuboid(cuboidId)) {
+            addCuboidBitSet(child, allCuboidsBitSet, allCuboids);
+        }
+    }
+
+    /**
+     * Processes one flat-table row.
+     * <p>
+     * For every dictionary column with a non-null value it writes a key made of
+     * one column-index byte followed by the value bytes, tagged with the type
+     * family of the column, and an empty value. Any exception raised while doing
+     * so is passed to {@code handleErrorRecord}.
+     * <p>
+     * When statistics are enabled, {@code samplingPercentage} rows out of every
+     * 100 are fed to the cuboid HLL counters, and the partition column value is
+     * written with the {@link #MARK_FOR_PARTITION_COL} prefix if it is not already
+     * covered by a dictionary column.
+     *
+     * @param key     input key, not used
+     * @param record  raw input record, parsed by {@code flatTableInputFormat}
+     * @param context task context the keys are written to
+     * @throws IOException          if writing the partition column key fails
+     * @throws InterruptedException if writing the partition column key is interrupted
+     */
+    @Override
+    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        String[] row = flatTableInputFormat.parseMapperInput(record);
+
+        keyBuffer.clear();
+        try {
+            for (int i = 0; i < factDictCols.size(); i++) {
+                String fieldValue = row[dictionaryColumnIndex[i]];
+                if (fieldValue == null)
+                    continue;
+                // all keys of this row share keyBuffer; each one starts where the previous ended
+                int offset = keyBuffer.position();
+                keyBuffer.put(Bytes.toBytes(i)[3]); // one byte is enough
+                keyBuffer.put(Bytes.toBytes(fieldValue));
+                outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+                sortableKey.setText(outputKey);
+                // tag the key with the type family of the column
+                DataType type = factDictCols.get(i).getType();
+                if (!type.isNumberFamily()) {
+                    sortableKey.setTypeId((byte) TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+                } else if (type.isIntegerFamily()) {
+                    sortableKey.setTypeId((byte) TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+                } else {
+                    sortableKey.setTypeId((byte) TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+                }
+                context.write(sortableKey, EMPTY_TEXT);
+            }
+        } catch (Exception ex) {
+            handleErrorRecord(row, ex);
+        }
+
+        if (collectStatistics) {
+            // rowCount cycles through 0..99, so exactly samplingPercentage rows of every 100 are sampled
+            if (rowCount < samplingPercentage) {
+                putRowKeyToHLL(row);
+            }
+
+            if (needFetchPartitionCol) {
+                String fieldValue = row[partitionColumnIndex];
+                if (fieldValue != null) {
+                    int offset = keyBuffer.position();
+                    keyBuffer.put(MARK_FOR_PARTITION_COL);
+                    keyBuffer.put(Bytes.toBytes(fieldValue));
+                    outputKey.set(keyBuffer.array(), offset, keyBuffer.position() - offset);
+                    sortableKey.setText(outputKey);
+                    sortableKey.setTypeId((byte) 0);
+                    context.write(sortableKey, EMPTY_TEXT);
+                }
+            }
+        }
+
+        // pre-increment so the counter wraps after 100 rows; the previous post-increment
+        // test let it reach 100, giving a 101-row cycle in which a 100% sample skipped a row
+        if (++rowCount == 100) {
+            rowCount = 0;
+        }
+    }
+
+    /**
+     * Adds one row to the HLL counter of every cuboid.
+     *
+     * @param row the parsed flat-table row
+     */
+    private void putRowKeyToHLL(String[] row) {
+        // step 1: hash every rowkey column value on its own; a null value is hashed as the int 0
+        for (int col = 0; col < nRowKey; col++) {
+            Hasher columnHasher = hf.newHasher();
+            String value = row[intermediateTableDesc.getRowKeyColumnIndexes()[col]];
+            Hasher fed = (value == null) ? columnHasher.putInt(0) : columnHasher.putString(value);
+            row_hashcodes[col].set(fed.hash().asBytes());
+        }
+
+        // step 2: use the column hashes of each cuboid's columns to get one consolidated hash per cuboid
+        for (int cuboid = 0; cuboid < allCuboidsBitSet.length; cuboid++) {
+            Hasher cuboidHasher = hf.newHasher();
+            for (Integer col : allCuboidsBitSet[cuboid]) {
+                cuboidHasher.putBytes(row_hashcodes[col].array());
+            }
+            allCuboidsHLL[cuboid].add(cuboidHasher.hash().asBytes());
+        }
+    }
+
+    /**
+     * When statistics were collected, writes one record per cuboid: the key is
+     * the {@link #MARK_FOR_HLL} byte followed by the 8-byte cuboid id, the value
+     * holds the registers of that cuboid's HLL counter.
+     *
+     * @param context task context the records are written to
+     * @throws IOException          if writing a record fails
+     * @throws InterruptedException if writing a record is interrupted
+     */
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (!collectStatistics) {
+            return;
+        }
+
+        ByteBuffer registers = ByteBuffer.allocate(BufferedMeasureCodec.DEFAULT_BUFFER_SIZE);
+        for (int idx = 0; idx < cuboidIds.length; idx++) {
+            // key: marker byte + cuboid id
+            keyBuffer.clear();
+            keyBuffer.put(MARK_FOR_HLL);
+            keyBuffer.putLong(cuboidIds[idx]);
+            outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+            sortableKey.setText(outputKey);
+            sortableKey.setTypeId((byte) 0);
+
+            // value: the counter's registers
+            registers.clear();
+            allCuboidsHLL[idx].writeRegisters(registers);
+            outputValue.set(registers.array(), 0, registers.position());
+
+            context.write(sortableKey, outputValue);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
new file mode 100644
index 0000000..cadbcbf
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/SelfDefineSortableKey.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+/**
+ * Created by xiefan on 16-11-1.
+ */
+public class SelfDefineSortableKey implements WritableComparable<SelfDefineSortableKey> {
+
+    private byte typeId; //non-numeric(0000 0000) int(0000 0001) other numberic(0000 0010)
+
+    private Text text;
+
+    private static final Logger logger = LoggerFactory.getLogger(SelfDefineSortableKey.class);
+
+    public SelfDefineSortableKey() {
+    }
+
+    public SelfDefineSortableKey(byte typeId, Text text) {
+        this.typeId = typeId;
+        this.text = text;
+    }
+
+    @Override
+    public int compareTo(SelfDefineSortableKey o) {
+        if (!o.isNumberFamily()) {
+            return this.text.compareTo(o.text);
+        } else {
+            byte[] data1 = this.text.getBytes();
+            byte[] data2 = o.text.getBytes();
+            String str1 = new String(data1, 1, data1.length - 1);
+            String str2 = new String(data2, 1, data2.length - 1);
+            if (str1 == null || str1.equals("") || str2 == null || str2.equals("")) {
+                //should not achieve here
+                logger.error("none numeric value!");
+                return 0;
+            }
+            if (o.isIntegerFamily()) {  //integer type
+                try {
+                    Long num1 = Long.parseLong(str1);
+                    Long num2 = Long.parseLong(str2);
+                    return num1.compareTo(num2);
+                } catch (NumberFormatException e) {
+                    System.out.println("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2);
+                    logger.error("NumberFormatException when parse integer family number.str1:" + str1 + " str2:" + str2);
+                    e.printStackTrace();
+                    return 0;
+                }
+            } else {  //other numeric type
+                try {
+                    Double num1 = Double.parseDouble(str1);
+                    Double num2 = Double.parseDouble(str2);
+                    return num1.compareTo(num2);
+                } catch (NumberFormatException e) {
+                    System.out.println("NumberFormatException when parse double family number.str1:" + str1 + " str2:" + str2);
+                    logger.error("NumberFormatException when parse doul family number.str1:" + str1 + " str2:" + str2);
+                    //e.printStackTrace();
+                    return 0;
+                }
+            }
+        }
+    }
+
+    @Override
+    public void write(DataOutput dataOutput) throws IOException {
+        dataOutput.writeByte(typeId);
+        text.write(dataOutput);
+    }
+
+    @Override
+    public void readFields(DataInput dataInput) throws IOException {
+        dataInput.readByte();
+        text.readFields(dataInput);
+    }
+
+    public short getTypeId() {
+        return typeId;
+    }
+
+    public Text getText() {
+        return text;
+    }
+
+    public boolean isNumberFamily() {
+        if (typeId == TypeFlag.NONE_NUMERIC_TYPE.ordinal()) return false;
+        return true;
+    }
+
+    public boolean isIntegerFamily() {
+        return (typeId == TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+    }
+
+    public boolean isOtherNumericFamily() {
+        return (typeId == TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+    }
+
+    public void setTypeId(byte typeId) {
+        this.typeId = typeId;
+    }
+
+    public void setText(Text text) {
+        this.text = text;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
new file mode 100644
index 0000000..c69acfd
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/TypeFlag.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+/**
+ * Type family of the value carried by a {@link SelfDefineSortableKey}.
+ * <p>
+ * The ordinal of a constant, cast to a byte, is what is stored as the key's
+ * type id, so the declaration order of the constants must not change.
+ * <p>
+ * Created by xiefan on 16-11-2.
+ */
+public enum TypeFlag {
+    /** Not a number; such keys are compared as text. */
+    NONE_NUMERIC_TYPE,
+    /** Integer family; such keys are compared after parsing the value as a long. */
+    INTEGER_FAMILY_TYPE,
+    /** Any other number; such keys are compared after parsing the value as a double. */
+    DOUBLE_FAMILY_TYPE
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
new file mode 100644
index 0000000..70197ac
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/NumberDictionaryForestTest.java
@@ -0,0 +1,214 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dict.NumberDictionary;
+import org.apache.kylin.dict.NumberDictionaryBuilder;
+import org.apache.kylin.dict.NumberDictionaryForest;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionaryForestBuilder;
+import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey;
+import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+
+
+public class NumberDictionaryForestTest {
+    /** Runs the dictionary round-trip check on random longs plus the two long extremes. */
+    @Test
+    public void testNumberDictionaryForestLong(){
+        testData(randomLongData(10), TypeFlag.INTEGER_FAMILY_TYPE);
+    }
+
+    /** Runs the dictionary round-trip check on random doubles plus one negative value. */
+    @Test
+    public void testNumberDictionaryForestDouble(){
+        testData(randomDoubleData(10), TypeFlag.DOUBLE_FAMILY_TYPE);
+    }
+
+    /**
+     * Sorts the values as shuffle keys, builds a forest dictionary from them in
+     * that order, and checks that every value round-trips through its id and
+     * that the ids never decrease along the sorted keys.
+     *
+     * @param list numeric values as strings
+     * @param flag type family used for the keys
+     */
+    private void testData(List<String> list,TypeFlag flag){
+        // simulate the ordering produced by the map-reduce job
+        ArrayList<SelfDefineSortableKey> sortedKeys = createKeyList(list, (byte) flag.ordinal());
+        Collections.sort(sortedKeys);
+
+        // build the dictionary from the values in sorted order
+        NumberDictionaryForestBuilder<String> builder = new NumberDictionaryForestBuilder<String>(
+                new StringBytesConverter(), 0);
+        TrieDictionaryForestBuilder.MaxTrieTreeSize = 0;
+        for (SelfDefineSortableKey sortedKey : sortedKeys) {
+            builder.addValue(printKey(sortedKey));
+        }
+        NumberDictionaryForest<String> dict = builder.build();
+        dict.dump(System.out);
+
+        // look every value up again and remember the id it got
+        ArrayList<Integer> ids = new ArrayList<>();
+        for (SelfDefineSortableKey sortedKey : sortedKeys) {
+            String value = getFieldValue(sortedKey);
+            ids.add(dict.getIdFromValue(value));
+            assertEquals(value, dict.getValueFromId(dict.getIdFromValue(value)));
+        }
+
+        Comparator<Integer> naturalOrder = new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1.compareTo(o2);
+            }
+        };
+        assertTrue(isIncreasedOrder(ids, naturalOrder));
+    }
+
+    /** Checks that a dictionary still maps every value to itself after a write/read round trip. */
+    @Test
+    public void serializeTest() {
+        List<String> values = new ArrayList<>();
+        values.add("1");
+        values.add("2");
+        values.add("100");
+
+        NumberDictionaryForestBuilder<String> builder = new NumberDictionaryForestBuilder<String>(new StringBytesConverter());
+        for (String value : values) {
+            builder.addValue(value);
+        }
+        NumberDictionaryForest<String> restored = testSerialize(builder.build());
+        restored.dump(System.out);
+
+        for (String value : values) {
+            assertEquals(value, restored.getValueFromId(restored.getIdFromValue(value)));
+        }
+    }
+
+    /**
+     * Builds a forest dictionary and a plain number dictionary from the same
+     * values, including {@code Double.MIN_VALUE}, and dumps both. The test only
+     * verifies that building and dumping do not throw.
+     */
+    @Test
+    public void testVerySmallDouble(){
+        List<String> values = new ArrayList<>();
+        values.add(String.valueOf(-1.0));
+        values.add(String.valueOf(Double.MIN_VALUE));
+        values.add("1.01");
+        values.add("2.0");
+
+        NumberDictionaryForestBuilder<String> forestBuilder = new NumberDictionaryForestBuilder<String>(new StringBytesConverter());
+        for (String value : values) {
+            forestBuilder.addValue(value);
+        }
+        NumberDictionaryForest<String> forest = forestBuilder.build();
+        forest.dump(System.out);
+
+        NumberDictionaryBuilder<String> plainBuilder = new NumberDictionaryBuilder<>(new StringBytesConverter());
+        for (String value : values) {
+            plainBuilder.addValue(value);
+        }
+        NumberDictionary<String> plain = plainBuilder.build(0);
+        plain.dump(System.out);
+    }
+
+    /**
+     * Writes the dictionary to an in-memory stream and reads it back into a new instance.
+     *
+     * @param dict dictionary to copy
+     * @return the instance restored from the serialized bytes
+     * @throws RuntimeException wrapping any {@link IOException} raised on the way
+     */
+    private static NumberDictionaryForest<String> testSerialize(NumberDictionaryForest<String> dict) {
+        try {
+            ByteArrayOutputStream sink = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(sink);
+            dict.write(out);
+            out.close();
+
+            DataInputStream in = new DataInputStream(new ByteArrayInputStream(sink.toByteArray()));
+            NumberDictionaryForest<String> copy = new NumberDictionaryForest<>();
+            copy.readFields(in);
+            in.close();
+            return copy;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * @param count number of random values to generate
+     * @return {@code count} random longs as strings, followed by Long.MAX_VALUE and Long.MIN_VALUE
+     */
+    private List<String> randomLongData(int count){
+        Random random = new Random(System.currentTimeMillis());
+        ArrayList<String> values = new ArrayList<>();
+        int remaining = count;
+        while (remaining-- > 0) {
+            values.add(String.valueOf(random.nextLong()));
+        }
+        values.add(String.valueOf(Long.MAX_VALUE));
+        values.add(String.valueOf(Long.MIN_VALUE));
+        return values;
+    }
+
+    /**
+     * @param count number of random values to generate
+     * @return {@code count} random doubles as strings, followed by "-1"
+     */
+    private List<String> randomDoubleData(int count){
+        Random random = new Random(System.currentTimeMillis());
+        ArrayList<String> values = new ArrayList<>();
+        int remaining = count;
+        while (remaining-- > 0) {
+            values.add(String.valueOf(random.nextDouble()));
+        }
+        values.add("-1");
+        return values;
+    }
+
+    /**
+     * @param count number of random values to generate
+     * @return {@code count} random UUID strings, followed by "123" twice so the list holds a duplicate
+     */
+    private List<String> randomStringData(int count){
+        ArrayList<String> list = new ArrayList<>();
+        for(int i=0;i<count;i++){
+            list.add(UUID.randomUUID().toString());
+        }
+        list.add("123");
+        list.add("123");
+        return list;
+    }
+
+    /**
+     * Wraps every string in a key laid out the way the mapper lays it out: one
+     * leading byte (here always 0) followed by the bytes of the value.
+     *
+     * @param strNumList values to wrap
+     * @param typeFlag   type id given to every key
+     * @return one key per value, in input order
+     */
+    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){
+        int partitionId = 0;
+        ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
+        for(String str : strNumList){
+            byte[] valueBytes = Bytes.toBytes(str);
+            // size the buffer to the key so values of any length fit
+            ByteBuffer keyBuffer = ByteBuffer.allocate(1 + valueBytes.length);
+            keyBuffer.put(Bytes.toBytes(partitionId)[3]);
+            keyBuffer.put(valueBytes);
+            Text outputKey = new Text();
+            outputKey.set(keyBuffer.array(), 0, keyBuffer.position());
+            keyList.add(new SelfDefineSortableKey(typeFlag, outputKey));
+        }
+        return keyList;
+    }
+
+    /**
+     * Prints the type id and the value of a key and returns the value.
+     *
+     * @param key key whose payload is one leading byte followed by the value bytes
+     * @return the value decoded from the payload
+     */
+    private String printKey(SelfDefineSortableKey key){
+        Text payload = key.getText();
+        // only the first getLength() bytes of the backing array are valid
+        byte[] fieldValue = Bytes.copy(payload.getBytes(), 1, payload.getLength() - 1);
+        String value = new String(fieldValue);
+        System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+value);
+        return value;
+    }
+
+    /**
+     * @param key key whose payload is one leading byte followed by the value bytes
+     * @return the value decoded from the payload
+     */
+    private String getFieldValue(SelfDefineSortableKey key){
+        Text payload = key.getText();
+        // only the first getLength() bytes of the backing array are valid
+        byte[] fieldValue = Bytes.copy(payload.getBytes(), 1, payload.getLength() - 1);
+        return new String(fieldValue);
+    }
+
+    /**
+     * @param list elements to check
+     * @param comp ordering to check against
+     * @return true if no element compares lower than the one before it; equal neighbours are allowed
+     */
+    private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){
+        T last = null;
+        for (T current : list) {
+            // nothing to compare with until a non-null element has been seen
+            if (last != null && comp.compare(last, current) > 0) {
+                return false;
+            }
+            last = current;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
new file mode 100644
index 0000000..858bba4
--- /dev/null
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/SelfDefineSortableKeyTest.java
@@ -0,0 +1,228 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.dict.NumberDictionaryForest;
+import org.apache.kylin.dict.NumberDictionaryForestBuilder;
+import org.apache.kylin.dict.StringBytesConverter;
+import org.apache.kylin.dict.TrieDictionary;
+import org.apache.kylin.dict.TrieDictionaryBuilder;
+import org.apache.kylin.dict.TrieDictionaryForest;
+import org.apache.kylin.dict.TrieDictionaryForestBuilder;
+import org.apache.kylin.engine.mr.steps.fdc2.SelfDefineSortableKey;
+import org.apache.kylin.engine.mr.steps.fdc2.TypeFlag;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+/**
+ * Created by xiefan on 16-11-2.
+ */
+public class SelfDefineSortableKeyTest {
+
+    /** Sorts integer-family keys and checks that the resulting values are in ascending long order. */
+    @Test
+    public void testSortLong(){
+        Random random = new Random(System.currentTimeMillis());
+        ArrayList<Long> numbers = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            numbers.add(random.nextLong());
+        }
+        numbers.add(0L);
+        numbers.add(0L); // duplicate
+        numbers.add(-1L); // negative number
+        numbers.add(Long.MAX_VALUE);
+        numbers.add(Long.MIN_VALUE);
+
+        System.out.println("test numbers:"+numbers);
+        ArrayList<String> numberStrings = listToStringList(numbers);
+        ArrayList<SelfDefineSortableKey> keys = createKeyList(numberStrings, (byte)TypeFlag.INTEGER_FAMILY_TYPE.ordinal());
+        System.out.println(keys.get(0).isIntegerFamily());
+        Collections.sort(keys);
+
+        ArrayList<String> sortedStrings = new ArrayList<>();
+        for (SelfDefineSortableKey sortedKey : keys) {
+            sortedStrings.add(printKey(sortedKey));
+        }
+
+        Comparator<String> byLongValue = new Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                return Long.compare(Long.parseLong(o1), Long.parseLong(o2));
+            }
+        };
+        assertTrue(isIncreasedOrder(sortedStrings, byLongValue));
+    }
+
+    /** Sorts double-family keys and checks that the resulting values are in ascending double order. */
+    @Test
+    public void testSortDouble(){
+        Random random = new Random(System.currentTimeMillis());
+        ArrayList<Double> numbers = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            numbers.add(random.nextDouble());
+        }
+        numbers.add(0.0);
+        numbers.add(0.0); // duplicate
+        numbers.add(-1.0); // negative number
+        numbers.add(Double.MAX_VALUE);
+        numbers.add(-Double.MAX_VALUE);
+
+        System.out.println("test numbers:"+numbers);
+        ArrayList<String> numberStrings = listToStringList(numbers);
+        ArrayList<SelfDefineSortableKey> keys = createKeyList(numberStrings, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        System.out.println(keys.get(0).isOtherNumericFamily());
+        Collections.sort(keys);
+
+        ArrayList<String> sortedStrings = new ArrayList<>();
+        for (SelfDefineSortableKey sortedKey : keys) {
+            sortedStrings.add(printKey(sortedKey));
+        }
+
+        Comparator<String> byDoubleValue = new Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                return Double.compare(Double.parseDouble(o1), Double.parseDouble(o2));
+            }
+        };
+        assertTrue(isIncreasedOrder(sortedStrings, byDoubleValue));
+    }
+
+    /** Sorts non-numeric keys and checks that the resulting values are in ascending string order. */
+    @Test
+    public void testSortNormalString(){
+        ArrayList<String> values = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            values.add(UUID.randomUUID().toString());
+        }
+        values.add("hello");
+        values.add("hello"); // duplicate
+        values.add("123");
+        values.add("");
+
+        ArrayList<SelfDefineSortableKey> keys = createKeyList(values, (byte)TypeFlag.NONE_NUMERIC_TYPE.ordinal());
+        System.out.println(keys.get(0).isOtherNumericFamily());
+        Collections.sort(keys);
+
+        ArrayList<String> sortedStrings = new ArrayList<>();
+        for (SelfDefineSortableKey sortedKey : keys) {
+            sortedStrings.add(printKey(sortedKey));
+        }
+
+        Comparator<String> byStringValue = new Comparator<String>() {
+            @Override
+            public int compare(String o1, String o2) {
+                return o1.compareTo(o2);
+            }
+        };
+        assertTrue(isIncreasedOrder(sortedStrings, byStringValue));
+    }
+
+    /**
+     * Sorts double-family keys that include one value which is not a number.
+     * Nothing is asserted; the test passes as long as sorting and printing do not throw.
+     */
+    @Test
+    public void testIllegalNumber(){
+        Random random = new Random(System.currentTimeMillis());
+        ArrayList<Double> numbers = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            numbers.add(random.nextDouble());
+        }
+        numbers.add(0.0);
+        numbers.add(0.0); // duplicate
+        numbers.add(-1.0); // negative number
+        numbers.add(Double.MAX_VALUE);
+        numbers.add(-Double.MAX_VALUE);
+
+        System.out.println("test numbers:"+numbers);
+        ArrayList<String> numberStrings = listToStringList(numbers);
+        numberStrings.add("fjaeif"); // not a number
+
+        ArrayList<SelfDefineSortableKey> keys = createKeyList(numberStrings, (byte)TypeFlag.DOUBLE_FAMILY_TYPE.ordinal());
+        System.out.println(keys.get(0).isOtherNumericFamily());
+        Collections.sort(keys);
+        for (SelfDefineSortableKey sortedKey : keys) {
+            printKey(sortedKey);
+        }
+    }
+
+    /**
+     * Prints the ordinal of DOUBLE_FAMILY_TYPE twice: once narrowed to byte and once
+     * after being widened back to int. No assertion is made.
+     */
+    @Test
+    public void testEnum(){
+        TypeFlag doubleFamily = TypeFlag.DOUBLE_FAMILY_TYPE;
+        System.out.println((byte)doubleFamily.ordinal());
+        int widened = (byte)doubleFamily.ordinal();
+        System.out.println(widened);
+    }
+
+
+
+    /**
+     * Converts every element of the list to its toString() form, echoing each
+     * converted value to stdout along the way.
+     *
+     * @param list elements to convert
+     * @return the string forms, in input order
+     */
+    private<T> ArrayList<String> listToStringList(ArrayList<T> list){
+        ArrayList<String> converted = new ArrayList<>();
+        for(int i = 0; i < list.size(); i++){
+            String text = list.get(i).toString();
+            System.out.println(text);
+            converted.add(text);
+        }
+        return converted;
+    }
+
+    /**
+     * Builds one SelfDefineSortableKey per input string. The Text payload of each key
+     * is a single leading partition byte (always 0 here, since the partition id is 0)
+     * followed by the bytes of the string.
+     *
+     * @param strNumList values to wrap
+     * @param typeFlag   type flag handed to every key
+     * @return the keys, in input order
+     */
+    private ArrayList<SelfDefineSortableKey> createKeyList(List<String> strNumList,byte typeFlag){
+        int partitionId = 0;
+        ArrayList<SelfDefineSortableKey> keyList = new ArrayList<>();
+        for(String str : strNumList){
+            byte[] valueBytes = Bytes.toBytes(str);
+            // size the buffer to the payload so a long value cannot overflow a fixed-size buffer
+            ByteBuffer keyBuffer = ByteBuffer.allocate(1 + valueBytes.length);
+            keyBuffer.put(Bytes.toBytes(partitionId)[3]);
+            keyBuffer.put(valueBytes);
+            Text outputKey = new Text();
+            outputKey.set(keyBuffer.array(),0,keyBuffer.position());
+            keyList.add(new SelfDefineSortableKey(typeFlag,outputKey));
+        }
+        return keyList;
+    }
+
+    /**
+     * Prints the key's type id and field value to stdout.
+     *
+     * @param key key to print
+     * @return the field value, i.e. the key's text bytes after the first byte, as a String
+     */
+    private String printKey(SelfDefineSortableKey key){
+        // decode once through the shared helper rather than repeating the copy/decode steps
+        String fieldValue = getFieldValue(key);
+        System.out.println("type flag:"+key.getTypeId()+" fieldValue:"+fieldValue);
+        return fieldValue;
+    }
+
+    /**
+     * Extracts the field value from a key: everything in the key's text after the
+     * first byte, decoded as a String.
+     *
+     * @param key key to read
+     * @return the decoded field value
+     */
+    private String getFieldValue(SelfDefineSortableKey key){
+        byte[] data = key.getText().getBytes();
+        // Text.getBytes() exposes the backing array, which is only valid up to getLength();
+        // using data.length could pick up stale bytes if the array is larger than the content
+        int validLength = key.getText().getLength();
+        byte[] fieldValue = Bytes.copy(data,1,validLength-1);
+        return new String(fieldValue);
+    }
+
+    /**
+     * Checks that the list is in non-decreasing order under the given comparator.
+     * A null "previous" element is treated as "nothing seen yet", so the element
+     * following a null is accepted without being compared.
+     *
+     * @param list elements to check
+     * @param comp ordering to check against
+     * @return false as soon as an element compares lower than its predecessor, true otherwise
+     */
+    private<T> boolean isIncreasedOrder(List<T> list, Comparator<T> comp){
+        T last = null;
+        for(T current : list){
+            if(last != null && comp.compare(last,current) > 0){
+                return false;
+            }
+            last = current;
+        }
+        return true;
+    }
+
+
+}


[4/6] kylin git commit: KYLIN-2006 minor revision

Posted by li...@apache.org.
KYLIN-2006 minor revision


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/85c4ded4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/85c4ded4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/85c4ded4

Branch: refs/heads/master
Commit: 85c4ded4cc6882d661d79cfbe8bec0f7d7015ba9
Parents: 4f66783
Author: Yang Li <li...@apache.org>
Authored: Mon Nov 7 20:59:25 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:02 2016 +0800

----------------------------------------------------------------------
 .../impl/threadpool/DistributedScheduler.java   |  3 +--
 .../kylin/job/lock/DistributedJobLock.java      |  7 +++++-
 .../org/apache/kylin/job/lock/DoWatchLock.java  | 23 --------------------
 .../hbase/util/ZookeeperDistributedJobLock.java |  1 -
 4 files changed, 7 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 11709c7..17df119 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -46,7 +46,6 @@ import org.apache.kylin.job.execution.Executable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.job.lock.DoWatchLock;
 import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
@@ -209,7 +208,7 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
     }
 
     //when the segment lock released but the segment related job still running, resume the job.
-    private class DoWatchImpl implements DoWatchLock {
+    private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock {
         private String serverName;
 
         public DoWatchImpl(String serverName) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
index 5ba8426..9335e56 100644
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -21,9 +21,14 @@ package org.apache.kylin.job.lock;
 import java.util.concurrent.ExecutorService;
 
 public interface DistributedJobLock extends JobLock {
-    boolean lockWithName(String cubeName, String serverName);
+    
+    boolean lockWithName(String name, String serverName);
 
     void unlockWithName(String name);
 
     void watchLock(ExecutorService pool, DoWatchLock doWatch);
+    
+    public interface DoWatchLock {
+        void doWatch(String path, String data);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
deleted file mode 100644
index 08c13f9..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.lock;
-
-public interface DoWatchLock {
-    void doWatch(String path, String data);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index eba7a20..d8d27c5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.lock.DistributedJobLock;
-import org.apache.kylin.job.lock.DoWatchLock;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.zookeeper.CreateMode;
 import org.slf4j.Logger;


[2/6] kylin git commit: KYLIN-1851 add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey

Posted by li...@apache.org.
KYLIN-1851 add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/58584483
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/58584483
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/58584483

Branch: refs/heads/master
Commit: 5858448321113a33fcbbcc8167a0448f4a44abb1
Parents: ddec049
Author: xiefan46 <95...@qq.com>
Authored: Mon Nov 7 14:37:22 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:31:44 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/dict/ByteComparator.java   |  44 ++
 .../kylin/dict/NumberDictionaryForest.java      | 278 ++++++++
 .../dict/NumberDictionaryForestBuilder.java     |  58 ++
 .../apache/kylin/dict/TrieDictionaryForest.java | 406 ++++++++++++
 .../kylin/dict/TrieDictionaryForestBuilder.java | 125 ++++
 .../kylin/dict/TrieDictionaryForestTest.java    | 657 +++++++++++++++++++
 .../fdc2/FactDistinctColumnPartitioner2.java    |  47 ++
 .../fdc2/FactDistinctColumnsCombiner2.java      |  44 ++
 .../mr/steps/fdc2/FactDistinctColumnsJob2.java  | 149 +++++
 .../fdc2/FactDistinctColumnsMapperBase2.java    | 102 +++
 .../fdc2/FactDistinctHiveColumnsMapper2.java    | 232 +++++++
 .../mr/steps/fdc2/SelfDefineSortableKey.java    | 130 ++++
 .../kylin/engine/mr/steps/fdc2/TypeFlag.java    |  28 +
 .../mr/steps/NumberDictionaryForestTest.java    | 214 ++++++
 .../mr/steps/SelfDefineSortableKeyTest.java     | 228 +++++++
 15 files changed, 2742 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
new file mode 100644
index 0000000..74d5ec5
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import java.util.Comparator;
+
+/**
+ * Compares two values by the byte sequences a BytesConverter produces for them,
+ * delegating the actual ordering to ByteArray.compareTo.
+ * <p>
+ * Created by xiefan on 16-10-28.
+ */
+public class ByteComparator<T> implements Comparator<T> {
+    private final BytesConverter<T> converter;
+
+    public ByteComparator(BytesConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    @Override
+    public int compare(T o1, T o2) {
+        return wrap(o1).compareTo(wrap(o2));
+    }
+
+    // converts a value to bytes and wraps the whole array as a ByteArray
+    private ByteArray wrap(T value) {
+        byte[] bytes = converter.convertToBytes(value);
+        return new ByteArray(bytes, 0, bytes.length);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
new file mode 100644
index 0000000..8caa4b6
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+
+/**
+ * A dictionary for numeric values backed by a {@link TrieDictionaryForest}. Values are
+ * run through {@link NumberBytesCodec} before being looked up in the underlying forest,
+ * and bytes read back from the forest are decoded by the same codec.
+ * <p>
+ * Created by xiefan on 16-11-1.
+ * <p>
+ * Notice: the number dictionary forest currently cannot handle
+ * very big or very small double and float values such as 4.9E-324.
+ */
+public class NumberDictionaryForest<T> extends Dictionary<T> {
+
+    // upper bound on digits before the decimal point; the codec buffer is three times this size
+    public static final int MAX_DIGITS_BEFORE_DECIMAL_POINT = 19;
+
+    // encode a number into an order preserving byte sequence
+    // for positives -- padding '0'
+    // for negatives -- '-' sign, padding '9', invert digits, and terminate by ';'
+    static class NumberBytesCodec {
+        int maxDigitsBeforeDecimalPoint;
+        // working buffer; the current encoded form lives in buf[bufOffset, bufOffset + bufLen)
+        byte[] buf;
+        int bufOffset;
+        int bufLen;
+
+        NumberBytesCodec(int maxDigitsBeforeDecimalPoint) {
+            this.maxDigitsBeforeDecimalPoint = maxDigitsBeforeDecimalPoint;
+            this.buf = new byte[maxDigitsBeforeDecimalPoint * 3];
+            this.bufOffset = 0;
+            this.bufLen = 0;
+        }
+
+        /**
+         * Encodes value[offset, offset + len) into the tail of {@code buf} and records the
+         * result in {@code bufOffset}/{@code bufLen}. An empty input yields an empty encoding.
+         *
+         * @throws IllegalArgumentException if the input is longer than the buffer, has more
+         *         digits before the decimal point than allowed, or leaves no room for padding
+         */
+        void encodeNumber(byte[] value, int offset, int len) {
+            if (len == 0) {
+                bufOffset = 0;
+                bufLen = 0;
+                return;
+            }
+
+            if (len > buf.length) {
+                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Internal buffer is only " + buf.length + " bytes");
+            }
+
+            boolean negative = value[offset] == '-';
+
+            // terminate negative ';'
+            // the input is right-aligned in buf; negatives shift one slot left to make room for ';'
+            int start = buf.length - len;
+            int end = buf.length;
+            if (negative) {
+                start--;
+                end--;
+                buf[end] = ';';
+            }
+
+            // copy & find decimal point
+            // decimalPoint stays at 'end' when the input has no '.'; otherwise it is the first '.'
+            int decimalPoint = end;
+            for (int i = start, j = offset; i < end; i++, j++) {
+                buf[i] = value[j];
+                if (buf[i] == '.' && i < decimalPoint) {
+                    decimalPoint = i;
+                }
+            }
+            // remove '-' sign
+            if (negative) {
+                start++;
+            }
+
+            // prepend '0'
+            // pad so that exactly maxDigitsBeforeDecimalPoint characters precede the decimal point;
+            // one extra slot must remain in front for the sign byte written below
+            int nZeroPadding = maxDigitsBeforeDecimalPoint - (decimalPoint - start);
+            if (nZeroPadding < 0 || nZeroPadding + 1 > start)
+                throw new IllegalArgumentException("Too many digits for NumberDictionary: " + Bytes.toString(value, offset, len) + ". Expect " + maxDigitsBeforeDecimalPoint + " digits before decimal point at max.");
+            for (int i = 0; i < nZeroPadding; i++) {
+                buf[--start] = '0';
+            }
+
+            // consider negative
+            // negatives get a '-' prefix and every digit d replaced by 9 - d (so '0' padding becomes '9');
+            // non-negatives get a '0' prefix
+            if (negative) {
+                buf[--start] = '-';
+                for (int i = start + 1; i < buf.length; i++) {
+                    int c = buf[i];
+                    if (c >= '0' && c <= '9') {
+                        buf[i] = (byte) ('9' - (c - '0'));
+                    }
+                }
+            } else {
+                buf[--start] = '0';
+            }
+
+            bufOffset = start;
+            bufLen = buf.length - start;
+        }
+
+        /**
+         * Decodes the encoded form currently held in buf[bufOffset, bufOffset + bufLen)
+         * into {@code returnValue} starting at {@code offset}.
+         *
+         * @return the number of bytes written to {@code returnValue}; 0 when bufLen is 0
+         */
+        int decodeNumber(byte[] returnValue, int offset) {
+            if (bufLen == 0) {
+                return 0;
+            }
+
+            int in = bufOffset;
+            int end = bufOffset + bufLen;
+            int out = offset;
+
+            // sign
+            // for negatives, emit '-', skip the sign byte and drop the last byte of the encoding
+            boolean negative = buf[in] == '-';
+            if (negative) {
+                returnValue[out++] = '-';
+                in++;
+                end--;
+            }
+
+            // remove padding
+            byte padding = (byte) (negative ? '9' : '0');
+            for (; in < end; in++) {
+                if (buf[in] != padding)
+                    break;
+            }
+
+            // all paddings before '.', special case for '0'
+            if (in == end || !(buf[in] >= '0' && buf[in] <= '9')) {
+                returnValue[out++] = '0';
+            }
+
+            // copy the rest
+            // negatives have their digits inverted back (d -> 9 - d); other bytes are copied as-is
+            if (negative) {
+                for (; in < end; in++, out++) {
+                    int c = buf[in];
+                    if (c >= '0' && c <= '9') {
+                        c = '9' - (c - '0');
+                    }
+                    returnValue[out] = (byte) c;
+                }
+            } else {
+                System.arraycopy(buf, in, returnValue, out, end - in);
+                out += end - in;
+            }
+
+            return out - offset;
+        }
+    }
+
+    // per-thread codec instance, created lazily by getCodec()
+    static ThreadLocal<NumberBytesCodec> localCodec =
+            new ThreadLocal<NumberBytesCodec>();
+
+    // ============================================================================
+
+    // underlying forest that stores the encoded byte sequences
+    private TrieDictionaryForest<T> dict;
+
+    private BytesConverter<T> converter;
+
+    // no-arg constructor; dict and converter are assigned later in readFields()
+    public NumberDictionaryForest() {
+    }
+
+    public NumberDictionaryForest(TrieDictionaryForest<T> dict, BytesConverter<T> converter) {
+        this.dict = dict;
+        this.converter = converter;
+    }
+
+    /** Returns the calling thread's codec, creating and caching it on first use. */
+    protected NumberBytesCodec getCodec() {
+        NumberBytesCodec codec = localCodec.get();
+        if (codec == null) {
+            codec = new NumberBytesCodec(MAX_DIGITS_BEFORE_DECIMAL_POINT);
+            localCodec.set(codec);
+        }
+        return codec;
+    }
+
+    // the following accessors delegate directly to the underlying forest
+    @Override
+    public int getMinId() {
+        return dict.getMinId();
+    }
+
+    @Override
+    public int getMaxId() {
+        return dict.getMaxId();
+    }
+
+    @Override
+    public int getSizeOfId() {
+        return dict.getSizeOfId();
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        return dict.getSizeOfValue();
+    }
+
+    @Override
+    public boolean contains(Dictionary<?> another) {
+        return dict.contains(another);
+    }
+
+    /** Converts the value to bytes and looks up its id; returns -1 for a null value. */
+    @Override
+    protected int getIdFromValueImpl(T value, int roundingFlag) {
+        if (value == null) return -1;
+        byte[] data = converter.convertToBytes(value);
+        return getIdFromValueBytesImpl(data, 0, data.length, roundingFlag);
+    }
+
+    /** Encodes the given bytes with the codec and looks the encoded form up in the forest. */
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag) {
+        NumberBytesCodec codec = getCodec();
+        codec.encodeNumber(value, offset, len);
+        return this.dict.getIdFromValueBytesImpl(codec.buf, codec.bufOffset, codec.bufLen, roundingFlag);
+    }
+
+    /** Returns the value for an id, or null when no bytes are returned for it. */
+    @Override
+    protected T getValueFromIdImpl(int id) {
+        byte[] data = getValueBytesFromIdImpl(id);
+        if (data == null) return null;
+        else return converter.convertFromBytes(data, 0, data.length);
+    }
+
+    /**
+     * Returns the decoded bytes for an id in an array of exactly the decoded length.
+     * A scratch array of dict.getSizeOfValue() bytes receives the decoded value and is
+     * returned directly when it is completely filled, otherwise it is trimmed by copying.
+     */
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) {
+        NumberBytesCodec codec = getCodec();
+        codec.bufOffset = 0;
+        byte[] buf = new byte[dict.getSizeOfValue()];
+        // note: codec.bufLen is reused here to hold the decoded length returned by the overload
+        codec.bufLen = getValueBytesFromIdImpl(id, buf, 0);
+
+        if (codec.bufLen == buf.length) {
+            return buf;
+        } else {
+            byte[] result = new byte[codec.bufLen];
+            System.arraycopy(buf, 0, result, 0, codec.bufLen);
+            return result;
+        }
+    }
+
+    /**
+     * Reads the encoded bytes for an id from the forest into the codec buffer, then
+     * decodes them into returnValue at the given offset.
+     *
+     * @return the number of decoded bytes written
+     */
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset) {
+        NumberBytesCodec codec = getCodec();
+        codec.bufOffset = 0;
+        codec.bufLen = this.dict.getValueBytesFromIdImpl(id, codec.buf, 0);
+        return codec.decodeNumber(returnValue, offset);
+    }
+
+    @Override
+    public void dump(PrintStream out) {
+        dict.dump(out);
+    }
+
+    // serialization writes only the underlying forest
+    @Override
+    public void write(DataOutput out) throws IOException {
+        dict.write(out);
+    }
+
+    // rebuilds the forest from the input and takes the converter from the deserialized forest
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.dict = new TrieDictionaryForest<>();
+        dict.readFields(in);
+        this.converter = this.dict.getBytesConvert();
+    }
+
+    public BytesConverter<T> getConverter() {
+        return converter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
new file mode 100644
index 0000000..5444bb7
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForestBuilder.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.Bytes;
+
+/**
+ * Builder for {@link NumberDictionaryForest}: every added value is encoded with a
+ * NumberBytesCodec and the encoded bytes are fed to a TrieDictionaryForestBuilder.
+ * <p>
+ * Created by xiefan on 16-11-2.
+ */
+public class NumberDictionaryForestBuilder<T> {
+
+    private final NumberDictionaryForest.NumberBytesCodec codec = new NumberDictionaryForest.NumberBytesCodec(
+            NumberDictionaryForest.MAX_DIGITS_BEFORE_DECIMAL_POINT);
+
+    private final BytesConverter<T> bytesConverter;
+
+    private final TrieDictionaryForestBuilder<T> trieBuilder;
+
+    /** Creates a builder whose ids start from base id 0. */
+    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+        this(bytesConverter, 0);
+    }
+
+    public NumberDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+        this.bytesConverter = bytesConverter;
+        this.trieBuilder = new TrieDictionaryForestBuilder<T>(bytesConverter, baseId);
+    }
+
+    /** Converts the value to bytes and adds it. */
+    public void addValue(T value) {
+        byte[] raw = bytesConverter.convertToBytes(value);
+        addValue(raw);
+    }
+
+    /** Encodes the raw number bytes and adds a copy of the encoded form to the trie builder. */
+    public void addValue(byte[] value) {
+        codec.encodeNumber(value, 0, value.length);
+        // copy out of the codec's buffer, which is overwritten by the next encode
+        this.trieBuilder.addValue(Bytes.copy(codec.buf, codec.bufOffset, codec.bufLen));
+    }
+
+    /** Builds the underlying forest and wraps it together with the converter. */
+    public NumberDictionaryForest<T> build() {
+        return new NumberDictionaryForest<T>(trieBuilder.build(), bytesConverter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
new file mode 100755
index 0000000..e9ccc56
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForest.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+
+/**
+ * use trie forest to optimize trie dictionary
+ * <p>
+ * the input data must be in increasing order (sorted by org.apache.kylin.dict.ByteComparator)
+ * <p>
+ * Created by xiefan on 16-10-26.
+ */
+public class TrieDictionaryForest<T> extends Dictionary<T> {
+
+    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForest.class);
+
+    private ArrayList<TrieDictionary<T>> trees;
+
+    //private ArrayList<byte[]> valueDivide; //find tree
+
+    private ArrayList<ByteArray> valueDivide;
+
+    private ArrayList<Integer> accuOffset;  //find tree
+
+    private BytesConverter<T> bytesConvert;
+
+    private int baseId;
+
+    /*public AtomicLong getValueIndexTime = new AtomicLong(0);
+
+    public AtomicLong getValueTime = new AtomicLong(0);
+
+    public AtomicLong binarySearchTime = new AtomicLong(0);
+
+    public AtomicLong copyTime = new AtomicLong(0);
+
+    public AtomicLong getValueIndexTime2 = new AtomicLong(0);
+
+    public AtomicLong getValueTime2 = new AtomicLong(0);*/
+
+    // Leaves every field unset; NumberDictionaryForest.readFields() creates an instance
+    // this way and then calls readFields() on it.
+    public TrieDictionaryForest() { // default constructor for Writable interface
+
+    }
+
+    /**
+     * Creates a forest from already-built parts. The arguments are stored as given
+     * (no copies are made).
+     *
+     * @param trees          the trie dictionaries making up the forest
+     * @param valueDivide    stored as-is; presumably the dividing values used to pick a tree by value
+     * @param accuOffset     per-tree offset added to a tree-local id
+     * @param bytesConverter converter between values and bytes
+     * @param baseId         offset added to every id
+     */
+    public TrieDictionaryForest(ArrayList<TrieDictionary<T>> trees,
+                                ArrayList<ByteArray> valueDivide, ArrayList<Integer> accuOffset, BytesConverter<T> bytesConverter, int baseId) {
+        this.baseId = baseId;
+        this.bytesConvert = bytesConverter;
+        this.trees = trees;
+        this.accuOffset = accuOffset;
+        this.valueDivide = valueDivide;
+    }
+
+
+    /** Returns the first tree's min id shifted by baseId, or -1 when the forest has no trees. */
+    @Override
+    public int getMinId() {
+        return trees.isEmpty() ? -1 : trees.get(0).getMinId() + baseId;
+    }
+
+    /**
+     * Returns the last tree's max id shifted by that tree's accumulated offset and
+     * baseId, or -1 when the forest has no trees.
+     */
+    @Override
+    public int getMaxId() {
+        if (trees.isEmpty()) {
+            return -1;
+        }
+        final int last = trees.size() - 1;
+        return accuOffset.get(last) + trees.get(last).getMaxId() + baseId;
+    }
+
+    /**
+     * Returns BytesUtil.sizeForValue of (baseId + last accumulated offset + last tree's
+     * max id + 1), or -1 when the forest has no trees.
+     */
+    @Override
+    public int getSizeOfId() {
+        if (trees.isEmpty()) {
+            return -1;
+        }
+        int lastOffset = accuOffset.get(accuOffset.size() - 1);
+        int lastTreeMaxId = trees.get(trees.size() - 1).getMaxId();
+        return BytesUtil.sizeForValue(baseId + lastOffset + lastTreeMaxId + 1);
+    }
+
+    /** Returns the largest getSizeOfValue() among all trees, or -1 when there are none. */
+    @Override
+    public int getSizeOfValue() {
+        int largest = -1;
+        for (int i = 0; i < trees.size(); i++) {
+            int candidate = trees.get(i).getSizeOfValue();
+            if (candidate > largest) {
+                largest = candidate;
+            }
+        }
+        return largest;
+    }
+
+    //value --> id
+    @Override
+    protected int getIdFromValueImpl(T value, int roundingFlag)
+            throws IllegalArgumentException {
+        byte[] valueBytes = bytesConvert.convertToBytes(value);
+        return getIdFromValueBytesImpl(valueBytes, 0, valueBytes.length, roundingFlag);
+    }
+
+
+    //id = tree_inner_offset + accumulate_offset + baseId
+    /**
+     * Looks up the id of value[offset, offset + len): selects a tree via
+     * findIndexByValue, asks that tree for its local id, then adds the tree's
+     * accumulated offset and baseId.
+     *
+     * @throws IllegalArgumentException if no tree is selected for the value (index &lt; 0)
+     */
+    @Override
+    protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag)
+            throws IllegalArgumentException {
+        ByteArray search = new ByteArray(value, offset, len);
+        int index = findIndexByValue(search);
+        if (index < 0) {
+            // copyOfRange takes an exclusive END index, so the range must be [offset, offset + len)
+            throw new IllegalArgumentException("Tree Not Found. index < 0.Value:" + new String(Arrays.copyOfRange(value, offset, offset + len)));
+        }
+        TrieDictionary<T> tree = trees.get(index);
+        int id = tree.getIdFromValueBytes(value, offset, len, roundingFlag);
+        id = id + accuOffset.get(index);
+        id += baseId;
+        return id;
+    }
+
+    /**
+     * id --> value: fetches the value bytes for the id and converts them back
+     * with {@code bytesConvert}.
+     *
+     * @return the decoded value, or {@code null} when no bytes were returned for the id
+     */
+    @Override
+    protected T getValueFromIdImpl(int id) throws IllegalArgumentException {
+        final byte[] raw = getValueBytesFromIdImpl(id);
+        if (raw == null) {
+            return null;
+        }
+        return bytesConvert.convertFromBytes(raw, 0, raw.length);
+    }
+
+    /**
+     * Copies the value bytes of {@code id} into {@code returnValue} starting at {@code offset}.
+     *
+     * @param id          forest-wide id
+     * @param returnValue destination buffer
+     * @param offset      write position inside {@code returnValue}
+     * @return whatever the selected tree's {@code getValueBytesFromIdImpl} returns
+     *         (presumably the number of bytes written -- confirm against TrieDictionary)
+     * @throws IllegalArgumentException if no tree covers the id (tree index below zero)
+     */
+    @Override
+    protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset)
+            throws IllegalArgumentException {
+        int index = findIndexById(id);
+        if (index < 0) {
+            // Same guard as the single-argument overload: without it an id below the
+            // first tree fails with IndexOutOfBoundsException from trees.get(-1).
+            throw new IllegalArgumentException("Tree Not Found. index < 0");
+        }
+        int treeInnerOffset = getTreeInnerOffset(id, index);
+        TrieDictionary<T> tree = trees.get(index);
+        return tree.getValueBytesFromIdImpl(treeInnerOffset, returnValue, offset);
+    }
+
+
+    /**
+     * Returns the value bytes stored for {@code id}.
+     *
+     * @throws IllegalArgumentException if no tree covers the id (tree index below zero)
+     */
+    @Override
+    protected byte[] getValueBytesFromIdImpl(int id) throws IllegalArgumentException {
+        final int treeIndex = findIndexById(id); //lower bound
+        if (treeIndex >= 0) {
+            final int innerId = getTreeInnerOffset(id, treeIndex);
+            final TrieDictionary<T> owner = trees.get(treeIndex);
+            return owner.getValueBytesFromId(innerId);
+        }
+        throw new IllegalArgumentException("Tree Not Found. index < 0");
+    }
+
+
+    /**
+     * Translates a forest-wide id into the id used inside tree {@code index} by
+     * removing {@code baseId} and the accumulated offset of that tree.
+     */
+    private int getTreeInnerOffset(int id, int index) {
+        final int withoutBase = id - baseId;
+        return withoutBase - accuOffset.get(index);
+    }
+
+    /**
+     * Dumps every tree of the forest, each preceded by a separator line.
+     *
+     * @param out stream receiving both the separator lines and the tree dumps
+     */
+    @Override
+    public void dump(PrintStream out) {
+        for (int i = 0; i < trees.size(); i++) {
+            // Write the separator to the caller's stream so it stays with the tree dump.
+            out.println("----tree " + i + "--------");
+            trees.get(i).dump(out);
+        }
+    }
+
+    /**
+     * Serializes the forest: the head section first, then every tree.
+     */
+    @Override
+    public void write(DataOutput out) throws IOException {
+        this.writeHead(out);
+        this.writeBody(out);
+    }
+
+    /**
+     * Writes the head section: base id, converter class name, accumulated offsets,
+     * dividing values and the tree count. The section is staged in memory first so
+     * that its byte length can be written in front of it.
+     */
+    private void writeHead(DataOutput out) throws IOException {
+        ByteArrayOutputStream staging = new ByteArrayOutputStream();
+        DataOutputStream staged = new DataOutputStream(staging);
+
+        staged.writeInt(baseId);
+        String converterName = "";
+        if (bytesConvert != null) {
+            converterName = bytesConvert.getClass().getName();
+        }
+        staged.writeUTF(converterName);
+
+        // accumulated offsets
+        staged.writeInt(accuOffset.size());
+        for (Integer treeOffset : accuOffset) {
+            staged.writeInt(treeOffset);
+        }
+
+        // dividing values, each as length-prefixed bytes
+        staged.writeInt(valueDivide.size());
+        for (ByteArray divider : valueDivide) {
+            byte[] dividerBytes = divider.toBytes();
+            staged.writeInt(dividerBytes.length);
+            staged.write(dividerBytes);
+        }
+
+        // number of trees that follow in the body
+        staged.writeInt(trees.size());
+        staged.close();
+
+        byte[] headBytes = staging.toByteArray();
+        out.writeInt(headBytes.length);
+        out.write(headBytes);
+    }
+
+
+    /**
+     * Writes every tree of the forest, in list order, to {@code out}.
+     */
+    private void writeBody(DataOutput out) throws IOException {
+        for (TrieDictionary<T> tree : trees) {
+            tree.write(out);
+        }
+    }
+
+    /**
+     * Restores the forest from the layout produced by {@code write}: head length,
+     * base id, converter class name, accumulated offsets, dividing values, tree
+     * count and finally the trees themselves.
+     * <p>
+     * Any non-runtime exception raised while reading (including {@link IOException})
+     * is rethrown wrapped in a {@link RuntimeException}.
+     */
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        try {
+            in.readInt(); // head length: must be consumed, but is not needed for parsing
+            this.baseId = in.readInt();
+
+            final String converterName = in.readUTF();
+            if (!converterName.isEmpty()) {
+                this.bytesConvert = ClassUtil.forName(converterName, BytesConverter.class).newInstance();
+            }
+
+            // accumulated offsets
+            final int offsetCount = in.readInt();
+            this.accuOffset = new ArrayList<>();
+            for (int n = 0; n < offsetCount; n++) {
+                accuOffset.add(in.readInt());
+            }
+
+            // dividing values, each stored as length-prefixed bytes
+            final int dividerCount = in.readInt();
+            this.valueDivide = new ArrayList<>();
+            for (int n = 0; n < dividerCount; n++) {
+                byte[] dividerBytes = new byte[in.readInt()];
+                in.readFully(dividerBytes);
+                valueDivide.add(new ByteArray(dividerBytes, 0, dividerBytes.length));
+            }
+
+            // trees
+            final int treeCount = in.readInt();
+            this.trees = new ArrayList<>();
+            for (int n = 0; n < treeCount; n++) {
+                TrieDictionary<T> tree = new TrieDictionary<>();
+                tree.readFields(in);
+                trees.add(tree);
+            }
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Tells whether every value of {@code other} is also present in this forest.
+     *
+     * @return {@code false} if {@code other} is larger than this forest or holds a
+     *         value this forest does not contain; {@code true} otherwise
+     */
+    @Override
+    public boolean contains(Dictionary other) {
+        if (this.getSize() < other.getSize()) {
+            return false;
+        }
+
+        int otherId = other.getMinId();
+        while (otherId <= other.getMaxId()) {
+            T candidate = (T) other.getValueFromId(otherId);
+            if (!this.containsValue(candidate)) {
+                return false;
+            }
+            ++otherId;
+        }
+        return true;
+    }
+
+    /**
+     * @return an unmodifiable view of the trees that make up this forest
+     */
+    public List<TrieDictionary<T>> getTrees() {
+        List<TrieDictionary<T>> readOnlyView = Collections.unmodifiableList(this.trees);
+        return readOnlyView;
+    }
+
+    /** @return {@code true} when the forest consists of exactly one tree */
+    private boolean onlyOneTree() {
+        final int treeCount = trees.size();
+        return treeCount == 1;
+    }
+
+    /**
+     * Finds the index of the tree responsible for {@code value} after converting it
+     * to bytes with {@code bytesConvert}.
+     */
+    private int findIndexByValue(T value) {
+        final byte[] encoded = bytesConvert.convertToBytes(value);
+        final ByteArray key = new ByteArray(encoded, 0, encoded.length);
+        return findIndexByValue(key);
+    }
+
+    /**
+     * Finds the index of the tree responsible for {@code value} by searching
+     * {@code valueDivide} with {@code ByteArray.compareTo} ordering.
+     *
+     * @return the index computed by {@code lowerBound}; may be negative when the
+     *         value sorts before every dividing value
+     */
+    private int findIndexByValue(ByteArray value) {
+        Comparator<ByteArray> byBytes = new Comparator<ByteArray>() {
+            @Override
+            public int compare(ByteArray left, ByteArray right) {
+                return left.compareTo(right);
+            }
+        };
+        return lowerBound(value, byBytes, this.valueDivide);
+    }
+
+    /**
+     * Finds the index of the tree that covers {@code id} by searching
+     * {@code accuOffset} for the id with {@code baseId} removed.
+     *
+     * @return the index computed by {@code lowerBound}; may be negative when the
+     *         relative id is below every accumulated offset
+     */
+    private int findIndexById(Integer id) {
+        Integer relativeId = id - baseId;
+        Comparator<Integer> natural = new Comparator<Integer>() {
+            @Override
+            public int compare(Integer left, Integer right) {
+                return left.compareTo(right);
+            }
+        };
+        return lowerBound(relativeId, natural, this.accuOffset);
+    }
+
+
+    /**
+     * Binary search over a list sorted according to {@code comparator}.
+     *
+     * @return the index of an element equal to {@code lookfor} if one is hit;
+     *         otherwise the index of the last element smaller than {@code lookfor},
+     *         which is -1 when {@code lookfor} is smaller than every element.
+     *         A {@code null} or empty list yields 0 (the first tree).
+     */
+    private static <K> int lowerBound(K lookfor, Comparator<K> comparator, ArrayList<K> list) {
+        if (list == null || list.isEmpty()) {
+            return 0; //return the first tree
+        }
+        int low = 0;
+        int high = list.size() - 1;
+        while (low <= high) {
+            final int middle = low + (high - low) / 2;
+            final int order = comparator.compare(lookfor, list.get(middle));
+            if (order == 0) {
+                return middle;
+            }
+            if (order < 0) {
+                high = middle - 1;
+            } else {
+                low = middle + 1;
+            }
+        }
+        // no exact hit: the smaller of the two bounds is the element just below lookfor
+        return Math.min(low, high);  //value may be bigger than the right tree
+    }
+
+    /**
+     * Ad-hoc manual check of {@code lowerBound}: sorts a handful of sample strings
+     * and prints the index found for each of them.
+     */
+    public static void main(String[] args) {
+        ArrayList<String> samples = new ArrayList<>();
+        Collections.addAll(samples, "\u4e00", "\u4e8c", "\u4e09", "", "part", "par", "partition", "party", "parties", "paint");
+        Collections.sort(samples);
+        for (int i = 0; i < samples.size(); i++) {
+            String sample = samples.get(i);
+            Comparator<String> natural = new Comparator<String>() {
+                @Override
+                public int compare(String left, String right) {
+                    return left.compareTo(right);
+                }
+            };
+            System.out.println("found value:" + sample + " index:" + lowerBound(sample, natural, samples));
+        }
+    }
+
+    /** @return the converter used to translate values to and from bytes; may be unset */
+    public BytesConverter<T> getBytesConvert() {
+        return this.bytesConvert;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
new file mode 100755
index 0000000..3c03c08
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryForestBuilder.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.dict;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+
+
+/**
+ * Builds a {@link TrieDictionaryForest} from a sequence of values.
+ * <p>
+ * Values are fed into a {@link TrieDictionaryBuilder}. Whenever the accumulated byte
+ * length of the pending values reaches {@link #MaxTrieTreeSize}, the current trie is
+ * built and a fresh one is started. For every finished tree the builder records the
+ * value of its minimum id ({@code valueDivide}) and the sum of {@code maxId + 1} of all
+ * earlier trees ({@code accuOffset}).
+ * <p>
+ * A value equal to the immediately preceding one is skipped as a duplicate.
+ * NOTE(review): values are presumably expected in ascending byte order, because the
+ * forest locates a tree by searching {@code valueDivide}; this class does not enforce it.
+ */
+public class TrieDictionaryForestBuilder<T> {
+
+    public static int MaxTrieTreeSize = 1024 * 1024;//1M
+
+    private static final Logger logger = LoggerFactory.getLogger(TrieDictionaryForestBuilder.class);
+
+    private BytesConverter<T> bytesConverter;
+
+    private int curTreeSize = 0;
+
+    // true while the current trie builder holds at least one value that is not yet
+    // part of a finished tree; needed because curTreeSize stays 0 for empty values
+    private boolean hasPendingValue = false;
+
+    private TrieDictionaryBuilder<T> trieBuilder;
+
+    private ArrayList<TrieDictionary<T>> trees = new ArrayList<>();
+
+    private ArrayList<ByteArray> valueDivide = new ArrayList<>(); //find tree
+
+    private ArrayList<Integer> accuOffset = new ArrayList<>();  //find tree
+
+    private ByteArray previousValue = null;  //value use for remove duplicate
+
+    private int baseId;
+
+    private int curOffset;
+
+
+    /**
+     * Creates a builder whose forest uses base id 0.
+     */
+    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter) {
+        this(bytesConverter, 0);
+    }
+
+    /**
+     * @param bytesConverter converter between values and their byte form
+     * @param baseId         base id handed to the resulting forest
+     */
+    public TrieDictionaryForestBuilder(BytesConverter<T> bytesConverter, int baseId) {
+        this.bytesConverter = bytesConverter;
+        this.trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+        this.baseId = baseId;
+        curOffset = 0;
+    }
+
+    /**
+     * Adds a value after converting it to bytes; {@code null} values (and values the
+     * converter turns into {@code null} bytes) are ignored.
+     */
+    public void addValue(T value) {
+        if (value == null) return;
+        addValue(bytesConverter.convertToBytes(value));
+    }
+
+    /**
+     * Adds a value given as its complete byte representation; {@code null} is ignored.
+     */
+    public void addValue(byte[] value) {
+        if (value == null) return;
+        addValueBytes(value);
+    }
+
+    /**
+     * Adds the bytes covered by {@code value}; {@code null} is ignored.
+     */
+    public void addValue(ByteArray value) {
+        if (value == null) return;
+        // toBytes() yields exactly the bytes covered by the ByteArray. array() would
+        // hand over the whole backing buffer and ignore the ByteArray's offset/length.
+        addValueBytes(value.toBytes());
+    }
+
+    /**
+     * Finishes the tree under construction (if it holds any value) and returns the forest.
+     */
+    public TrieDictionaryForest<T> build() {
+        // A pending value may be empty (zero bytes), so the byte counter alone cannot
+        // tell whether there is a last tree to build.
+        if (hasPendingValue) {  //last tree
+            flushCurrentTree();
+        }
+        return new TrieDictionaryForest<T>(this.trees,
+                this.valueDivide, this.accuOffset, this.bytesConverter, baseId);
+    }
+
+    /** Core of all addValue overloads; {@code valueBytes} holds exactly the value. */
+    private void addValueBytes(byte[] valueBytes) {
+        ByteArray current = new ByteArray(valueBytes, 0, valueBytes.length);
+        if (previousValue != null && previousValue.compareTo(current) == 0) {
+            return; //duplicate value
+        }
+        // Out-of-order input (previousValue > current) is tolerated, as before.
+        this.trieBuilder.addValue(valueBytes);
+        previousValue = current;
+        hasPendingValue = true;
+        this.curTreeSize += valueBytes.length;
+        if (curTreeSize >= MaxTrieTreeSize) {
+            flushCurrentTree();
+        }
+    }
+
+    /** Builds the pending trie, registers it and starts a new empty trie builder. */
+    private void flushCurrentTree() {
+        TrieDictionary<T> tree = trieBuilder.build(0);
+        addTree(tree);
+        reset();
+    }
+
+    /** Registers a finished tree together with its dividing value and accumulated offset. */
+    private void addTree(TrieDictionary<T> tree) {
+        trees.add(tree);
+        int minId = tree.getMinId();
+        accuOffset.add(curOffset);
+        byte[] valueBytes = tree.getValueBytesFromId(minId);
+        valueDivide.add(new ByteArray(valueBytes, 0, valueBytes.length));
+        curOffset += (tree.getMaxId() + 1);
+    }
+
+    /** Clears the per-tree state so that the next value starts a new tree. */
+    private void reset() {
+        curTreeSize = 0;
+        hasPendingValue = false;
+        trieBuilder = new TrieDictionaryBuilder<T>(bytesConverter);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
new file mode 100755
index 0000000..624d6ba
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/TrieDictionaryForestTest.java
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+
+package org.apache.kylin.dict;
+
+
+import org.apache.kylin.common.util.MemoryBudgetController;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.*;
+
+import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Created by xiefan on 16-10-26.
+ */
+
+public class TrieDictionaryForestTest {
+
+
+    /**
+     * Builds a forest from a few sorted words with base id 0 and checks that the
+     * ids follow the sorted order.
+     */
+    @Test
+    public void testBasicFound() {
+        ArrayList<String> words = new ArrayList<String>(Arrays.asList("part", "par", "partition", "party", "parties", "paint"));
+        Collections.sort(words);
+        final int baseId = 0;
+        TrieDictionaryForest<String> dict = newDictBuilder(words, baseId).build();
+        dict.dump(System.out);
+        for (int i = 0; i < words.size(); i++) {
+            String word = words.get(i);
+            int expectId = baseId + i;
+            System.out.println("value:" + word + "  expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(word));
+        }
+        System.out.println("test ok");
+    }
+
+    /**
+     * Uses a maximum tree size of 0 so that the forest ends up with one tree per
+     * string, then checks that ids still follow the byte order of the values.
+     */
+    @Test  //one string one tree
+    public void testMultiTree() {
+        ArrayList<String> words = new ArrayList<String>(Arrays.asList("part", "par", "partition", "party", "parties", "paint"));
+        words.add("\u4e00\u4e8c\u4e09");  //Chinese test
+        words.add("\u56db\u4e94\u516d");
+        words.add("");
+        Collections.sort(words, new ByteComparator<String>(new StringBytesConverter()));
+        final int baseId = 5;
+        final int maxTreeSize = 0;
+        TrieDictionaryForest<String> dict = newDictBuilder(words, baseId, maxTreeSize).build();
+        dict.dump(System.out);
+        assertEquals(words.size(), dict.getTrees().size());
+        for (int i = 0; i < words.size(); i++) {
+            String word = words.get(i);
+            int expectId = baseId + i;
+            System.out.println("value:" + word + "  expect id:" + expectId);
+            assertEquals(expectId, dict.getIdFromValue(word));
+        }
+        System.out.println("test ok");
+    }
+
+    // Placeholder for a test feeding duplicate values to the builder; not annotated
+    // with @Test and has no body yet.
+    public void duplicateDataTest() {
+        //todo
+    }
+
+    /**
+     * Round-trips one million random strings through a forest whose maximum tree
+     * size is a tenth of the total data size, checking value-to-id and id-to-value.
+     */
+    @Test
+    public void testBigDataSet() {
+        // generate data and measure its total byte size
+        final StringBytesConverter converter = new StringBytesConverter();
+        ArrayList<String> values = new ArrayList<>();
+        int totalSize = 0;
+        for (String value : new RandomStrings(100 * 10000)) {
+            byte[] encoded = converter.convertToBytes(value);
+            if (encoded != null) {
+                totalSize += encoded.length;
+            }
+            values.add(value);
+        }
+        Collections.sort(values);
+
+        final int baseId = 20;
+        final int maxTreeSize = totalSize / 10;
+        System.out.println("data size:" + totalSize / 1024 + "KB  max tree size:" + maxTreeSize / 1024 + "KB");
+
+        // expected ids
+        Map<String, Integer> expectedIds = rightIdMap(baseId, values);
+
+        // build the forest
+        TrieDictionaryForest<String> dict = newDictBuilder(values, baseId, maxTreeSize).build();
+        System.out.println("tree num:" + dict.getTrees().size());
+
+        // verify both lookup directions
+        for (Map.Entry<String, Integer> expected : expectedIds.entrySet()) {
+            String value = expected.getKey();
+            Integer id = expected.getValue();
+            assertEquals(0, dict.getIdFromValue(value) - id);
+            assertEquals(value, dict.getValueFromId(id));
+        }
+    }
+
+    /**
+     * Includes two very long strings and checks that ids stay strictly increasing in
+     * value order and that every id maps back to its value.
+     */
+    @Test
+    public void partOverflowTest() {
+        ArrayList<String> str = new ArrayList<String>(Arrays.asList("part", "par", "partition", "party", "parties", "paint"));
+        String longStr = "paintjkjdfklajkdljfkdsajklfjklsadjkjekjrklewjrklewjklrjklewjkljkljkljkljweklrjewkljrklewjrlkjewkljrkljkljkjlkjjkljkljkljkljlkjlkjlkjljdfadfads" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk"
+                + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk" + "dddddddddddddddddddddddddddddddddddddddddddddddddkfjadslkfjdsakljflksadjklfjklsjfkljwelkrjewkljrklewjklrjelkwjrklewjrlkjwkljerklkljlkjrlkwejrk";
+        System.out.println("The length of the long string is " + longStr.length());
+        str.add(longStr);
+        str.add("zzzzzz" + longStr);// another long string
+
+        final int baseId = 10;
+        final int maxSize = 100 * 1024 * 1024;
+        TrieDictionaryForest<String> dict = newDictBuilder(str, baseId, maxSize).build();
+
+        // sorted, de-duplicated view of the input
+        TreeSet<String> sortedValues = new TreeSet<String>(str);
+
+        // test basic id<==>value
+        int rank = 0;
+        int previousId = -1;
+        for (String value : sortedValues) {
+            // in case of overflow parts, there exist interpolation nodes
+            // they exist to make sure that any node's part is shorter than 255
+            int actualId = dict.getIdFromValue(value);
+            assertTrue(actualId >= rank);
+            assertTrue(actualId > previousId);
+            previousId = actualId;
+
+            assertEquals(value, dict.getValueFromId(actualId));
+            rank++;
+        }
+    }
+
+    /**
+     * Runs the shared string-dictionary check with a list of values that are not
+     * part of the dictionary.
+     */
+    @Test
+    public void notFoundTest() {
+        ArrayList<String> present = new ArrayList<String>(Arrays.asList("part", "par", "partition", "party", "parties", "paint"));
+        Collections.sort(present, new ByteComparator<String>(new StringBytesConverter()));
+
+        ArrayList<String> absent = new ArrayList<String>(Arrays.asList("", "p", "pa", "pb", "parti", "partz", "partyz"));
+
+        testStringDictionary(present, absent);
+    }
+
+
+    /**
+     * Checks {@code contains} between two forests with independently drawn base ids:
+     * the forest built from the superset of values contains the smaller one, but not
+     * the other way round.
+     */
+    @Test
+    public void dictionaryContainTest() {
+        ArrayList<String> str = new ArrayList<String>();
+        str.add("part");
+        str.add("part"); // meant to be dup
+        str.add("par");
+        str.add("partition");
+        str.add("party");
+        str.add("parties");
+        str.add("paint");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId);
+        TrieDictionaryForest<String> dict = b.build();
+        str.add("py");
+        Collections.sort(str, new ByteComparator<String>(new StringBytesConverter()));
+        // Draw the second base id BEFORE creating the second builder. It used to be
+        // assigned afterwards, so it was never used and both forests shared one base id.
+        baseId = new Random().nextInt(100);
+        b = newDictBuilder(str, baseId);
+        TrieDictionaryForest<String> dict2 = b.build();
+
+        assertEquals(true, dict2.contains(dict));
+        assertEquals(false, dict.contains(dict2));
+    }
+
+    /**
+     * Runs the shared string-dictionary check on the English word list resource.
+     */
+    @Test
+    public void englishWordsTest() throws Exception {
+        ArrayList<String> words = loadStrings(new FileInputStream("src/test/resources/dict/english-words.80 (scowl-2015.05.18).txt"));
+        ByteComparator<String> byteOrder = new ByteComparator<String>(new StringBytesConverter());
+        Collections.sort(words, byteOrder);
+        testStringDictionary(words, null);
+    }
+
+    /**
+     * Runs the shared string-dictionary check on the category-names resource.
+     */
+    @Test
+    public void categoryNamesTest() throws Exception {
+        ArrayList<String> names = loadStrings(new FileInputStream("src/test/resources/dict/dw_category_grouping_names.dat"));
+        ByteComparator<String> byteOrder = new ByteComparator<String>(new StringBytesConverter());
+        Collections.sort(names, byteOrder);
+        testStringDictionary(names, null);
+    }
+
+    /**
+     * Serializes and deserializes a forest, then checks that every test value still
+     * round-trips through value-to-id and id-to-value.
+     */
+    @Test
+    public void serializeTest() {
+        ArrayList<String> testData = getTestData(10);
+        TrieDictionaryForest<String> original = newDictBuilder(testData, 10, 0).build();
+        TrieDictionaryForest<String> restored = testSerialize(original);
+        restored.dump(System.out);
+        for (int i = 0; i < testData.size(); i++) {
+            String value = testData.get(i);
+            int id = restored.getIdFromValue(value);
+            assertEquals(value, restored.getValueFromId(id));
+        }
+    }
+
+
+    /**
+     * Writes {@code dict} into an in-memory buffer and reads it back into a fresh
+     * forest instance.
+     *
+     * @return the forest restored from the serialized bytes
+     */
+    private static TrieDictionaryForest<String> testSerialize(TrieDictionaryForest<String> dict) {
+        try {
+            // serialize
+            ByteArrayOutputStream sink = new ByteArrayOutputStream();
+            DataOutputStream writer = new DataOutputStream(sink);
+            dict.write(writer);
+            writer.close();
+
+            // deserialize
+            DataInputStream reader = new DataInputStream(new ByteArrayInputStream(sink.toByteArray()));
+            TrieDictionaryForest<String> restored = new TrieDictionaryForest<>();
+            restored.readFields(reader);
+            reader.close();
+            return restored;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /*@Test
+    public void getIdFromValueBytesTest() throws Exception{
+        String value = "\u4e00\u4e8c\u4e09";
+        BytesConverter<String> converter = new StringBytesConverter();
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<>(converter,0);
+        b.addValue(value);
+        TrieDictionaryForest<String> dict = b.build();
+        dict.dump(System.out);
+        byte[] data = converter.convertToBytes(value);
+        int id = dict.getIdFromValueBytes(data,0,data.length);
+
+    }*/
+
+    //benchmark
+    // Not annotated with @Test: prints the change in available memory reported by
+    // MemoryBudgetController around filling a TrieDictionaryBuilder with test data.
+    @Deprecated
+    public void memoryUsageBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        final int testTimes = 1;
+        final Runtime runtime = Runtime.getRuntime();
+        System.out.println("start memory:" + runtime.maxMemory());
+        System.out.println("start memory:" + runtime.totalMemory());
+        int round = 0;
+        while (round < testTimes) {
+            long availBefore = MemoryBudgetController.gcAndGetSystemAvailMB();
+            TrieDictionaryBuilder<String> trieBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String value : testData) {
+                trieBuilder.addValue(value);
+            }
+            long availAfter = MemoryBudgetController.gcAndGetSystemAvailMB();
+            System.out.println("object trie memory usage:" + (availAfter - availBefore) + "MB");
+            System.out.println("start memory:" + runtime.maxMemory());
+            System.out.println("start memory:" + runtime.totalMemory());
+            round++;
+        }
+
+
+    }
+
+    /**
+     * Requests a GC, waits one second and returns the JVM's used heap
+     * (total memory minus free memory) in bytes.
+     */
+    @Deprecated
+    private long getSystemCurUsedMemory() throws Exception {
+        System.gc();
+        Thread.sleep(1000);
+        final Runtime runtime = Runtime.getRuntime();
+        long totalMem = runtime.totalMemory();
+        return totalMem - runtime.freeMemory();
+    }
+
+    //@Test
+    // Disabled benchmark: compares the accumulated build time of TrieDictionaryBuilder
+    // against TrieDictionaryForestBuilder over the same test data.
+    public void buildTimeBenchmarkTest() throws Exception {
+        //create data
+        ArrayList<String> testData = getTestData((int) (Integer.MAX_VALUE * 0.8 / 640));
+        final int testTimes = 5;
+        long oldDictTotalBuildTime = 0;
+        long newDictTotalBuildTime = 0;
+
+        //old dict
+        System.gc();
+        Thread.sleep(1000);
+        for (int round = 0; round < testTimes; round++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            TrieDictionaryBuilder<String> oldTrieBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+            for (String value : testData) {
+                oldTrieBuilder.addValue(value);
+            }
+            TrieDictionary<String> oldDict = oldTrieBuilder.build(0);
+            keep |= oldDict.getIdFromValue(testData.get(0));
+            long elapsed = System.currentTimeMillis() - startTime;
+            oldDictTotalBuildTime += elapsed;
+            System.out.println("times:" + round);
+        }
+
+        //new dict
+        System.gc();
+        Thread.sleep(1000);
+        for (int round = 0; round < testTimes; round++) {
+            int keep = 0;
+            long startTime = System.currentTimeMillis();
+            BytesConverter<String> converter = new StringBytesConverter();
+            TrieDictionaryForestBuilder<String> newTrieBuilder = new TrieDictionaryForestBuilder<String>(converter, 0);
+            for (String value : testData) {
+                newTrieBuilder.addValue(value);
+            }
+            TrieDictionaryForest<String> newDict = newTrieBuilder.build();
+            keep |= newDict.getIdFromValue(testData.get(0));
+            long elapsed = System.currentTimeMillis() - startTime;
+            newDictTotalBuildTime += elapsed;
+            System.out.println("times:" + round);
+        }
+
+
+        System.out.println("compare build time.  Old trie : " + oldDictTotalBuildTime / 1000.0 + "s.New trie : " + newDictTotalBuildTime / 1000.0 + "s");
+    }
+
+
+    /**
+     * Runs the lookup benchmark over {@code (int) (Integer.MAX_VALUE * 0.8 / 640)}
+     * random strings.
+     */
+    @Test
+    public void queryTimeBenchmarkTest() throws Exception {
+        final int count = (int) (Integer.MAX_VALUE * 0.8 / 640);
+        RandomStrings randomValues = new RandomStrings(count);
+        benchmarkStringDictionary(randomValues);
+    }
+
+
+    /**
+     * Prints the summed byte length (default charset) of all strings in {@code list}, in MB.
+     */
+    private void evaluateDataSize(ArrayList<String> list) {
+        long totalBytes = 0;
+        for (int i = 0; i < list.size(); i++) {
+            totalBytes += list.get(i).getBytes().length;
+        }
+        System.out.println("test data size : " + totalBytes / (1024 * 1024) + " MB");
+    }
+
+    /**
+     * Prints the summed byte length (default charset) of {@code count} random strings, in MB.
+     */
+    private void evaluateDataSize(int count) {
+        long bytesCount = 0;
+        for (Iterator<String> itr = new RandomStrings(count).iterator(); itr.hasNext(); ) {
+            String value = itr.next();
+            bytesCount += value.getBytes().length;
+        }
+        System.out.println("test data size : " + bytesCount / (1024 * 1024) + " MB");
+    }
+
+    /**
+     * Builds a forest (base id 0) from the given strings, prepares the reference
+     * lookup structures and runs the benchmark twice: once as warm-up, once for real.
+     */
+    private static void benchmarkStringDictionary(Iterable<String> str) throws IOException {
+        ArrayList<String> testData = new ArrayList<>();
+        for (String value : str) {
+            testData.add(value);
+        }
+        Collections.sort(testData);
+        TrieDictionaryForest<String> dict = newDictBuilder(testData, 0).build();
+        System.out.println("tree size:" + dict.getTrees().size());
+        BytesConverter<String> converter = new StringBytesConverter();
+        TreeSet<String> set = new TreeSet<String>(testData);
+
+        // prepare id==>value array and value==>id map
+        HashMap<String, Integer> map = new HashMap<String, Integer>();
+        String[] strArray = new String[set.size()];
+        byte[][] array = new byte[set.size()][];
+        int id = 0;
+        for (String value : set) {
+            map.put(value, id);
+            strArray[id] = value;
+            array[id] = converter.convertToBytes(value);
+            id++;
+        }
+
+        // warm-up, said that code only got JIT after run 1k-10k times,
+        // following jvm options may help
+        // -XX:CompileThreshold=1500
+        // -XX:+PrintCompilation
+        System.out.println("Benchmark awaitig...");
+        benchmark("Warm up", dict, set, map, strArray, array);
+        benchmark("Benchmark", dict, set, map, strArray, array);
+    }
+    /** Times value==>id and id==>value lookups via HashMap/array and via the dictionary, printing the elapsed milliseconds of each; returns an accumulator of the lookup results. */
+    private static int benchmark(String msg, TrieDictionaryForest<String> dict, TreeSet<String> set, HashMap<String, Integer> map, String[] strArray, byte[][] array) {
+        int n = set.size();
+        int times = Math.max(10 * 1000 * 1000 / n, 1); // run 10 million lookups
+        int keep = 0; // make sure JIT don't OPT OUT function calls under test
+        byte[] valueBytes = new byte[dict.getSizeOfValue()];
+        long start;
+
+        // benchmark value==>id, via HashMap
+        System.out.println(msg + " HashMap lookup value==>id");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= map.get(strArray[j]);
+            }
+        }
+        long timeValueToIdByMap = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByMap);
+
+        // benchmark value==>id, via Dict
+        System.out.println(msg + " Dictionary lookup value==>id");
+        //dict.dump(System.out);
+
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                //System.out.println("looking for value:"+new String(array[j]));
+                keep |= dict.getIdFromValueBytes(array[j], 0, array[j].length);
+            }
+        }
+        long timeValueToIdByDict = System.currentTimeMillis() - start;
+        System.out.println(timeValueToIdByDict);
+        /*System.out.println("detail time.  get index time"+dict.getValueIndexTime.get()+" get value time"+
+        dict.getValueTime.get() +"  binary search time:"+dict.binarySearchTime.get() + " copy time:"+
+        dict.copyTime.get());*/
+
+        // benchmark id==>value, via Array
+        System.out.println(msg + " Array lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= strArray[j].length();
+            }
+        }
+        long timeIdToValueByArray = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByArray);
+
+        // benchmark id==>value, via Dict
+        System.out.println(msg + " Dictionary lookup id==>value");
+        start = System.currentTimeMillis();
+        for (int i = 0; i < times; i++) {
+            for (int j = 0; j < n; j++) {
+                keep |= dict.getValueBytesFromId(j, valueBytes, 0);
+            }
+        }
+        long timeIdToValueByDict = System.currentTimeMillis() - start;
+        System.out.println(timeIdToValueByDict);
+        /*System.out.println("detail time.  get index time"+dict.getValueIndexTime2.get()+" get value time"+
+                dict.getValueTime2.get());*/
+
+        return keep;
+    }
+    /** Builds a forest with MaxTrieTreeSize 2 and a random base id, then checks id<==>value round trips, rejection of unknown values and ids, and null handling. */
+    private static void testStringDictionary(ArrayList<String> str, ArrayList<String> notFound) {
+        int baseId = new Random().nextInt(100);
+        TrieDictionaryForestBuilder<String> b = newDictBuilder(str, baseId, 2);
+        TrieDictionaryForest<String> dict = b.build();
+        //dict.dump(System.out);
+        TreeSet<String> set = new TreeSet<String>();
+        for (String s : str) {
+            set.add(s);
+        }
+
+        // test serialize
+        //dict = testSerialize(dict);
+
+        // test basic id<==>value
+        Iterator<String> it = set.iterator();
+        int id = baseId;
+        for (; it.hasNext(); id++) {
+            String value = it.next();
+            // ids are expected to follow the sorted order of the distinct values, starting at baseId
+
+            assertEquals(id, dict.getIdFromValue(value));
+            assertEquals(value, dict.getValueFromId(id));
+        }
+
+        //test not found value
+        if (notFound != null) {
+            for (String s : notFound) {
+                try {
+                    int nullId = dict.getIdFromValue(s);
+                    System.out.println("null value id:" + nullId);
+                    fail("For not found value '" + s + "', IllegalArgumentException is expected");
+                } catch (IllegalArgumentException e) {
+                    // good
+                }
+            }
+        }
+        int maxId = dict.getMaxId();
+        int[] notExistIds = {-10, -20, Integer.MIN_VALUE, -Integer.MAX_VALUE, maxId + 1, maxId + 2}; // -Integer.MIN_VALUE overflows back to MIN_VALUE, so it is spelled directly
+        for (int i : notExistIds) {
+            try {
+                dict.getValueFromId(i);
+                fail("For not found id '" + i + "', IllegalArgumentException is expected");
+            } catch (IllegalArgumentException e) {
+                // good
+            }
+        }
+
+        // test null value
+        int nullId = dict.getIdFromValue(null);
+        assertNull(dict.getValueFromId(nullId));
+        int nullId2 = dict.getIdFromValueBytes(null, 0, 0);
+        assertEquals(-1, dict.getValueBytesFromId(nullId2, null, 0));
+        assertEquals(nullId, nullId2);
+    }
+    /** Maps each string, in list order, to consecutive ids starting at baseId (a repeated string keeps its last id). */
+    private Map<String, Integer> rightIdMap(int baseId, ArrayList<String> strs) {
+        Map<String, Integer> result = new HashMap<>();
+        int expectId = baseId;
+        for (String str : strs) {
+            result.put(str, expectId);
+            expectId++;
+        }
+        return result;
+    }
+    /** Creates a String forest builder with the given base id and adds every value in iteration order. */
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId) {
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        for (String s : strs)
+            b.addValue(s);
+        return b;
+    }
+    /** Same as the two-argument overload, but first assigns treeSize to the static TrieDictionaryForestBuilder.MaxTrieTreeSize (the assignment is not reset afterwards). */
+    private static TrieDictionaryForestBuilder<String> newDictBuilder(Iterable<String> strs, int baseId, int treeSize) {
+        TrieDictionaryForestBuilder<String> b = new TrieDictionaryForestBuilder<String>(new StringBytesConverter(), baseId);
+        TrieDictionaryForestBuilder.MaxTrieTreeSize = treeSize;
+        for (String s : strs)
+            b.addValue(s);
+        return b;
+    }
+    /** Iterable whose iterators each produce {@code size} random strings of 64 lowercase hexadecimal characters. */
+    private static class RandomStrings implements Iterable<String> {
+        final private int size;
+
+        public RandomStrings(int size) {
+            this.size = size;
+            //System.out.println("size = " + size);
+        }
+
+        @Override
+        public Iterator<String> iterator() {
+            return new Iterator<String>() {
+                Random rand = new Random(System.currentTimeMillis());
+                int i = 0;
+
+                @Override
+                public boolean hasNext() {
+                    return i < size;
+                }
+
+                @Override
+                public String next() {
+                    if (hasNext() == false)
+                        throw new NoSuchElementException();
+
+                    i++;
+                    //if (i % 1000000 == 0)
+                    //System.out.println(i);
+
+                    return nextString();
+                }
+                /** Builds one 64-character string of random characters drawn from 0-9 and a-f. */
+                private String nextString() {
+                    StringBuffer buf = new StringBuffer();
+                    for (int i = 0; i < 64; i++) {
+                        int v = rand.nextInt(16);
+                        char c;
+                        if (v >= 0 && v <= 9)
+                            c = (char) ('0' + v);
+                        else
+                            c = (char) ('a' + v - 10);
+                        buf.append(c);
+                    }
+                    return buf.toString();
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+    /** Reads the stream as UTF-8 and returns its trimmed, non-empty lines; the reader and the stream are closed in a finally block. */
+    private static ArrayList<String> loadStrings(InputStream is) throws Exception {
+        ArrayList<String> r = new ArrayList<String>();
+        BufferedReader reader = new BufferedReader(new InputStreamReader(is, "UTF-8"));
+        try {
+            String word;
+            while ((word = reader.readLine()) != null) {
+                word = word.trim();
+                if (word.isEmpty() == false)
+                    r.add(word);
+            }
+        } finally {
+            reader.close();
+            is.close();
+        }
+        return r;
+    }
+
+    /** Generates count random strings, sorts them with a ByteComparator over StringBytesConverter, passes them to evaluateDataSize and returns them. */
+    private ArrayList<String> getTestData(int count) {
+        RandomStrings rs = new RandomStrings(count);
+        Iterator<String> itr = rs.iterator();
+        ArrayList<String> testData = new ArrayList<>();
+        while (itr.hasNext())
+            testData.add(itr.next());
+        Collections.sort(testData, new ByteComparator<String>(new StringBytesConverter()));
+        evaluateDataSize(testData);
+        return testData;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
new file mode 100644
index 0000000..dfc6b2c
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnPartitioner2.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+/** Picks the reducer from the first byte of the key text: MARK_FOR_HLL goes to the last reducer,
+ * MARK_FOR_PARTITION_COL to the second-to-last, otherwise the value from BytesUtil.readUnsigned is used. */
+public class FactDistinctColumnPartitioner2 extends Partitioner<SelfDefineSortableKey, Text> {
+    private Configuration conf;
+
+    @Override
+    public int getPartition(SelfDefineSortableKey key, Text value, int numReduceTasks) {
+
+        if (key.getText().getBytes()[0] == FactDistinctHiveColumnsMapper2.MARK_FOR_HLL) {
+            // the last reducer is for merging hll
+            return numReduceTasks - 1;
+        } else if (key.getText().getBytes()[0] == FactDistinctHiveColumnsMapper2.MARK_FOR_PARTITION_COL) {
+            // the second-to-last reducer receives the keys marked for the partition column
+            return numReduceTasks - 2;
+        } else {
+            int colIndex = BytesUtil.readUnsigned(key.getText().getBytes(), 0, 1);
+            return colIndex;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
new file mode 100644
index 0000000..6ff07f0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsCombiner2.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+
+import java.io.IOException;
+
+/** Combiner that writes each key's text once, paired with the first value of its group.
+ * NOTE(review): emits Text keys although the job sets SelfDefineSortableKey as map output key class - confirm.
+ * @author yangli9 */
+public class FactDistinctColumnsCombiner2 extends KylinReducer<SelfDefineSortableKey, Text, Text, Text> {
+
+    @Override
+    protected void setup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+    }
+
+    @Override
+    public void reduce(SelfDefineSortableKey key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
+
+        // for hll, each key only has one output, no need to do local combine;
+        // for normal col, values are empty text
+        context.write(key.getText(), values.iterator().next());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
new file mode 100644
index 0000000..4d26402
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsJob2.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Hadoop job driver for the fact-distinct-columns step: parses the options, configures the
+ * mapper, combiner, partitioner and reducer, and waits for the job to complete. */
+public class FactDistinctColumnsJob2 extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnsJob2.class);
+    /** Runs the job and returns the result of waitForCompletion; throws IllegalStateException when the segment id is not found in the cube. */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_STATISTICS_ENABLED);
+            options.addOption(OPTION_STATISTICS_OUTPUT);
+            options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+            String statistics_enabled = getOptionValue(OPTION_STATISTICS_ENABLED);
+            String statistics_output = getOptionValue(OPTION_STATISTICS_OUTPUT);
+            String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT);
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            List<TblColRef> columnsNeedDict = cubeMgr.getAllDictColumnsOnFact(cube.getDescriptor());
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_ENABLED, statistics_enabled);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_OUTPUT, statistics_output);
+            job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent);
+            logger.info("Starting: " + job.getJobName());
+            logger.info("using FactDistinctColumnsJob2");
+
+            setJobClasspath(job, cube.getConfig());
+
+            CubeSegment segment = cube.getSegmentById(segmentID);
+            if (segment == null) {
+                logger.error("Failed to find {} in cube {}", segmentID, cube);
+                System.out.println("Failed to find " + segmentID + " in cube " + cube);
+                for (CubeSegment s : cube.getSegments()) {
+                    logger.error(s.getName() + " with status " + s.getStatus());
+                    System.out.println(s.getName() + " with status " + s.getStatus());
+                }
+                throw new IllegalStateException("Failed to find segment " + segmentID + " in cube " + cubeName);
+            } else {
+                logger.info("Found segment: " + segment);
+                System.out.println("Found segment " + segment);
+            }
+            setupMapper(segment);
+            setupReducer(output, "true".equalsIgnoreCase(statistics_enabled) ? columnsNeedDict.size() + 2 : columnsNeedDict.size());
+
+            attachKylinPropsAndMetadata(cube, job.getConfiguration());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+
+    }
+    /** Lets the segment's flat-table input format configure the job, then sets the mapper, combiner and map output key/value classes. */
+    private void setupMapper(CubeSegment cubeSeg) throws IOException {
+        IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+        flatTableInputFormat.configureJob(job);
+
+        job.setMapperClass(FactDistinctHiveColumnsMapper2.class);
+        job.setCombinerClass(FactDistinctColumnsCombiner2.class);
+        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+        job.setMapOutputValueClass(Text.class);
+    }
+    /** Sets the reducer, partitioner, output format/types and reducer count, registers the output path and calls deletePath on it. */
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        job.setReducerClass(FactDistinctColumnsReducer.class);  //reducer do not need to change
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        job.setOutputKeyClass(NullWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setPartitionerClass(FactDistinctColumnPartitioner2.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        deletePath(job.getConfiguration(), output);
+    }
+    /** Command-line entry point: runs the job through ToolRunner and exits with its return code. */
+    public static void main(String[] args) throws Exception {
+        FactDistinctColumnsJob2 job = new FactDistinctColumnsJob2();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
new file mode 100644
index 0000000..2e9a2dc
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctColumnsMapperBase2.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps.fdc2;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+/** Base mapper for the fact-distinct-columns step: setup() resolves the cube, the segment and the
+ * flat-table column index of every dictionary column; handleErrorRecord() counts bad records. */
+public class FactDistinctColumnsMapperBase2<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VALUEIN, SelfDefineSortableKey, Text> {
+
+    protected String cubeName;
+    protected CubeInstance cube;
+    protected CubeSegment cubeSeg;
+    protected CubeDesc cubeDesc;
+    protected long baseCuboidId;
+    protected List<TblColRef> factDictCols;
+    protected IMRTableInputFormat flatTableInputFormat;
+
+    protected Text outputKey = new Text();
+    protected SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+    protected Text outputValue = new Text();
+    protected int errorRecordCounter = 0;
+
+    protected CubeJoinedFlatTableEnrich intermediateTableDesc;
+    protected int[] dictionaryColumnIndex;
+    /** Loads the Kylin config, looks up cube and segment from the job configuration, and fills dictionaryColumnIndex with each dictionary column's flat-table index. */
+    @Override
+    protected void setup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+        cubeDesc = cube.getDescriptor();
+        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg),  cubeDesc);
+        dictionaryColumnIndex = new int[factDictCols.size()];
+        for (int i = 0; i < factDictCols.size(); i++) {
+            TblColRef colRef = factDictCols.get(i);
+            int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+            dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
+        }
+
+    }
+    /** Prints the record and stack trace to stderr and counts the error; once the count exceeds ERROR_RECORD_LOG_THRESHOLD the exception is rethrown (wrapped in RuntimeException if it is a checked non-IO exception). */
+    protected void handleErrorRecord(String[] record, Exception ex) throws IOException {
+
+        System.err.println("Insane record: " + Arrays.toString(record));
+        ex.printStackTrace(System.err);
+
+        errorRecordCounter++;
+        if (errorRecordCounter > BatchConstants.ERROR_RECORD_LOG_THRESHOLD) {
+            if (ex instanceof IOException)
+                throw (IOException) ex;
+            else if (ex instanceof RuntimeException)
+                throw (RuntimeException) ex;
+            else
+                throw new RuntimeException("", ex);
+        }
+    }
+}


[3/6] kylin git commit: KYLIN-2006 Make job engine distributed and HA

Posted by li...@apache.org.
KYLIN-2006 Make job engine distributed and HA

Signed-off-by: Yang Li <li...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f66783e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f66783e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f66783e

Branch: refs/heads/master
Commit: 4f66783e1e4a765080e26e19cc0fe53a78ca599a
Parents: 5858448
Author: kangkaisen <ka...@live.com>
Authored: Mon Sep 5 20:15:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:01 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   1 +
 .../kylin/job/execution/ExecutableManager.java  |  23 ++
 .../impl/threadpool/DistributedScheduler.java   | 349 +++++++++++++++++++
 .../kylin/job/lock/DistributedJobLock.java      |  29 ++
 .../org/apache/kylin/job/lock/DoWatchLock.java  |  23 ++
 .../kylin/job/BaseTestDistributedScheduler.java | 226 ++++++++++++
 .../apache/kylin/job/ContextTestExecutable.java |  51 +++
 .../job/ITDistributedSchedulerBaseTest.java     |  90 +++++
 .../job/ITDistributedSchedulerTakeOverTest.java |  60 ++++
 .../kylin/rest/controller/JobController.java    |  62 +---
 .../apache/kylin/rest/service/CubeService.java  |   4 +
 .../apache/kylin/rest/service/JobService.java   |  96 ++++-
 .../hbase/util/ZookeeperDistributedJobLock.java | 230 ++++++++++++
 13 files changed, 1182 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6d3e807..ee9f57c 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -755,6 +755,7 @@ abstract public class KylinConfigBase implements Serializable {
     public Map<Integer, String> getSchedulers() {
         Map<Integer, String> r = convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler."));
         r.put(0, "org.apache.kylin.job.impl.threadpool.DefaultScheduler");
+        r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler");
         return r;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 0901443..92fc8c9 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -235,11 +235,30 @@ public class ExecutableManager {
         }
     }
 
+    public void resumeRunningJobForce(String jobId) {
+        AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+        // for a chained job, set the first sub-task found in RUNNING state back to READY; the job itself is then set to READY as well
+        if (job instanceof DefaultChainedExecutable) {
+            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
+            for (AbstractExecutable task : tasks) {
+                if (task.getStatus() == ExecutableState.RUNNING) {
+                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
+                    break;
+                }
+            }
+        }
+        updateJobOutput(jobId, ExecutableState.READY, null, null);
+    }
+
     public void resumeJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
         if (job == null) {
             return;
         }
+
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {
@@ -254,6 +273,10 @@ public class ExecutableManager {
 
     public void discardJob(String jobId) {
         AbstractExecutable job = getJob(jobId);
+        if (job == null) {
+            return;
+        }
+
         if (job instanceof DefaultChainedExecutable) {
             List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
             for (AbstractExecutable task : tasks) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
new file mode 100644
index 0000000..11709c7
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.impl.threadpool;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.Scheduler;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.exception.SchedulerException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DoWatchLock;
+import org.apache.kylin.job.lock.JobLock;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Schedules the cubing jobs when several job servers are running against the same metadata.
+ *
+ * To enable the distributed job server, you need to set or update three configs in kylin.properties:
+ *  1. kylin.enable.scheduler=2
+ *  2. kylin.job.controller.lock=org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock
+ *  3. add all the job servers and query servers to kylin.rest.servers
+ */
+public class DistributedScheduler implements Scheduler<AbstractExecutable>, ConnectionStateListener {
+    private ExecutableManager executableManager;
+    private FetcherRunner fetcher;
+    private ScheduledExecutorService fetcherPool;
+    private ExecutorService watchPool;
+    private ExecutorService jobPool;
+    private DefaultContext context;
+    private DistributedJobLock jobLock;
+
+    private static final Logger logger = LoggerFactory.getLogger(DistributedScheduler.class);
+    private static final ConcurrentHashMap<KylinConfig, DistributedScheduler> CACHE = new ConcurrentHashMap<KylinConfig, DistributedScheduler>();
+    //keep all segments having running job
+    private final Set<String> segmentWithLocks = new CopyOnWriteArraySet<>();
+    private volatile boolean initialized = false;
+    private volatile boolean hasStarted = false;
+    private JobEngineConfig jobEngineConfig;
+
+    private final static String SEGMENT_ID = "segmentId";
+
+    /**
+     * Returns the scheduler cached for the given config, creating and caching it on first use.
+     * Only for it test.
+     *
+     * @param config the config used as the cache key
+     * @return the scheduler associated with the config
+     */
+    public static DistributedScheduler getInstance(KylinConfig config) {
+        DistributedScheduler cached = CACHE.get(config);
+        if (cached != null) {
+            return cached;
+        }
+
+        synchronized (DistributedScheduler.class) {
+            // look again under the class lock before creating a new instance
+            cached = CACHE.get(config);
+            if (cached == null) {
+                cached = new DistributedScheduler();
+                CACHE.put(config, cached);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+            }
+            return cached;
+        }
+    }
+
+    /**
+     * Task that scans all job ids known to the executable manager and submits every job whose
+     * output state is READY to the job pool. The scan is skipped when this server already has
+     * at least the configured maximum number of running jobs.
+     */
+    private class FetcherRunner implements Runnable {
+        @Override
+        synchronized public void run() {
+            try {
+                Map<String, Executable> runningJobs = context.getRunningJobs();
+                if (runningJobs.size() >= jobEngineConfig.getMaxConcurrentJobLimit()) {
+                    logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time");
+                    return;
+                }
+
+                int nRunning = 0, nOtherRunning = 0, nReady = 0, nOthers = 0;
+                for (final String id : executableManager.getAllJobIds()) {
+                    // jobs already tracked by this server's context are not fetched again
+                    if (runningJobs.containsKey(id)) {
+                        nRunning++;
+                        continue;
+                    }
+
+                    final Output output = executableManager.getOutput(id);
+
+                    if ((output.getState() != ExecutableState.READY)) {
+                        // RUNNING but not in this server's context: counted as running elsewhere
+                        if (output.getState() == ExecutableState.RUNNING) {
+                            nOtherRunning++;
+                        } else {
+                            nOthers++;
+                        }
+                        continue;
+                    }
+
+                    final AbstractExecutable executable = executableManager.getJob(id);
+                    if (executable == null) {
+                        // getJob may return null; a JobRunner around null would only fail with NPEs on the pool thread
+                        logger.warn("job " + id + " is ready but can not be loaded, skip it in server: " + serverName);
+                        nOthers++;
+                        continue;
+                    }
+
+                    nReady++;
+                    try {
+                        jobPool.execute(new JobRunner(executable));
+                    } catch (Exception ex) {
+                        logger.warn(executable.toString() + " fail to schedule in server: " + serverName, ex);
+                    }
+                }
+                logger.info("Job Fetcher: " + nRunning + " should running, " + runningJobs.size() + " actual running, " + nOtherRunning + " running in other server, " + nReady + " ready, " + nOthers + " others");
+            } catch (Exception e) {
+                // hand the exception to the logger as a throwable so the stack trace is kept
+                logger.warn("Job Fetcher caught a exception ", e);
+            }
+        }
+    }
+
+    private String serverName = getServerName();
+
+    /**
+     * Resolves the host name of the local machine.
+     *
+     * @return the local host name, or null when it cannot be resolved
+     */
+    private String getServerName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            // keep the cause in the log; callers still receive null as before
+            logger.error("fail to get the serverName", e);
+            return null;
+        }
+    }
+
+    /**
+     * Replaces the server name used by this scheduler and logs the new value.
+     * Only for it test.
+     *
+     * @param serverName the name to use from now on
+     */
+    public void setServerName(String serverName) {
+        this.serverName = serverName;
+        logger.info("serverName update to:" + serverName);
+    }
+
+    private class JobRunner implements Runnable {
+
+        private final AbstractExecutable executable;
+
+        public JobRunner(AbstractExecutable executable) {
+            this.executable = executable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                String segmentId = executable.getParam(SEGMENT_ID);
+                if (jobLock.lockWithName(segmentId, serverName)) {
+                    logger.info(executable.toString() + " scheduled in server: " + serverName);
+
+                    context.addRunningJob(executable);
+                    segmentWithLocks.add(segmentId);
+                    executable.execute(context);
+                }
+            } catch (ExecuteException e) {
+                logger.error("ExecuteException job:" + executable.getId() + " in server: " + serverName, e);
+            } catch (Exception e) {
+                logger.error("unknown error execute job:" + executable.getId() + " in server: " + serverName, e);
+            } finally {
+                context.removeRunningJob(executable);
+                releaseJobLock(executable);
+                // trigger the next step asap
+                fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+            }
+        }
+
+        //release job lock only when the all tasks of the job finish and the job server keep the cube lock.
+        private void releaseJobLock(AbstractExecutable executable) {
+            if (executable instanceof DefaultChainedExecutable) {
+                String segmentId = executable.getParam(SEGMENT_ID);
+                ExecutableState state = executable.getStatus();
+
+                if (state == ExecutableState.SUCCEED || state == ExecutableState.ERROR || state == ExecutableState.DISCARDED) {
+                    if (segmentWithLocks.contains(segmentId)) {
+                        logger.info(executable.toString() + " will release the lock for the segment: " + segmentId);
+                        jobLock.unlockWithName(segmentId);
+                        segmentWithLocks.remove(segmentId);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Lock watcher: when the lock of a segment is reported as released by another server while a
+     * chained job for that segment still has RUNNING output, this server tries to take the lock
+     * and resume the job.
+     */
+    private class DoWatchImpl implements DoWatchLock {
+        private String serverName;
+
+        public DoWatchImpl(String serverName) {
+            this.serverName = serverName;
+        }
+
+        /**
+         * @param path     path of the lock node; its last "/"-separated element is used as the segment id
+         * @param nodeData compared against this server's name to find out who released the lock
+         */
+        @Override
+        public void doWatch(String path, String nodeData) {
+            if (path == null) {
+                return;
+            }
+            String[] paths = path.split("/");
+            if (paths.length == 0) {
+                // e.g. a path consisting only of "/" yields no elements
+                return;
+            }
+            String segmentId = paths[paths.length - 1];
+
+            // the released-by check does not depend on the job, so do it once;
+            // a missing nodeData is treated as "released by somebody else"
+            boolean releasedByThisServer = nodeData != null && nodeData.equalsIgnoreCase(serverName);
+            if (releasedByThisServer) {
+                return;
+            }
+
+            for (final String id : executableManager.getAllJobIds()) {
+                final Output output = executableManager.getOutput(id);
+                if (output.getState() != ExecutableState.RUNNING) {
+                    continue;
+                }
+
+                AbstractExecutable executable = executableManager.getJob(id);
+                if (!(executable instanceof DefaultChainedExecutable)) {
+                    continue;
+                }
+
+                // call equalsIgnoreCase on the non-null watched id: a job without the param must not throw
+                String jobSegmentId = executable.getParams().get(SEGMENT_ID);
+                if (!segmentId.equalsIgnoreCase(jobSegmentId)) {
+                    continue;
+                }
+
+                try {
+                    logger.warn(nodeData + " has released the lock for: " + segmentId + " but the job still running. so " + serverName + " resume the job");
+                    if (jobLock.lockWithName(segmentId, serverName)) {
+                        executableManager.resumeRunningJobForce(executable.getId());
+                        fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                        break;
+                    }
+                } catch (Exception e) {
+                    logger.error("resume the job but fail in server: " + serverName, e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Shuts the scheduler down when the connection state becomes SUSPENDED or LOST.
+     * A SchedulerException raised by the shutdown is rethrown wrapped in a RuntimeException.
+     */
+    @Override
+    public void stateChanged(CuratorFramework client, ConnectionState newState) {
+        final boolean connectionBroken = newState == ConnectionState.SUSPENDED || newState == ConnectionState.LOST;
+        if (!connectionBroken) {
+            return;
+        }
+
+        try {
+            shutdown();
+        } catch (SchedulerException e) {
+            throw new RuntimeException("failed to shutdown scheduler", e);
+        }
+    }
+
+    /**
+     * Initializes the scheduler: creates the thread pools, registers the lock watcher, resumes
+     * jobs recorded as RUNNING and starts the periodic job fetcher. Returns without doing
+     * anything when the server mode is neither "job" nor "all", or when the scheduler has
+     * already been initialized.
+     *
+     * @param jobEngineConfig engine configuration (server mode, max concurrent jobs, Kylin config)
+     * @param jobLock         the lock to use; it is cast to {@link DistributedJobLock}
+     * @throws SchedulerException declared by the interface; not thrown directly by the code below
+     */
+    @Override
+    public synchronized void init(JobEngineConfig jobEngineConfig, final JobLock jobLock) throws SchedulerException {
+        String serverMode = jobEngineConfig.getConfig().getServerMode();
+        // literal on the left: tolerates a null mode and avoids locale-sensitive lower-casing
+        boolean runsJobs = "job".equalsIgnoreCase(serverMode) || "all".equalsIgnoreCase(serverMode);
+        if (!runsJobs) {
+            logger.info("server mode: " + serverMode + ", no need to run job scheduler");
+            return;
+        }
+        logger.info("Initializing Job Engine ....");
+
+        if (!initialized) {
+            initialized = true;
+        } else {
+            return;
+        }
+
+        this.jobEngineConfig = jobEngineConfig;
+        this.jobLock = (DistributedJobLock) jobLock;
+
+        executableManager = ExecutableManager.getInstance(jobEngineConfig.getConfig());
+        fetcherPool = Executors.newScheduledThreadPool(1);
+
+        //watch the zookeeper node change, so that when one job server is down, other job servers can take over.
+        watchPool = Executors.newFixedThreadPool(1);
+        DoWatchImpl doWatchImpl = new DoWatchImpl(this.serverName);
+        this.jobLock.watchLock(watchPool, doWatchImpl);
+
+        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
+        jobPool = new ThreadPoolExecutor(corePoolSize, corePoolSize, Long.MAX_VALUE, TimeUnit.DAYS, new SynchronousQueue<Runnable>());
+        context = new DefaultContext(Maps.<String, Executable> newConcurrentMap(), jobEngineConfig.getConfig());
+
+        // the fetcher must exist before running jobs are resumed: resuming schedules it on
+        // fetcherPool, and scheduling a null command throws NullPointerException
+        fetcher = new FetcherRunner();
+
+        //load all executable, set them to a consistent status
+        resumeAllRunningJobs();
+
+        fetcherPool.scheduleAtFixedRate(fetcher, 10, ExecutableConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
+        hasStarted = true;
+    }
+
+    /**
+     * For every chained job whose output state is RUNNING, tries to take the lock of the job's
+     * segment for this server; on success the job is reset through
+     * ExecutableManager.resumeRunningJobForce and the fetcher is scheduled immediately.
+     * Failures are logged per job and do not stop the loop.
+     */
+    private void resumeAllRunningJobs() {
+        for (final String id : executableManager.getAllJobIds()) {
+            final Output output = executableManager.getOutput(id);
+            final AbstractExecutable executable = executableManager.getJob(id);
+
+            final boolean runningChain = output.getState() == ExecutableState.RUNNING && executable instanceof DefaultChainedExecutable;
+            if (!runningChain) {
+                continue;
+            }
+
+            try {
+                final boolean locked = jobLock.lockWithName(executable.getParam(SEGMENT_ID), serverName);
+                if (locked) {
+                    executableManager.resumeRunningJobForce(executable.getId());
+                    fetcherPool.schedule(fetcher, 0, TimeUnit.SECONDS);
+                }
+            } catch (Exception e) {
+                logger.error("resume the job " + id + " fail in server: " + serverName, e);
+            }
+        }
+    }
+
+    /**
+     * Releases all segment locks tracked by this server and shuts down the watch, fetcher and
+     * job pools. Pools that were never created (init returned early or was never called) are
+     * skipped instead of failing with a NullPointerException.
+     *
+     * @throws SchedulerException declared by the interface; not thrown directly by the code below
+     */
+    @Override
+    public void shutdown() throws SchedulerException {
+        logger.info("Will shut down Job Engine ....");
+
+        releaseAllLocks();
+        logger.info("The all locks has released");
+
+        if (watchPool != null) {
+            watchPool.shutdown();
+            logger.info("The watchPool has down");
+        }
+
+        if (fetcherPool != null) {
+            fetcherPool.shutdown();
+            logger.info("The fetcherPool has down");
+        }
+
+        if (jobPool != null) {
+            jobPool.shutdown();
+            logger.info("The jobPoll has down");
+        }
+    }
+
+    /**
+     * Unlocks every segment this server tracks as locked and stops tracking it, so that a job
+     * finishing afterwards does not unlock the same name a second time.
+     */
+    private void releaseAllLocks() {
+        // segmentWithLocks is a CopyOnWriteArraySet, so removing while iterating is safe
+        for (String segmentId : segmentWithLocks) {
+            jobLock.unlockWithName(segmentId);
+            segmentWithLocks.remove(segmentId);
+        }
+    }
+
+    /**
+     * Currently a no-op that always reports success, whether or not the scheduler has started.
+     *
+     * @param executable the executable to stop (not used yet)
+     * @return always true
+     */
+    @Override
+    public boolean stop(AbstractExecutable executable) throws SchedulerException {
+        //TODO should try to stop this executable
+        return true;
+    }
+
+    /**
+     * @return true once init has completed on this instance
+     */
+    @Override
+    public boolean hasStarted() {
+        return hasStarted;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
new file mode 100644
index 0000000..5ba8426
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.lock;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A {@link JobLock} with named locks that can be taken on behalf of a server and watched for changes.
+ */
+public interface DistributedJobLock extends JobLock {
+    /**
+     * Tries to take the lock with the given name on behalf of the given server.
+     * NOTE(review): DistributedScheduler passes a segment id as the first argument although the
+     * parameter is called cubeName - confirm the intended key.
+     *
+     * @return presumably true when the lock is held for serverName afterwards - TODO confirm
+     *         against the implementation; DistributedScheduler only runs a job on true
+     */
+    boolean lockWithName(String cubeName, String serverName);
+
+    /**
+     * Releases the lock with the given name.
+     */
+    void unlockWithName(String name);
+
+    /**
+     * Registers a callback to be notified about lock changes.
+     * NOTE(review): presumably the callback is invoked on the given pool - confirm against the implementation.
+     */
+    void watchLock(ExecutorService pool, DoWatchLock doWatch);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
new file mode 100644
index 0000000..08c13f9
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.lock;
+
+/**
+ * Callback passed to DistributedJobLock.watchLock.
+ */
+public interface DoWatchLock {
+    /**
+     * @param path path of the watched lock node; DistributedScheduler reads the segment id from its last element
+     * @param data data of the node; DistributedScheduler compares it with its own server name
+     */
+    void doWatch(String path, String data);
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
new file mode 100644
index 0000000..c33f3da
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+public class BaseTestDistributedScheduler {
+    static ExecutableManager jobService;
+    static ZookeeperDistributedJobLock jobLock;
+    static DistributedScheduler scheduler1;
+    static DistributedScheduler scheduler2;
+    static KylinConfig kylinConfig1;
+    static KylinConfig kylinConfig2;
+    static CuratorFramework zkClient;
+
+    static final String SEGMENT_ID = "segmentId";
+    static final String segmentId1 = "segmentId1";
+    static final String segmentId2 = "segmentId2";
+    static final String serverName1 = "serverName1";
+    static final String serverName2 = "serverName2";
+    static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties";
+    static final String confDstPath = "../examples/kylin.properties";
+    static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);
+
+    // make the sandbox test data folder available on the classpath before any test runs
+    static {
+        try {
+            ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
+        } catch (Exception e) {
+            // report through the class logger like the other helpers instead of printStackTrace
+            logger.error("add the sandbox test data to classpath failed", e);
+        }
+    }
+
+    /**
+     * Prepares two DistributedScheduler instances ("serverName1" and "serverName2") that share
+     * one ZookeeperDistributedJobLock but use two different KylinConfig instances.
+     */
+    @BeforeClass
+    public static void setup() throws Exception {
+        staticCreateTestMetadata(SANDBOX_TEST_DATA);
+        System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
+
+        initZk();
+
+        // first config comes from the environment; start from an empty job list
+        kylinConfig1 = KylinConfig.getInstanceFromEnv();
+        jobService = ExecutableManager.getInstance(kylinConfig1);
+        for (String jobId : jobService.getAllJobIds()) {
+            jobService.deleteJob(jobId);
+        }
+
+        jobLock = new ZookeeperDistributedJobLock();
+        scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
+        scheduler1.setServerName(serverName1);
+        scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock);
+        if (!scheduler1.hasStarted()) {
+            throw new RuntimeException("scheduler1 not started");
+        }
+
+        // second config is created from a copy of the sandbox kylin.properties, which gives a
+        // distinct KylinConfig instance and therefore a second scheduler from getInstance
+        String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath();
+        String absoluteConfDstPath = new File(confDstPath).getAbsolutePath();
+        copyFile(absoluteConfSrcPath, absoluteConfDstPath);
+        kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath);
+
+        scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
+        scheduler2.setServerName(serverName2);
+        scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock);
+        if (!scheduler2.hasStarted()) {
+            throw new RuntimeException("scheduler2 not started");
+        }
+
+        // DistributedScheduler.init schedules its fetcher with an initial delay of 10 seconds;
+        // presumably this wait lets the first fetch happen - TODO confirm
+        Thread.sleep(10000);
+    }
+
+    /**
+     * Clears the system properties set for the test and deletes the copied kylin.properties.
+     * NOTE(review): the schedulers and zkClient started in setup are not shut down here - confirm whether that is intended.
+     */
+    @AfterClass
+    public static void after() throws Exception {
+        System.clearProperty(KylinConfig.KYLIN_CONF);
+        System.clearProperty("kylin.job.controller.lock");
+
+        deleteFile(confDstPath);
+    }
+
+    /**
+     * Destroys the current KylinConfig instance and, when KYLIN_CONF is set neither as a system
+     * property nor as an environment variable, points the system property at the given folder.
+     */
+    private static void staticCreateTestMetadata(String kylinConfigFolder) {
+        KylinConfig.destroyInstance();
+
+        if (System.getProperty(KylinConfig.KYLIN_CONF) != null || System.getenv(KylinConfig.KYLIN_CONF) != null) {
+            // already configured from outside, keep it
+            return;
+        }
+        System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+    }
+
+    /**
+     * Polls the job once per second until its status is SUCCEED, ERROR, STOPPED or DISCARDED.
+     * When the waiting thread is interrupted, the interrupt flag is restored and the method
+     * returns instead of continuing to poll.
+     *
+     * @param jobId id of the job to wait for
+     */
+    void waitForJobFinish(String jobId) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            final ExecutableState status = job.getStatus();
+            if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
+                break;
+            }
+
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                logger.warn("interrupted while waiting for job " + jobId + " to finish", e);
+                return;
+            }
+        }
+    }
+
+    /**
+     * Polls the job until its status equals the expected state, sleeping {@code interval}
+     * milliseconds between checks. When the waiting thread is interrupted, the interrupt flag
+     * is restored and the method returns instead of continuing to poll.
+     *
+     * @param jobId    id of the job to watch
+     * @param state    status to wait for
+     * @param interval pause between two checks, in milliseconds
+     */
+    void waitForJobStatus(String jobId, ExecutableState state, long interval) {
+        while (true) {
+            AbstractExecutable job = jobService.getJob(jobId);
+            if (state == job.getStatus()) {
+                break;
+            }
+
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                // keep the interrupt visible to the caller and stop waiting
+                Thread.currentThread().interrupt();
+                logger.warn("interrupted while waiting for job " + jobId + " to reach " + state, e);
+                return;
+            }
+        }
+    }
+
+    boolean lock(ZookeeperDistributedJobLock jobLock, String cubeName, String serverName) {
+        return jobLock.lockWithName(cubeName, serverName);
+    }
+
+    private static void initZk() {
+        String zkConnectString = getZKConnectString();
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+    }
+
+    private static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    String getServerName(String cubeName) {
+        String lockPath = getLockPath(cubeName);
+        String serverName = null;
+        if (zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+            try {
+                if (zkClient.checkExists().forPath(lockPath) != null) {
+                    byte[] data = zkClient.getData().forPath(lockPath);
+                    serverName = new String(data, Charset.forName("UTF-8"));
+                }
+            } catch (Exception e) {
+                logger.error("get the serverName failed", e);
+            }
+        }
+        return serverName;
+    }
+
+    private String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    /**
+     * Copies a file, replacing the destination when it already exists (for example a copy left
+     * over from an earlier run), so the destination always matches the source afterwards.
+     * Failures are logged and not rethrown.
+     */
+    private static void copyFile(String srcPath, String dstPath) {
+        try {
+            File srcFile = new File(srcPath);
+            File dstFile = new File(dstPath);
+            // without REPLACE_EXISTING the copy fails on an existing destination
+            Files.copy(srcFile.toPath(), dstFile.toPath(), java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+        } catch (Exception e) {
+            logger.error("copy the file failed", e);
+        }
+    }
+
+    private static void deleteFile(String path) {
+        try {
+            Files.delete(new File(path).toPath());
+        } catch (Exception e) {
+            logger.error("delete the file failed", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
new file mode 100644
index 0000000..052baad
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.job.impl.threadpool.DefaultContext;
+
+/**
+ * Test executable that succeeds only when the config of the context it runs in is the same
+ * object as the KylinConfig obtained from the environment.
+ */
+public class ContextTestExecutable extends AbstractExecutable {
+    public ContextTestExecutable() {
+        super();
+    }
+
+    /**
+     * Sleeps for one second, then compares the identity hash codes of the context config and
+     * the environment config.
+     *
+     * @return SUCCEED when both hash codes are equal, ERROR otherwise
+     */
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+
+        DefaultContext defaultContext = (DefaultContext) context;
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // do not swallow the interrupt: restore the flag so the calling code can see it
+            Thread.currentThread().interrupt();
+        }
+        if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) {
+            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
+        } else {
+            return new ExecuteResult(ExecuteResult.State.ERROR, "error");
+        }
+    }
+
+    // identity hash code, so two configs only match when they are the same object
+    // (barring identity hash collisions)
+    private int getHashCode(KylinConfig config) {
+        return System.identityHashCode(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
new file mode 100644
index 0000000..443e73b
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler {
+    @Test
+    public void testSchedulerLock() throws Exception {
+        if (!lock(jobLock, segmentId1, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task1 = new SucceedTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task2 = new SucceedTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId1);
+        AbstractExecutable task3 = new SucceedTestExecutable();
+        task3.setParam(SEGMENT_ID, segmentId1);
+        job.addTask(task1);
+        job.addTask(task2);
+        job.addTask(task3);
+        jobService.addJob(job);
+
+        Assert.assertEquals(serverName1, getServerName(segmentId1));
+
+        waitForJobFinish(job.getId());
+
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+
+        Assert.assertEquals(null, getServerName(segmentId1));
+    }
+
+    @Test
+    public void testSchedulerConsistent() throws Exception {
+        if (!lock(jobLock, segmentId2, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId2);
+        ContextTestExecutable task1 = new ContextTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId2);
+        job.addTask(task1);
+        jobService.addJob(job);
+
+        waitForJobFinish(job.getId());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+
+        if (!lock(jobLock, segmentId2, serverName2)) {
+            throw new JobException("fail to get the lock");
+        }
+
+        DefaultChainedExecutable job2 = new DefaultChainedExecutable();
+        job2.setParam(SEGMENT_ID, segmentId2);
+        ContextTestExecutable task2 = new ContextTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId2);
+        job2.addTask(task2);
+        jobService.addJob(job2);
+
+        waitForJobFinish(job2.getId());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
new file mode 100644
index 0000000..3137aef
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job;
+
+import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedScheduler {
+    @Test
+    public void testSchedulerTakeOver() throws Exception {
+        if (!lock(jobLock, segmentId2, serverName1)) {
+            throw new JobException("fail to get the lock");
+        }
+
+        DefaultChainedExecutable job = new DefaultChainedExecutable();
+        job.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task1 = new SucceedTestExecutable();
+        task1.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task2 = new SucceedTestExecutable();
+        task2.setParam(SEGMENT_ID, segmentId2);
+        AbstractExecutable task3 = new SucceedTestExecutable();
+        task3.setParam(SEGMENT_ID, segmentId2);
+        job.addTask(task1);
+        job.addTask(task2);
+        job.addTask(task3);
+        jobService.addJob(job);
+
+        waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
+
+        scheduler1.shutdown();
+        scheduler1 = null;
+
+        waitForJobFinish(job.getId());
+
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index 7022bfc..16b643c 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -24,25 +24,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.TimeZone;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.Scheduler;
-import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
 import org.apache.kylin.rest.service.JobService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -50,64 +40,14 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
 
-/**
- *
- */
 @Controller
 @RequestMapping(value = "jobs")
-public class JobController extends BasicController implements InitializingBean {
+public class JobController extends BasicController {
     private static final Logger logger = LoggerFactory.getLogger(JobController.class);
 
     @Autowired
     private JobService jobService;
 
-    private JobLock jobLock;
-
-    /*
-     * (non-Javadoc)
-     * 
-     * @see
-     * org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void afterPropertiesSet() throws Exception {
-
-        String timeZone = jobService.getConfig().getTimeZone();
-        TimeZone tzone = TimeZone.getTimeZone(timeZone);
-        TimeZone.setDefault(tzone);
-
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final Scheduler<AbstractExecutable> scheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(kylinConfig.getSchedulerType());
-
-        jobLock = (JobLock) ClassUtil.newInstance(kylinConfig.getJobControllerLock());
-
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    scheduler.init(new JobEngineConfig(kylinConfig), jobLock);
-                    if (!scheduler.hasStarted()) {
-                        logger.info("Job engine doesn't start in this node.");
-                    }
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }).start();
-
-        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    scheduler.shutdown();
-                } catch (SchedulerException e) {
-                    logger.error("error occurred to shutdown scheduler", e);
-                }
-            }
-        }));
-    }
-
     /**
      * get all cube jobs
      * 

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 99e54b9..a6246f8 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -461,6 +461,8 @@ public class CubeService extends BasicService {
         }
 
         DefaultChainedExecutable job = new DefaultChainedExecutable();
+        // make sure the job can be scheduled when the DistributedScheduler is enabled.
+        job.setParam("segmentId", tableName);
         job.setName("Hive Column Cardinality calculation for table '" + tableName + "'");
         job.setSubmitter(submitter);
 
@@ -471,6 +473,7 @@ public class CubeService extends BasicService {
 
         step1.setMapReduceJobClass(HiveColumnCardinalityJob.class);
         step1.setMapReduceParams(param);
+        step1.setParam("segmentId", tableName);
 
         job.addTask(step1);
 
@@ -478,6 +481,7 @@ public class CubeService extends BasicService {
 
         step2.setJobClass(HiveColumnCardinalityUpdateJob.class);
         step2.setJobParams(param);
+        step2.setParam("segmentId", tableName);
         job.addTask(step2);
 
         getExecutableManager().addJob(job);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 49b9b9f..a6a9842 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -19,6 +19,8 @@
 package org.apache.kylin.rest.service;
 
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Calendar;
 import java.util.Collections;
 import java.util.Date;
@@ -26,8 +28,11 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimeZone;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
@@ -38,15 +43,21 @@ import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.Scheduler;
+import org.apache.kylin.job.SchedulerFactory;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobException;
+import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.JobLock;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.constant.Constant;
@@ -56,7 +67,9 @@ import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.source.SourcePartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
@@ -69,15 +82,64 @@ import com.google.common.collect.Sets;
 /**
  * @author ysong1
  */
+
+@EnableAspectJAutoProxy(proxyTargetClass = true)
 @Component("jobService")
-public class JobService extends BasicService {
+public class JobService extends BasicService implements InitializingBean {
 
     @SuppressWarnings("unused")
     private static final Logger logger = LoggerFactory.getLogger(JobService.class);
 
+    private JobLock jobLock;
+
     @Autowired
     private AccessService accessService;
 
+    /**
+     * Spring callback invoked once the bean's properties are set. Applies the configured time
+     * zone as the JVM default, instantiates the configured job lock, initializes the scheduler
+     * on a separate thread and registers a JVM shutdown hook that shuts the scheduler down.
+     *
+     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void afterPropertiesSet() throws Exception {
+
+        TimeZone.setDefault(TimeZone.getTimeZone(getConfig().getTimeZone()));
+
+        final KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        final Scheduler<AbstractExecutable> jobScheduler = (Scheduler<AbstractExecutable>) SchedulerFactory.scheduler(envConfig.getSchedulerType());
+
+        jobLock = (JobLock) ClassUtil.newInstance(envConfig.getJobControllerLock());
+
+        // scheduler start-up runs on its own thread instead of inside this callback
+        final Runnable startScheduler = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    jobScheduler.init(new JobEngineConfig(envConfig), jobLock);
+                    if (!jobScheduler.hasStarted()) {
+                        logger.info("scheduler has not been started");
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        // executed by the JVM shutdown hook registered below
+        final Runnable stopScheduler = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    jobScheduler.shutdown();
+                } catch (SchedulerException e) {
+                    logger.error("error occurred to shutdown scheduler", e);
+                }
+            }
+        };
+
+        new Thread(startScheduler).start();
+        Runtime.getRuntime().addShutdownHook(new Thread(stopScheduler));
+    }
+
     public List<JobInstance> listAllJobs(final String cubeName, final String projectName, final List<JobStatusEnum> statusList, final Integer limitValue, final Integer offsetValue, final JobTimeFilterEnum timeFilter) throws IOException, JobException {
         Integer limit = (null == limitValue) ? 30 : limitValue;
         Integer offset = (null == offsetValue) ? 0 : offsetValue;
@@ -215,12 +277,15 @@ public class JobService extends BasicService {
             SourcePartition sourcePartition = new SourcePartition(startDate, endDate, startOffset, endOffset, sourcePartitionOffsetStart, sourcePartitionOffsetEnd);
             sourcePartition = source.parsePartitionBeforeBuild(cube, sourcePartition);
             CubeSegment newSeg = getCubeManager().appendSegment(cube, sourcePartition);
+            lockSegment(newSeg.getUuid());
             job = EngineFactory.createBatchCubingJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, startOffset, endOffset, force);
+            lockSegment(newSeg.getUuid());
             job = EngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
             CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, startDate, endDate, startOffset, endOffset);
+            lockSegment(refreshSeg.getUuid());
             job = EngineFactory.createBatchCubingJob(refreshSeg, submitter);
         } else {
             throw new JobException("invalid build type:" + buildType);
@@ -363,6 +428,8 @@ public class JobService extends BasicService {
 
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#job, 'ADMINISTRATION') or hasPermission(#job, 'OPERATION') or hasPermission(#job, 'MANAGEMENT')")
     public void resumeJob(JobInstance job) throws IOException, JobException {
+        lockSegment(job.getRelatedSegment());
+
         getExecutableManager().resumeJob(job.getId());
     }
 
@@ -380,7 +447,34 @@ public class JobService extends BasicService {
             }
         }
         getExecutableManager().discardJob(job.getId());
+
+        // release the segment lock in case the job was discarded before it had been scheduled
+        releaseSegmentLock(job.getRelatedSegment());
+
         return job;
     }
 
+    private void lockSegment(String segmentId) throws JobException {
+        if (jobLock instanceof DistributedJobLock) {
+            if (!((DistributedJobLock) jobLock).lockWithName(segmentId, getServerName())) {
+                throw new JobException("Fail to get the segment lock, the segment may be building in another job server");
+            }
+        }
+    }
+
+    private void releaseSegmentLock(String segmentId) {
+        if (jobLock instanceof DistributedJobLock) {
+            ((DistributedJobLock) jobLock).unlockWithName(segmentId);
+        }
+    }
+
+    /**
+     * Resolves the host name of the local machine.
+     *
+     * @return the local host name, or {@code null} if it cannot be resolved
+     */
+    private String getServerName() {
+        String serverName = null;
+        try {
+            serverName = InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException e) {
+            // log the cause as well, otherwise the reason for the failure is lost
+            logger.error("fail to get the hostname", e);
+        }
+        return serverName;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
new file mode 100644
index 0000000..eba7a20
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.util;
+
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.imps.CuratorFrameworkState;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.lock.DistributedJobLock;
+import org.apache.kylin.job.lock.DoWatchLock;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.zookeeper.CreateMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+
+/**
+ * A job lock that is used specifically to support the distributed scheduler.
+ */
+
+public class ZookeeperDistributedJobLock implements DistributedJobLock {
+    private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
+
+    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+    private static CuratorFramework zkClient;
+    private static PathChildrenCache childrenCache;
+
+    static {
+        String zkConnectString = getZKConnectString();
+        logger.info("zk connection string:" + zkConnectString);
+        if (StringUtils.isEmpty(zkConnectString)) {
+            throw new IllegalArgumentException("ZOOKEEPER_QUORUM is empty!");
+        }
+
+        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+        zkClient = CuratorFrameworkFactory.newClient(zkConnectString, retryPolicy);
+        zkClient.start();
+
+        childrenCache = new PathChildrenCache(zkClient, getWatchPath(), true);
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    childrenCache.close();
+                    zkClient.close();
+                } catch (Exception e) {
+                    logger.error("error occurred to close PathChildrenCache", e);
+                }
+            }
+        }));
+    }
+
+    /**
+     * Tries to take the lock of a segment in the name of a job server.
+     *
+     * <p> The lock is an ephemeral zookeeper node whose path is derived from the segment id and
+     * whose data is the server name. If the node does not exist yet it is created; in both cases
+     * the lock counts as held only when the node's data matches {@code serverName}.
+     *
+     * @param segmentId the id of the segment to lock
+     *
+     * @param serverName the hostname of the job server
+     *
+     * @return <tt>true</tt> if the segment lock is held by {@code serverName};
+     *         <tt>false</tt> if the zookeeper client is not started, the lock belongs to
+     *         another server, or an error occurred
+     */
+
+    @Override
+    public boolean lockWithName(String segmentId, String serverName) {
+        final String lockPath = getLockPath(segmentId);
+        logger.info(serverName + " start lock the segment: " + segmentId);
+
+        boolean acquired = false;
+        try {
+            if (!(zkClient.getState().equals(CuratorFrameworkState.STARTED))) {
+                logger.error("zookeeper have not start");
+                return false;
+            }
+            final boolean nodeExisted = zkClient.checkExists().forPath(lockPath) != null;
+            if (!nodeExisted) {
+                zkClient.create().withMode(CreateMode.EPHEMERAL).forPath(lockPath, serverName.getBytes(Charset.forName("UTF-8")));
+            }
+            // whether pre-existing or just created, the node must carry this server's name
+            if (hasLock(serverName, lockPath)) {
+                acquired = true;
+                if (nodeExisted) {
+                    logger.info(serverName + " has kept the lock for segment: " + segmentId);
+                } else {
+                    logger.info(serverName + " lock the segment: " + segmentId + " successfully");
+                }
+            }
+        } catch (Exception e) {
+            logger.error(serverName + " error acquire lock for the segment: " + segmentId, e);
+        }
+        if (acquired) {
+            return true;
+        }
+        logger.info(serverName + " fail to acquire lock for the segment: " + segmentId);
+        return false;
+    }
+
+    /**
+     * Checks whether the lock node at the given path is owned by the given server.
+     *
+     * @param serverName the server name expected to own the lock
+     * @param lockPath the zookeeper path of the lock node
+     * @return <tt>true</tt> if the node exists and its data equals {@code serverName}, ignoring case;
+     *         <tt>false</tt> if the node is missing, cannot be read, or belongs to another server
+     */
+    private boolean hasLock(String serverName, String lockPath) {
+        String lockServerName = null;
+        try {
+            if (zkClient.checkExists().forPath(lockPath) != null) {
+                byte[] data = zkClient.getData().forPath(lockPath);
+                lockServerName = new String(data, Charset.forName("UTF-8"));
+            }
+        } catch (Exception e) {
+            logger.error("fail to get the serverName for the path: " + lockPath, e);
+        }
+        // lockServerName is still null when the node is missing or could not be read;
+        // report "not held" in that case instead of failing with a NullPointerException
+        return lockServerName != null && lockServerName.equalsIgnoreCase(serverName);
+    }
+
+    /**
+     * Releases the lock of a segment by deleting its zookeeper node.
+     *
+     * <p> Nothing happens when the zookeeper client is not in the STARTED state. A node that
+     * no longer exists is only logged.
+     *
+     * @param segmentId the id of the segment whose lock is released
+     * @throws RuntimeException wrapping any exception raised while talking to zookeeper
+     */
+
+    @Override
+    public void unlockWithName(String segmentId) {
+        final String lockPath = getLockPath(segmentId);
+        try {
+            if (!zkClient.getState().equals(CuratorFrameworkState.STARTED)) {
+                return;
+            }
+            final boolean nodeExists = zkClient.checkExists().forPath(lockPath) != null;
+            if (nodeExists) {
+                zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(lockPath);
+                logger.info("the lock for " + segmentId + " release successfully");
+            } else {
+                logger.info("the lock for " + segmentId + " has released");
+            }
+        } catch (Exception e) {
+            logger.error("error release lock :" + segmentId);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Starts watching the zookeeper nodes of all locked segments, so that when one job server
+     * goes down another job server can take over its running jobs.
+     *
+     * <p> Only node removals are forwarded to {@code doWatch}; every other event type is ignored.
+     * A failure to start the watch is logged and not propagated.
+     *
+     * @param pool the thread pool on which the node-change listener is invoked
+     * @param doWatch the action to run with the path and the data of a removed zookeeper node
+     */
+
+    @Override
+    public void watchLock(ExecutorService pool, final DoWatchLock doWatch) {
+        try {
+            childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
+            childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
+                @Override
+                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+                    switch (event.getType()) {
+                    case CHILD_REMOVED:
+                        doWatch.doWatch(event.getData().getPath(), new String(event.getData().getData(), Charset.forName("UTF-8")));
+                        break;
+                    default:
+                        break;
+                    }
+                }
+            }, pool);
+        } catch (Exception e) {
+            // pass the exception as the throwable argument so that its stack trace is logged too
+            logger.warn("watch the zookeeper node fail: " + e, e);
+        }
+    }
+
+    /**
+     * Builds the zookeeper connect string from the current HBase configuration by appending
+     * the client port to every host of the quorum, e.g. {@code host1:2181,host2:2181}.
+     *
+     * @return the comma separated {@code host:port} list, or an empty string when no quorum
+     *         is configured
+     */
+    private static String getZKConnectString() {
+        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
+        final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
+        final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        if (StringUtils.isBlank(serverList)) {
+            // A missing quorum used to fail with a NullPointerException on split(), and an empty
+            // one produced ":port", which slipped past the emptiness check in the static
+            // initializer. Returning "" lets that check report the misconfiguration.
+            return "";
+        }
+        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+            @Nullable
+            @Override
+            public String apply(String input) {
+                return input + ":" + port;
+            }
+        }), ",");
+    }
+
+    private String getLockPath(String pathName) {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+    }
+
+    private static String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    }
+
+    /**
+     * Name-less lock operation; this implementation acquires nothing.
+     *
+     * @return always <tt>true</tt>
+     */
+    @Override
+    public boolean lock() {
+        return true;
+    }
+
+    /**
+     * Name-less unlock operation; intentionally a no-op in this implementation.
+     */
+    @Override
+    public void unlock() {
+
+    }
+}


[6/6] kylin git commit: KYLIN-2169 fix test case

Posted by li...@apache.org.
KYLIN-2169 fix test case


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c8efa548
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c8efa548
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c8efa548

Branch: refs/heads/master
Commit: c8efa548307e6acec92740d2049b885b8b98190f
Parents: b1e81d4
Author: Yang Li <li...@apache.org>
Authored: Wed Nov 9 00:40:47 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:34:04 2016 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/job/execution/AbstractExecutable.java   | 5 +++--
 .../apache/kylin/job/execution/DefaultChainedExecutable.java | 8 ++++++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/c8efa548/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 9292418..80a92de 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -61,7 +61,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         setId(UUID.randomUUID().toString());
     }
     
-    void initConfig(KylinConfig config) {
+    protected void initConfig(KylinConfig config) {
         Preconditions.checkState(this.config == null || this.config == config);
         this.config = config;
     }
@@ -201,7 +201,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
 
     @Override
     public final ExecutableState getStatus() {
-        return getManager().getOutput(this.getId()).getState();
+        ExecutableManager manager = getManager();
+        return manager.getOutput(this.getId()).getState();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/c8efa548/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
index 7b92608..fccab30 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java
@@ -21,6 +21,7 @@ package org.apache.kylin.job.execution;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.exception.ExecuteException;
 
 import com.google.common.collect.Lists;
@@ -36,6 +37,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai
         super();
     }
 
+    protected void initConfig(KylinConfig config) {
+        super.initConfig(config);
+        for (AbstractExecutable sub : subTasks) {
+            sub.initConfig(config);
+        }
+    }
+    
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
         List<? extends Executable> executables = getTasks();


[5/6] kylin git commit: KYLIN-2006 refactor test case, avoid conflict with default CI metadata

Posted by li...@apache.org.
KYLIN-2006 refactor test case, avoid conflict with default CI metadata


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1e81d4e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1e81d4e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1e81d4e

Branch: refs/heads/master
Commit: b1e81d4e261a5e9367390e63f8148468fac9e001
Parents: 85c4ded
Author: Yang Li <li...@apache.org>
Authored: Tue Nov 8 22:30:23 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Nov 9 06:32:02 2016 +0800

----------------------------------------------------------------------
 .../common/util/AbstractKylinTestCase.java      |   4 +-
 .../kylin/job/execution/AbstractExecutable.java |   4 +
 .../kylin/job/execution/ExecutableManager.java  |   4 +
 .../impl/threadpool/DistributedScheduler.java   |   6 +-
 .../kylin/job/BaseTestDistributedScheduler.java | 121 +++++++++----------
 .../apache/kylin/job/ContextTestExecutable.java |   9 +-
 .../job/ITDistributedSchedulerBaseTest.java     |  22 ++--
 .../job/ITDistributedSchedulerTakeOverTest.java |  10 +-
 .../hbase/util/ZookeeperDistributedJobLock.java |  42 ++++---
 9 files changed, 115 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
index 14bf90b..2154c32 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java
@@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase {
             "org.apache.kylin.storage.hybrid.HybridManager", //
             "org.apache.kylin.metadata.realization.RealizationRegistry", //
             "org.apache.kylin.metadata.project.ProjectManager", //
-            "org.apache.kylin.metadata.MetadataManager" //
+            "org.apache.kylin.metadata.MetadataManager", //
+            "org.apache.kylin.job.impl.threadpool.DistributedScheduler", //
+            "org.apache.kylin.job.manager.ExecutableManager", //
     };
 
     public abstract void createTestMetadata() throws Exception;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 2a4b2df..9292418 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         this.config = config;
     }
     
+    protected KylinConfig getConfig() {
+        return config;
+    }
+    
     protected ExecutableManager getManager() {
         return ExecutableManager.getInstance(config);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 92fc8c9..1db612f 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -65,6 +65,10 @@ public class ExecutableManager {
         return r;
     }
 
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
     private ExecutableManager(KylinConfig config) {
         logger.info("Using metadata url: " + config);
         this.config = config;

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
index 17df119..3937a24 100644
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
+++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
@@ -43,11 +43,11 @@ import org.apache.kylin.job.exception.SchedulerException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.job.lock.DistributedJobLock;
 import org.apache.kylin.job.lock.JobLock;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,6 +98,10 @@ public class DistributedScheduler implements Scheduler<AbstractExecutable>, Conn
         return r;
     }
 
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
     private class FetcherRunner implements Runnable {
         @Override
         synchronized public void run() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
index c33f3da..910db49 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/BaseTestDistributedScheduler.java
@@ -18,9 +18,13 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
+import java.io.File;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -29,12 +33,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DistributedScheduler;
-import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock;
 import org.junit.AfterClass;
@@ -42,14 +46,11 @@ import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.File;
-import java.nio.charset.Charset;
-import java.nio.file.Files;
-import java.util.Arrays;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 
-public class BaseTestDistributedScheduler {
-    static ExecutableManager jobService;
+public class BaseTestDistributedScheduler extends HBaseMetadataTestCase {
+    static ExecutableManager execMgr;
     static ZookeeperDistributedJobLock jobLock;
     static DistributedScheduler scheduler1;
     static DistributedScheduler scheduler2;
@@ -62,35 +63,38 @@ public class BaseTestDistributedScheduler {
     static final String segmentId2 = "segmentId2";
     static final String serverName1 = "serverName1";
     static final String serverName2 = "serverName2";
-    static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
     static final String confSrcPath = "../examples/test_case_data/sandbox/kylin.properties";
-    static final String confDstPath = "../examples/kylin.properties";
-    static final String SANDBOX_TEST_DATA = "../examples/test_case_data/sandbox";
+    static final String confDstPath1 = "target/kylin_metadata_dist_lock_test1/kylin.properties";
+    static final String confDstPath2 = "target/kylin_metadata_dist_lock_test2/kylin.properties";
 
     private static final Logger logger = LoggerFactory.getLogger(BaseTestDistributedScheduler.class);
 
-    static {
-        try {
-            ClassUtil.addClasspath(new File(SANDBOX_TEST_DATA).getAbsolutePath());
-        } catch (Exception e) {
-            e.printStackTrace();
-        }
-    }
-
     @BeforeClass
     public static void setup() throws Exception {
-        staticCreateTestMetadata(SANDBOX_TEST_DATA);
+        staticCreateTestMetadata();
         System.setProperty("kylin.job.controller.lock", "org.apache.kylin.storage.hbase.util.ZookeeperDistributedJobLock");
 
+        new File(confDstPath1).getParentFile().mkdirs();
+        new File(confDstPath2).getParentFile().mkdirs();
+        KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
+        String backup = srcConfig.getMetadataUrl();
+        srcConfig.setProperty("kylin.metadata.url", "kylin_metadata_dist_lock_test@hbase");
+        srcConfig.writeProperties(new File(confDstPath1));
+        srcConfig.writeProperties(new File(confDstPath2));
+        srcConfig.setProperty("kylin.metadata.url", backup);
+        kylinConfig1 = KylinConfig.createInstanceFromUri(new File(confDstPath1).getAbsolutePath());
+        kylinConfig2 = KylinConfig.createInstanceFromUri(new File(confDstPath2).getAbsolutePath());
+        
         initZk();
 
-        kylinConfig1 = KylinConfig.getInstanceFromEnv();
-        jobService = ExecutableManager.getInstance(kylinConfig1);
-        for (String jobId : jobService.getAllJobIds()) {
-            jobService.deleteJob(jobId);
+        if (jobLock == null)
+            jobLock = new ZookeeperDistributedJobLock(kylinConfig1);
+
+        execMgr = ExecutableManager.getInstance(kylinConfig1);
+        for (String jobId : execMgr.getAllJobIds()) {
+            execMgr.deleteJob(jobId);
         }
 
-        jobLock = new ZookeeperDistributedJobLock();
         scheduler1 = DistributedScheduler.getInstance(kylinConfig1);
         scheduler1.setServerName(serverName1);
         scheduler1.init(new JobEngineConfig(kylinConfig1), jobLock);
@@ -98,11 +102,6 @@ public class BaseTestDistributedScheduler {
             throw new RuntimeException("scheduler1 not started");
         }
 
-        String absoluteConfSrcPath = new File(confSrcPath).getAbsolutePath();
-        String absoluteConfDstPath = new File(confDstPath).getAbsolutePath();
-        copyFile(absoluteConfSrcPath, absoluteConfDstPath);
-        kylinConfig2 = KylinConfig.createInstanceFromUri(absoluteConfDstPath);
-
         scheduler2 = DistributedScheduler.getInstance(kylinConfig2);
         scheduler2.setServerName(serverName2);
         scheduler2.init(new JobEngineConfig(kylinConfig2), jobLock);
@@ -115,22 +114,30 @@ public class BaseTestDistributedScheduler {
 
     @AfterClass
     public static void after() throws Exception {
-        System.clearProperty(KylinConfig.KYLIN_CONF);
+        if (scheduler1 != null) {
+            scheduler1.shutdown();
+            scheduler1 = null;
+        }
+        if (scheduler2 != null) {
+            scheduler2.shutdown();
+            scheduler2 = null;
+        }
+        if (jobLock != null) {
+            jobLock.close();
+            jobLock = null;
+        }
+        if (zkClient != null) {
+            zkClient.close();
+            zkClient = null;
+        }
+        
         System.clearProperty("kylin.job.controller.lock");
-
-        deleteFile(confDstPath);
-    }
-
-    private static void staticCreateTestMetadata(String kylinConfigFolder) {
-        KylinConfig.destroyInstance();
-
-        if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null)
-            System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+        staticCleanupTestMetadata();
     }
 
     void waitForJobFinish(String jobId) {
         while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
+            AbstractExecutable job = execMgr.getJob(jobId);
             final ExecutableState status = job.getStatus();
             if (status == ExecutableState.SUCCEED || status == ExecutableState.ERROR || status == ExecutableState.STOPPED || status == ExecutableState.DISCARDED) {
                 break;
@@ -146,7 +153,7 @@ public class BaseTestDistributedScheduler {
 
     void waitForJobStatus(String jobId, ExecutableState state, long interval) {
         while (true) {
-            AbstractExecutable job = jobService.getJob(jobId);
+            AbstractExecutable job = execMgr.getJob(jobId);
             if (state == job.getStatus()) {
                 break;
             } else {
@@ -177,7 +184,7 @@ public class BaseTestDistributedScheduler {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
         final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
             @Nullable
             @Override
             public String apply(String input) {
@@ -203,24 +210,6 @@ public class BaseTestDistributedScheduler {
     }
 
     private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
-    }
-
-    private static void copyFile(String srcPath, String dstPath) {
-        try {
-            File srcFile = new File(srcPath);
-            File dstFile = new File(dstPath);
-            Files.copy(srcFile.toPath(), dstFile.toPath());
-        } catch (Exception e) {
-            logger.error("copy the file failed", e);
-        }
-    }
-
-    private static void deleteFile(String path) {
-        try {
-            Files.delete(new File(path).toPath());
-        } catch (Exception e) {
-            logger.error("delete the file failed", e);
-        }
+        return ZookeeperDistributedJobLock.ZOOKEEPER_LOCK_PATH + "/" + kylinConfig1.getMetadataUrlPrefix() + "/" + pathName;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
index 052baad..4696e67 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ContextTestExecutable.java
@@ -18,12 +18,10 @@
 
 package org.apache.kylin.job;
 
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.impl.threadpool.DefaultContext;
 
 public class ContextTestExecutable extends AbstractExecutable {
     public ContextTestExecutable() {
@@ -33,19 +31,14 @@ public class ContextTestExecutable extends AbstractExecutable {
     @Override
     protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
 
-        DefaultContext defaultContext = (DefaultContext) context;
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
         }
-        if (getHashCode(defaultContext.getConfig()) == getHashCode(KylinConfig.getInstanceFromEnv())) {
+        if (context.getConfig() == BaseTestDistributedScheduler.kylinConfig1) {
             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
         } else {
             return new ExecuteResult(ExecuteResult.State.ERROR, "error");
         }
     }
-
-    private int getHashCode(KylinConfig config) {
-        return System.identityHashCode(config);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
index 443e73b..0d5e011 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerBaseTest.java
@@ -42,16 +42,16 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         job.addTask(task1);
         job.addTask(task2);
         job.addTask(task3);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         Assert.assertEquals(serverName1, getServerName(segmentId1));
 
         waitForJobFinish(job.getId());
 
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
 
         Assert.assertEquals(null, getServerName(segmentId1));
     }
@@ -66,11 +66,11 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         ContextTestExecutable task1 = new ContextTestExecutable();
         task1.setParam(SEGMENT_ID, segmentId2);
         job.addTask(task1);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         waitForJobFinish(job.getId());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
 
         if (!lock(jobLock, segmentId2, serverName2)) {
             throw new JobException("fail to get the lock");
@@ -81,10 +81,10 @@ public class ITDistributedSchedulerBaseTest extends BaseTestDistributedScheduler
         ContextTestExecutable task2 = new ContextTestExecutable();
         task2.setParam(SEGMENT_ID, segmentId2);
         job2.addTask(task2);
-        jobService.addJob(job2);
+        execMgr.addJob(job2);
 
         waitForJobFinish(job2.getId());
-        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job2.getId()).getState());
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
index 3137aef..2b15ddd 100644
--- a/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/job/ITDistributedSchedulerTakeOverTest.java
@@ -43,7 +43,7 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
         job.addTask(task1);
         job.addTask(task2);
         job.addTask(task3);
-        jobService.addJob(job);
+        execMgr.addJob(job);
 
         waitForJobStatus(job.getId(), ExecutableState.RUNNING, 500);
 
@@ -52,9 +52,9 @@ public class ITDistributedSchedulerTakeOverTest extends BaseTestDistributedSched
 
         waitForJobFinish(job.getId());
 
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task2.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task3.getId()).getState());
-        Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(job.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task2.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task3.getId()).getState());
+        Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(job.getId()).getState());
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
index d8d27c5..613d783 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/ZookeeperDistributedJobLock.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ExecutorService;
 
 import javax.annotation.Nullable;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -52,11 +52,19 @@ import com.google.common.collect.Iterables;
 public class ZookeeperDistributedJobLock implements DistributedJobLock {
     private static Logger logger = LoggerFactory.getLogger(ZookeeperDistributedJobLock.class);
 
-    private static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
-    private static CuratorFramework zkClient;
-    private static PathChildrenCache childrenCache;
+    public static final String ZOOKEEPER_LOCK_PATH = "/kylin/job_engine/lock";
+
+    final private KylinConfig config;
+    final CuratorFramework zkClient;
+    final PathChildrenCache childrenCache;
+
+    public ZookeeperDistributedJobLock() {
+        this(KylinConfig.getInstanceFromEnv());
+    }
+
+    public ZookeeperDistributedJobLock(KylinConfig config) {
+        this.config = config;
 
-    static {
         String zkConnectString = getZKConnectString();
         logger.info("zk connection string:" + zkConnectString);
         if (StringUtils.isEmpty(zkConnectString)) {
@@ -71,12 +79,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
             @Override
             public void run() {
-                try {
-                    childrenCache.close();
-                    zkClient.close();
-                } catch (Exception e) {
-                    logger.error("error occurred to close PathChildrenCache", e);
-                }
+                close();
             }
         }));
     }
@@ -200,7 +203,7 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
         final String serverList = conf.get(HConstants.ZOOKEEPER_QUORUM);
         final String port = conf.get(HConstants.ZOOKEEPER_CLIENT_PORT);
-        return org.apache.commons.lang3.StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
+        return StringUtils.join(Iterables.transform(Arrays.asList(serverList.split(",")), new Function<String, String>() {
             @Nullable
             @Override
             public String apply(String input) {
@@ -210,11 +213,11 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     }
 
     private String getLockPath(String pathName) {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix() + "/" + pathName;
+        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix() + "/" + pathName;
     }
 
-    private static String getWatchPath() {
-        return ZOOKEEPER_LOCK_PATH + "/" + KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
+    private String getWatchPath() {
+        return ZOOKEEPER_LOCK_PATH + "/" + config.getMetadataUrlPrefix();
     }
 
     @Override
@@ -226,4 +229,13 @@ public class ZookeeperDistributedJobLock implements DistributedJobLock {
     public void unlock() {
 
     }
+    
+    public void close() {
+        try {
+            childrenCache.close();
+            zkClient.close();
+        } catch (Exception e) {
+            logger.error("error occurred to close PathChildrenCache", e);
+        }
+    }
 }