You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/06 13:05:44 UTC

[2/3] incubator-kylin git commit: KYLIN-1116 Use local dictionary for InvertedIndex batch building

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
index 237bdd7..4c0c3ef 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_inner_join.json
@@ -4,49 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20140901000000",
-      "status": "READY",
-      "dictionaries": {
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
-        "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
-        "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
-        "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
-      },
-      "storage_location_identifier": "test_III",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 1420016227424,
   "descriptor": "test_kylin_ii_inner_join_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
index 07c1970..90c21bb 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_kylin_ii_left_join.json
@@ -4,49 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20140901000000",
-      "status": "READY",
-      "dictionaries": {
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/SITE_ID/30c9d5f0-abe4-4d1c-a147-610234d90ff1.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL2_NAME/9355165b-06ad-4c04-977c-a251e66e7e98.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME": "/dict/DEFAULT.TEST_KYLIN_FACT/LSTG_FORMAT_NAME/d5c40465-75e1-40bc-a960-06308f0134a6.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SELLER_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/SELLER_ID/fea3a18d-3f20-4b8b-a880-7af93e69241b.dict",
-        "EDW.TEST_SITES/CRE_USER": "/dict/EDW.TEST_SITES/CRE_USER/244af7a2-7352-4b30-811f-46e637d7a133.dict",
-        "DEFAULT.TEST_KYLIN_FACT/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_CAL_DT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_KYLIN_FACT/TRANS_ID": "/dict/DEFAULT.TEST_KYLIN_FACT/TRANS_ID/7fb8fed8-7f3b-4089-a85d-3ac07f575c82.dict",
-        "EDW.TEST_CAL_DT/WEEK_BEG_DT": "/dict/EDW.TEST_CAL_DT/WEEK_BEG_DT/962b5f64-bee1-49ee-a072-af882193b719.dict",
-        "DEFAULT.TEST_KYLIN_FACT/SLR_SEGMENT_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SITES/SITE_NAME": "/dict/EDW.TEST_SITES/SITE_NAME/f363531d-e969-4264-bffd-ac18f8f47220.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_DATE/1fc93a94-1feb-4af4-8078-81a6f1b65e2b.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/META_CATEG_NAME/895739d6-27e1-4ecc-b798-5851c319ea40.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/LEAF_CATEG_ID/38361fbc-b875-4273-b8b4-1b8a26ef8570.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_CD/bec11fda-9ae0-4668-98ea-f0f4e9dd6993.dict",
-        "EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC": "/dict/EDW.TEST_SELLER_TYPE_DIM/SELLER_TYPE_DESC/bfb86010-bf4c-4534-a2b0-59d020aed197.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD1/8ae44fb8-b01a-4db1-a901-dc5f463038cb.dict",
-        "EDW.TEST_SITES/SITE_ID": "/dict/EDW.TEST_SITES/SITE_ID/4ef43390-b07e-4d4c-872a-77c0bd783acb.dict",
-        "DEFAULT.TEST_KYLIN_FACT/CAL_DT": "/dict/EDW.TEST_CAL_DT/CAL_DT/5e4b4f35-0fc8-4940-b123-b18c9f77da19.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/USER_DEFINED_FIELD3/278d7283-518a-4cd0-b6e1-2573b523bf1f.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/UPD_USER/bb1bb7a5-b02d-45eb-b3c8-f1a4515264ca.dict",
-        "DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME": "/dict/DEFAULT.TEST_CATEGORY_GROUPINGS/CATEG_LVL3_NAME/b2d6fae1-eaac-4ac2-8a01-42e5c8b5c198.dict"
-      },
-      "storage_location_identifier": "test_III",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 1420016227424,
   "descriptor": "test_kylin_ii_left_join_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
index a703ae4..9abe3ed 100644
--- a/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/invertedindex/test_streaming_table_ii.json
@@ -4,26 +4,8 @@
   "owner": null,
   "version": null,
   "cost": 10,
-  "status": "READY",
-  "segments": [
-    {
-      "uuid": null,
-      "name": "19700101000000_20190901000000",
-      "status": "READY",
-      "dictionaries": {
-      },
-      "storage_location_identifier": "KYLIN_2STEAMTEST",
-      "date_range_start": 0,
-      "date_range_end": 0,
-      "size_kb": 0,
-      "input_records": 0,
-      "input_records_size": 0,
-      "last_build_time": 0,
-      "last_build_job_id": null,
-      "create_time": null,
-      "binary_signature": null
-    }
-  ],
+  "status": "DISABLED",
+  "segments": [],
   "last_modified": 0,
   "descriptor": "test_streaming_table_ii_desc",
   "create_time": null,

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
index 7aff714..5633004 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIManager.java
@@ -32,13 +32,7 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.dict.DistinctColumnValuesProvider;
-import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.realization.IRealization;
@@ -131,45 +125,7 @@ public class IIManager implements IRealizationProvider {
         }
         return result;
     }
-
-    public void buildInvertedIndexDictionary(IISegment iiSeg, DistinctColumnValuesProvider factTableValueProvider) throws IOException {
-        logger.info("Start building ii dictionary");
-        DictionaryManager dictMgr = getDictionaryManager();
-        IIDesc iiDesc = iiSeg.getIIInstance().getDescriptor();
-        for (TblColRef column : iiDesc.listAllColumns()) {
-            logger.info("Dealing with column {}", column);
-            if (iiDesc.isMetricsCol(column)) {
-                continue;
-            }
-
-            DictionaryInfo dict = dictMgr.buildDictionary(iiDesc.getModel(), "true", column, factTableValueProvider);
-            iiSeg.putDictResPath(column, dict.getResourcePath());
-        }
-        updateII(iiSeg.getIIInstance());
-    }
-
-    /**
-     * return null if no dictionary for given column
-     */
-    public Dictionary<?> getDictionary(IISegment iiSeg, TblColRef col) {
-        DictionaryInfo info = null;
-        try {
-            DictionaryManager dictMgr = getDictionaryManager();
-            // logger.info("Using metadata url " + metadataUrl +
-            // " for DictionaryManager");
-            String dictResPath = iiSeg.getDictResPath(col);
-            if (dictResPath == null)
-                return null;
-
-            info = dictMgr.getDictionaryInfo(dictResPath);
-            if (info == null)
-                throw new IllegalStateException("No dictionary found by " + dictResPath + ", invalid II state; II segment" + iiSeg + ", col " + col);
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to get dictionary for II segment" + iiSeg + ", col" + col, e);
-        }
-
-        return info.getDictionaryObject();
-    }
+    
 
     public IIInstance createII(IIInstance ii) throws IOException {
 
@@ -300,10 +256,6 @@ public class IIManager implements IRealizationProvider {
         }
     }
 
-    private DictionaryManager getDictionaryManager() {
-        return DictionaryManager.getInstance(config);
-    }
-
     private ResourceStore getStore() {
         return ResourceStore.getStore(this.config);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
index adcca8b..c3ca464 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IISegment.java
@@ -19,18 +19,14 @@
 package org.apache.kylin.invertedindex;
 
 import java.text.SimpleDateFormat;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.TimeZone;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -49,7 +45,7 @@ import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 // TODO: remove segment concept for II, append old hbase table
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class IISegment implements Comparable<IISegment>, IDictionaryAware, IRealizationSegment {
+public class IISegment implements Comparable<IISegment>, IRealizationSegment {
 
     @JsonBackReference
     private IIInstance iiInstance;
@@ -83,11 +79,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
     private String binarySignature; // a hash of schema and dictionary ID,
     // used for sanity check
 
-    @JsonProperty("dictionaries")
-    private ConcurrentHashMap<String, String> dictionaries; // table/column ==>
-    // dictionary
-    // resource path
-
     private transient TableRecordInfo tableRecordInfo;
 
     /**
@@ -216,28 +207,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return storageLocationIdentifier;
     }
 
-    public Map<String, String> getDictionaries() {
-        if (dictionaries == null)
-            dictionaries = new ConcurrentHashMap<String, String>();
-        return dictionaries;
-    }
-
-    public Collection<String> getDictionaryPaths() {
-        return getDictionaries().values();
-    }
-
-    public String getDictResPath(TblColRef col) {
-        return getDictionaries().get(dictKey(col));
-    }
-
-    public void putDictResPath(TblColRef col, String dictResPath) {
-        getDictionaries().put(dictKey(col), dictResPath);
-    }
-
-    private String dictKey(TblColRef col) {
-        return col.getTable() + "/" + col.getName();
-    }
-
     /**
      * @param storageLocationIdentifier the storageLocationIdentifier to set
      */
@@ -262,10 +231,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return tableRecordInfo;
     }
 
-    //    public void updateDictionary(List<Dictionary<?>> dicts) {
-    //        getTableRecordInfo().updateDictionary( dicts);
-    //    }
-
     public List<TblColRef> getColumns() {
         return this.getTableRecordInfo().getColumns();
     }
@@ -275,20 +240,6 @@ public class IISegment implements Comparable<IISegment>, IDictionaryAware, IReal
         return Objects.toStringHelper(this).add("uuid", uuid).add("create_time_utc:", createTimeUTC).add("name", name).add("last_build_job_id", lastBuildJobID).add("status", status).toString();
     }
 
-    @Override
-    public int getColumnLength(TblColRef col) {
-
-        int index = getTableRecordInfo().findColumn(col);
-        return getTableRecordInfo().getDigest().length(index);
-    }
-
-    @Override
-    public Dictionary<?> getDictionary(TblColRef col) {
-
-        int index = getTableRecordInfo().findColumn(col);
-        return getTableRecordInfo().dict(index);
-    }
-
     public long getCreateTimeUTC() {
         return createTimeUTC;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
new file mode 100644
index 0000000..1c293d7
--- /dev/null
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/SliceBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.invertedindex.index;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.util.IIDictionaryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+/**
+ */
+public final class SliceBuilder {
+
+    private static Logger logger = LoggerFactory.getLogger(SliceBuilder.class);
+
+    private final BatchSliceMaker sliceMaker;
+    private final IIDesc iiDesc;
+    private final boolean useLocalDict;
+
+    public SliceBuilder(IIDesc desc, short shard, boolean useLocalDict) {
+        this.iiDesc = desc;
+        this.sliceMaker = new BatchSliceMaker(desc, shard);
+        this.useLocalDict = useLocalDict;
+    }
+
+    public Slice buildSlice(StreamingBatch microStreamBatch) {
+        final List<List<String>> messages = Lists.transform(microStreamBatch.getMessages(), new Function<StreamingMessage, List<String>>() {
+            @Nullable
+            @Override
+            public List<String> apply(@Nullable StreamingMessage input) {
+                return input.getData();
+            }
+        });
+        final Dictionary<?>[] dictionaries = useLocalDict ? IIDictionaryBuilder.buildDictionary(messages, iiDesc) : new Dictionary[iiDesc.listAllColumns().size()];
+        TableRecordInfo tableRecordInfo = new TableRecordInfo(iiDesc, dictionaries);
+        return build(messages, tableRecordInfo, dictionaries);
+    }
+
+    private Slice build(List<List<String>> table, final TableRecordInfo tableRecordInfo, Dictionary<?>[] localDictionary) {
+        final Slice slice = sliceMaker.makeSlice(tableRecordInfo.getDigest(), Lists.transform(table, new Function<List<String>, TableRecord>() {
+            @Nullable
+            @Override
+            public TableRecord apply(@Nullable List<String> input) {
+                TableRecord result = tableRecordInfo.createTableRecord();
+                for (int i = 0; i < input.size(); i++) {
+                    result.setValueString(i, input.get(i));
+                }
+                return result;
+            }
+        }));
+        slice.setLocalDictionaries(localDictionary);
+        return slice;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
index baafacd..8a0c2ba 100644
--- a/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
+++ b/invertedindex/src/test/java/org/apache/kylin/invertedindex/invertedindex/IIInstanceTest.java
@@ -53,11 +53,6 @@ public class IIInstanceTest extends LocalFileMetadataTestCase {
 
         Assert.assertTrue(iiInstances.size() > 0);
 
-        IIInstance instance = iiInstances.get(0);
-
-        Dictionary dict = mgr.getDictionary(instance.getFirstSegment(), instance.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID"));
-
-        Assert.assertNotNull(dict);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 1eb2683..b8d1333 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
@@ -78,6 +79,8 @@ public class HiveMRInput implements IMRInput {
             try {
                 HCatInputFormat.setInput(job, dbName, tableName);
                 job.setInputFormatClass(HCatInputFormat.class);
+
+                job.setMapOutputValueClass(org.apache.hive.hcatalog.data.DefaultHCatRecord.class);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index a1ab712..ee5a555 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -48,8 +48,8 @@ import kafka.message.MessageAndOffset;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.engine.streaming.IStreamingInput;
-import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingBatch;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaRequester;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index aace8bc..3455f1d 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -47,7 +47,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.streaming.StreamingManager;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaConfig;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index c0e506f..9691ea7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -40,7 +40,7 @@ import java.util.List;
 
 import kafka.message.MessageAndOffset;
 
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 4fae228..00f93a5 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -43,7 +43,7 @@ import kafka.message.MessageAndOffset;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.TimeUtil;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index de5e58e..0e29a0c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -40,7 +40,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.OptionsHelper;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.KafkaConfigManager;
 import org.apache.kylin.source.kafka.StreamingParser;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 2833ea4..96b7fa7 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -11,7 +11,7 @@ import kafka.javaapi.PartitionMetadata;
 import kafka.message.MessageAndOffset;
 
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.source.kafka.StreamingParser;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
index e4b688f..0e0a8ce 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHFileMapper.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase.ii;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
@@ -43,13 +44,29 @@ public class IICreateHFileMapper extends KylinMapper<ImmutableBytesWritable, Imm
     @Override
     protected void map(ImmutableBytesWritable key, ImmutableBytesWritable value, Context context) throws IOException, InterruptedException {
 
+        ByteBuffer buffer = ByteBuffer.wrap(value.get(), value.getOffset(), value.getLength());
+        int totalLength = value.getLength();
+        int valueLength = buffer.getInt();
+        int dictionaryLength = totalLength - valueLength - 4;
         KeyValue kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
                 IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
                 IIDesc.HBASE_QUALIFIER_BYTES, 0, IIDesc.HBASE_QUALIFIER_BYTES.length, //
                 timestamp, Type.Put, //
-                value.get(), value.getOffset(), value.getLength());
+                buffer.array(), buffer.position(), valueLength);
 
+        // write value
         context.write(key, kv);
+
+        kv = new KeyValue(key.get(), key.getOffset(), key.getLength(), //
+                IIDesc.HBASE_FAMILY_BYTES, 0, IIDesc.HBASE_FAMILY_BYTES.length, //
+                IIDesc.HBASE_DICTIONARY_BYTES, 0, IIDesc.HBASE_DICTIONARY_BYTES.length, //
+                timestamp, Type.Put, //
+                buffer.array(), buffer.position() + valueLength, dictionaryLength);
+
+
+        // write dictionary
+        context.write(key, kv);
+        
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
index 0a72a91..bcfe346 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/IICreateHTableJob.java
@@ -47,7 +47,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
-
+        HBaseAdmin admin = null;
         try {
             options.addOption(OPTION_II_NAME);
             options.addOption(OPTION_HTABLE_NAME);
@@ -61,6 +61,22 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             IIInstance ii = iiManager.getII(iiName);
             int sharding = ii.getDescriptor().getSharding();
 
+
+            Configuration conf = HBaseConfiguration.create(getConf());
+            // check if the table already exists
+            admin = new HBaseAdmin(conf);
+            if (admin.tableExists(tableName)) {
+                if (admin.isTableEnabled(tableName)) {
+                    logger.info("Table " + tableName + " already exists and is enabled, no need to create.");
+                    return 0;
+                } else {
+                    logger.error("Table " + tableName + " is disabled, couldn't append data");
+                    return 1;
+                }
+            }
+        
+            // table doesn't exist, need to create
+
             HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
             HColumnDescriptor cf = new HColumnDescriptor(IIDesc.HBASE_FAMILY);
             cf.setMaxVersions(1);
@@ -100,7 +116,6 @@ public class IICreateHTableJob extends AbstractHadoopJob {
             tableDesc.setValue(IRealizationConstants.HTableCreationTime, String.valueOf(System.currentTimeMillis()));
             tableDesc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());
 
-            Configuration conf = HBaseConfiguration.create(getConf());
             if (User.isHBaseSecurityEnabled(conf)) {
                 // add coprocessor for bulk load
                 tableDesc.addCoprocessor("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
@@ -108,13 +123,7 @@ public class IICreateHTableJob extends AbstractHadoopJob {
 
             IIDeployCoprocessorCLI.deployCoprocessor(tableDesc);
 
-            // drop the table first
-            HBaseAdmin admin = new HBaseAdmin(conf);
-            if (admin.tableExists(tableName)) {
-                admin.disableTable(tableName);
-                admin.deleteTable(tableName);
-            }
-
+          
             // create table
             byte[][] splitKeys = getSplits(sharding);
             if (splitKeys.length == 0)
@@ -126,12 +135,14 @@ public class IICreateHTableJob extends AbstractHadoopJob {
                 }
             }
             System.out.println("create hbase table " + tableName + " done.");
-            admin.close();
 
             return 0;
         } catch (Exception e) {
             printUsage(options);
             throw e;
+        } finally {
+            if (admin != null)
+                admin.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
index ff8b659..11c1711 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput.java
@@ -54,7 +54,7 @@ public class HBaseMROutput implements IMROutput {
 
             @Override
             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-                steps.addCubingGarbageCollectionSteps(jobFlow);
+                steps.addInvertedIndexGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 1267d2d..03db6b0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -184,7 +184,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
     public void addSaveIIToHTableSteps(DefaultChainedExecutable jobFlow, String rootPath) {
-        // create htable step
+        // create htable if it doesn't exist
         jobFlow.addTask(createCreateIIHTableStep(seg));
 
         final String iiPath = rootPath + "*";
@@ -198,6 +198,22 @@ public class HBaseMRSteps extends JobBuilderSupport {
     }
 
 
+    public void addInvertedIndexGarbageCollectionSteps(DefaultChainedExecutable jobFlow) {
+        String jobId = jobFlow.getId();
+
+        List<String> toDeletePaths = new ArrayList<>();
+        toDeletePaths.add(getJobWorkingDir(jobId));
+
+        HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setDeletePaths(toDeletePaths);
+        step.setJobId(jobId);
+
+        jobFlow.addTask(step);
+    }
+
+    
+
     private HadoopShellExecutable createCreateIIHTableStep(IRealizationSegment seg) {
         HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
         createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
index b200c2e..15dc993 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/DeployCoprocessorCLI.java
@@ -46,7 +46,6 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
@@ -108,7 +107,7 @@ public class DeployCoprocessorCLI {
 
     private static void initHTableCoprocessor(HTableDescriptor desc) throws IOException {
         KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
         FileSystem fileSystem = FileSystem.get(hconf);
 
         String localCoprocessorJar = kylinConfig.getCoprocessorLocalJar();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
index 88cb7de..2137f57 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/StorageCleanupJob.java
@@ -35,8 +35,6 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MasterNotRunningException;
-import org.apache.hadoop.hbase.ZooKeeperConnectionException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
index 335e00c..6c6ed80 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/TsConditionEraserTest.java
@@ -6,6 +6,7 @@ import java.util.Arrays;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -40,7 +41,10 @@ public class TsConditionEraserTest extends LocalFileMetadataTestCase {
     @Before
     public void setup() throws IOException {
         this.createTestMetadata();
-        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        IIManager iiManager = IIManager.getInstance(getTestConfig());
+        this.ii = iiManager.getII("test_kylin_ii_left_join");
+        IISegment segment = iiManager.buildSegment(ii, 0, System.currentTimeMillis());
+        ii.getSegments().add(segment);
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
         this.caldt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
index e271129..8b56605 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,20 +18,11 @@
 
 package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -45,7 +36,11 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.*;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  *
@@ -90,6 +85,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
     @Test
     public void testSerializeAggregator() {
         final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
         byte[] x = EndpointAggregators.serialize(endpointAggregators);
@@ -139,6 +138,10 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
     @Test
     public void basicTest() {
         final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
         final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
index 791002f..3e34495 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.junit.After;
@@ -41,6 +42,10 @@ public class TableRecordInfoTest extends LocalFileMetadataTestCase {
     public void setup() throws IOException {
         this.createTestMetadata();
         this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d5d8a403/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
index 412e335..4e5a9d9 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -58,6 +59,10 @@ public class TsConditionExtractorTest extends LocalFileMetadataTestCase {
     public void setup() throws IOException {
         this.createTestMetadata();
         this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        if (ii.getFirstSegment() == null) {
+            IISegment segment = IIManager.getInstance(getTestConfig()).buildSegment(ii, 0, System.currentTimeMillis());
+            ii.getSegments().add(segment);
+        }
         this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
         this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
         this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");